Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/25 16:55:37 UTC

[1/3] carbondata git commit: [CARBONDATA-3012] Added support for full scan queries for vector direct fill.

Repository: carbondata
Updated Branches:
  refs/heads/master e0baa9b9f -> 3d3b6ff16

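This patch lets a full scan bypass the intermediate row-by-row result filling and decode column pages straight into the output column vectors. The mode switch lives in VectorizedCarbonRecordReader (full hunk further down); a condensed sketch, using only names that appear in this diff:

    // Direct path: no row-filter mask, pages land straight in the vectors.
    if (queryModel.isDirectVectorFill()) {
      vectors[i] = new ColumnarVectorWrapperDirect(vectorProxy, i);
    } else {
      // Legacy path: per-row filtering through a boolean[] mask.
      vectors[i] = new ColumnarVectorWrapper(vectorProxy, filteredRows, i);
    }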

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
index 803715c..471f9b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnarBatch.java
@@ -56,7 +56,9 @@ public class CarbonColumnarBatch {
     actualSize = 0;
     rowCounter = 0;
     rowsFiltered = 0;
-    Arrays.fill(filteredRows, false);
+    if (filteredRows != null) {
+      Arrays.fill(filteredRows, false);
+    }
     for (int i = 0; i < columnVectors.length; i++) {
       columnVectors[i].reset();
     }

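The guard is needed because VectorizedCarbonRecordReader (below) leaves filteredRows as null on the direct-fill path. A minimal sketch of the failure the null check prevents:

    boolean[] filteredRows = null;         // direct-fill path creates no mask
    // Arrays.fill(filteredRows, false)    // would throw NullPointerException
    if (filteredRows != null) {            // guarded reset, as added above
      java.util.Arrays.fill(filteredRows, false);
    }
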
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java
index 50d2ac5..2147c43 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonDictionary.java
@@ -27,4 +27,6 @@ public interface CarbonDictionary  {
   void setDictionaryUsed();
 
   byte[] getDictionaryValue(int index);
+
+  byte[][] getAllDictionaryValues();
 }

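getAllDictionaryValues() exposes the whole decoded dictionary in one call; CarbonDictionaryWrapper (further down) uses it to replace a per-index copy loop. A sketch of the before/after at such a call site:

    // Before: one virtual call per dictionary entry.
    byte[][] copied = new byte[dictionary.getDictionarySize()][];
    for (int i = 0; i < copied.length; i++) {
      copied[i] = dictionary.getDictionaryValue(i);
    }
    // After: one call. The impl returns its backing array, so treat it as read-only.
    byte[][] shared = dictionary.getAllDictionaryValues();
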
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
index 59117dd..d127728 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/ColumnVectorInfo.java
@@ -16,7 +16,10 @@
  */
 package org.apache.carbondata.core.scan.result.vector;
 
+import java.util.BitSet;
+
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
@@ -32,6 +35,8 @@ public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> {
   public DirectDictionaryGenerator directDictionaryGenerator;
   public MeasureDataVectorProcessor.MeasureVectorFiller measureVectorFiller;
   public GenericQueryType genericQueryType;
+  public BitSet deletedRows;
+  public DecimalConverterFactory.DecimalConverter decimalConverter;
 
   @Override public int compareTo(ColumnVectorInfo o) {
     return ordinal - o.ordinal;

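The two new fields carry per-page context into the vector-fill path: deletedRows marks rows removed by delete deltas, and decimalConverter holds the decimal decoding strategy for the page. An illustrative consumer (the loop shape is assumed, not taken from this patch):

    java.util.BitSet deleted = vectorInfo.deletedRows;
    for (int rowId = 0; rowId < pageSize; rowId++) {
      if (deleted != null && deleted.get(rowId)) {
        continue;  // skip rows flagged by the delete delta
      }
      // fill vectorInfo.vector at rowId, going through
      // vectorInfo.decimalConverter for decimal columns
    }
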
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
index f8f663f..5dfd6ca 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -146,7 +146,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
     }
   }
 
-  @Override public void putBytes(int rowId, byte[] value) {
+  @Override public void putByteArray(int rowId, byte[] value) {
     bytes[rowId] = value;
   }
 
@@ -160,7 +160,7 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
     }
   }
 
-  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+  @Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
     bytes[rowId] = new byte[length];
     System.arraycopy(value, offset, bytes[rowId], 0, length);
   }
@@ -227,6 +227,31 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
     }
   }
 
+  public Object getDataArray() {
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
+      return  byteArr;
+    } else if (dataType == DataTypes.SHORT) {
+      return shorts;
+    } else if (dataType == DataTypes.INT) {
+      return ints;
+    } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
+      return longs;
+    } else if (dataType == DataTypes.FLOAT) {
+      return floats;
+    } else if (dataType == DataTypes.DOUBLE) {
+      return doubles;
+    } else if (dataType instanceof DecimalType) {
+      return decimals;
+    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+      if (null != carbonDictionary) {
+        return ints;
+      }
+      return bytes;
+    } else {
+      return data;
+    }
+  }
+
   @Override public void reset() {
     nullBytes.clear();
     if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
@@ -287,4 +312,42 @@ public class CarbonColumnVectorImpl implements CarbonColumnVector {
    * as an optimization to prevent setting nulls.
    */
   public final boolean anyNullsSet() { return anyNullsSet; }
+
+  @Override public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      floats[rowId ++] = src[i];
+    }
+  }
+
+  @Override public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      shorts[rowId ++] = src[i];
+    }
+  }
+
+  @Override public void putInts(int rowId, int count, int[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      ints[rowId ++] = src[i];
+    }
+  }
+
+  @Override public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      longs[rowId ++] = src[i];
+    }
+  }
+
+  @Override public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      doubles[rowId ++] = src[i];
+    }
+  }
+
+  @Override public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      byteArr[rowId ++] = src[i];
+    }
+  }
+
+
 }

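getDataArray() hands back the page's typed backing array (or the dictionary ids, when a dictionary is set), which pairs with the new bulk put methods so a whole page can move in one call. Note the loop bounds in the puts: with i starting at srcIndex and stopping at count, each call copies count - srcIndex elements, so callers pass srcIndex = 0 to copy count values. A sketch, assuming an INT column:

    int[] page = (int[]) sourceVector.getDataArray();  // dictionary ids if a dictionary is set
    targetVector.putInts(0, page.length, page, 0);     // one bulk copy, no per-row loop
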
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java
index cc3a03c..c8fd573 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/impl/CarbonDictionaryImpl.java
@@ -51,4 +51,7 @@ public class CarbonDictionaryImpl implements CarbonDictionary {
     return dictionary[index];
   }
 
+  @Override public byte[][] getAllDictionaryValues() {
+    return dictionary;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
index 4ec8cb6..62674bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/impl/BlockletFullScanner.java
@@ -123,7 +123,9 @@ public class BlockletFullScanner implements BlockletScanner {
       }
     }
     scannedResult.setPageFilteredRowCount(numberOfRows);
-    scannedResult.fillDataChunks();
+    if (!blockExecutionInfo.isDirectVectorFill()) {
+      scannedResult.fillDataChunks();
+    }
     // adding statistics for carbon scan time
     QueryStatistic scanTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.SCAN_BLOCKlET_TIME);

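On the direct path the eager fillDataChunks() is skipped: pages stay encoded inside the scanned result and are decoded only when the output vectors are filled, so a full scan avoids materializing row data it would immediately re-copy. The effective control flow:

    if (!blockExecutionInfo.isDirectVectorFill()) {
      scannedResult.fillDataChunks();  // legacy eager materialization
    }
    // direct-fill: decoding is deferred to the vector-fill step
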
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
index 9635896..2a294ae 100644
--- a/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/stats/QueryStatisticsModel.java
@@ -20,11 +20,20 @@ package org.apache.carbondata.core.stats;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+
 public class QueryStatisticsModel {
+
   private QueryStatisticsRecorder recorder;
+
   private Map<String, QueryStatistic> statisticsTypeAndObjMap =
       new HashMap<String, QueryStatistic>();
 
+  private boolean isEnabled = Boolean.parseBoolean(CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+          CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT));
+
   public QueryStatisticsRecorder getRecorder() {
     return recorder;
   }
@@ -36,4 +45,8 @@ public class QueryStatisticsModel {
   public Map<String, QueryStatistic> getStatisticsTypeAndObjMap() {
     return statisticsTypeAndObjMap;
   }
+
+  public boolean isEnabled() {
+    return isEnabled;
+  }
 }

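Re-reading CarbonProperties on every statistic update would be wasteful inside per-page loops, so the flag is parsed once per QueryStatisticsModel. Call sites can then guard statistics work with a plain boolean check, for example:

    if (queryStatisticsModel.isEnabled()) {
      // look up and update QueryStatistic entries; skipped entirely when disabled
    }
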
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 596d1dd..6188948 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -733,4 +733,12 @@ public final class ByteUtil {
   public static float toXorFloat(byte[] value, int offset, int length) {
     return Float.intBitsToFloat(toXorInt(value, offset, length));
   }
+
+  public static int[] toIntArray(byte[] data, int size) {
+    int[] ints = new int[size];
+    for (int i = 0; i < ints.length; i++) {
+      ints[i] = ByteUtil.valueOf3Bytes(data, i * 3);
+    }
+    return ints;
+  }
 }

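toIntArray() unpacks a buffer of 3-byte packed values into an int array via valueOf3Bytes. A self-contained sketch of the equivalent unpacking, assuming unsigned big-endian packing (valueOf3Bytes defines the exact byte order and sign handling):

    static int[] toIntArray(byte[] data, int size) {
      int[] out = new int[size];
      for (int i = 0; i < size; i++) {
        int o = i * 3;
        out[i] = ((data[o] & 0xFF) << 16)
            | ((data[o + 1] & 0xFF) << 8)
            | (data[o + 2] & 0xFF);
      }
      return out;
    }
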
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java b/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java
index b9e90d6..2662cee 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/filter/executer/IncludeFilterExecuterImplTest.java
@@ -270,11 +270,11 @@ public class IncludeFilterExecuterImplTest extends TestCase {
     long newTime = 0;
     long start;
     long end;
-    
+
     // dimension's data number in a blocklet, usually default is 32000
-    int dataChunkSize = 32000; 
+    int dataChunkSize = 32000;
     //  repeat query times in the test
-    int queryTimes = 10000;    
+    int queryTimes = 10000;
     // repeated times for a dictionary value
     int repeatTimes = 200;
     //filtered value count in a blocklet

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index a4abc61..ecd61bd 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -807,7 +807,7 @@ public class CarbonUtilTest {
         .getFirstIndexUsingBinarySearch(fixedLengthDimensionDataChunk, 1, 3, compareValue, true);
     assertEquals(2, result);
   }
-  
+
   @Test
   public void testBinaryRangeSearch() {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
index b843709..7d6eda0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonColumnVectorWrapper.java
@@ -150,24 +150,24 @@ public class CarbonColumnVectorWrapper implements CarbonColumnVector {
     }
   }
 
-  @Override public void putBytes(int rowId, byte[] value) {
+  @Override public void putByteArray(int rowId, byte[] value) {
     if (!filteredRows[rowId]) {
-      columnVector.putBytes(counter++, value);
+      columnVector.putByteArray(counter++, value);
     }
   }
 
   @Override public void putBytes(int rowId, int count, byte[] value) {
     for (int i = 0; i < count; i++) {
       if (!filteredRows[rowId]) {
-        columnVector.putBytes(counter++, value);
+        columnVector.putByteArray(counter++, value);
       }
       rowId++;
     }
   }
 
-  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+  @Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
     if (!filteredRows[rowId]) {
-      columnVector.putBytes(counter++, offset, length, value);
+      columnVector.putByteArray(counter++, offset, length, value);
     }
   }
 
@@ -246,4 +246,59 @@ public class CarbonColumnVectorWrapper implements CarbonColumnVector {
     return this.columnVector;
   }
 
+  @Override public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        columnVector.putFloat(counter++, src[i]);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        columnVector.putShort(counter++, src[i]);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putInts(int rowId, int count, int[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        columnVector.putInt(counter++, src[i]);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        columnVector.putLong(counter++, src[i]);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        columnVector.putDouble(counter++, src[i]);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        columnVector.putByte(counter++, src[i]);
+      }
+      rowId++;
+    }
+  }
+
+
 }

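In all of the wrapper's bulk puts, two cursors advance at different rates: rowId walks the source page and its filter mask on every iteration, while counter advances only when a row survives the filter, keeping the destination vector dense. Reduced to the essential lines:

    if (!filteredRows[rowId]) {
      columnVector.putInt(counter++, src[i]);  // destination advances only for kept rows
    }
    rowId++;                                   // source/filter position always advances
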
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index 39fd19a..ab270fc 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -71,11 +71,11 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
     values[rowId] = value;
   }
 
-  @Override public void putBytes(int rowId, byte[] value) {
+  @Override public void putByteArray(int rowId, byte[] value) {
     type.writeSlice(builder, wrappedBuffer(value));
   }
 
-  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+  @Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
     byte[] byteArr = new byte[length];
     System.arraycopy(value, offset, byteArr, 0, length);
     type.writeSlice(builder, wrappedBuffer(byteArr));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
index 73786c8..c96e643 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
@@ -57,15 +57,25 @@ class AllDataTypesTestCaseFilter extends QueryTest with BeforeAndAfterAll {
   test("verify like query ends with filter push down") {
     val df = sql("select * from alldatatypestableFilter where empname like '%nandh'").queryExecution
       .sparkPlan
-    assert(df.asInstanceOf[CarbonDataSourceScan].metadata
-      .get("PushedFilters").get.contains("CarbonEndsWith"))
+    if (df.isInstanceOf[CarbonDataSourceScan]) {
+      assert(df.asInstanceOf[CarbonDataSourceScan].metadata
+        .get("PushedFilters").get.contains("CarbonEndsWith"))
+    } else {
+      assert(df.children.head.asInstanceOf[CarbonDataSourceScan].metadata
+        .get("PushedFilters").get.contains("CarbonEndsWith"))
+    }
   }
 
   test("verify like query contains with filter push down") {
     val df = sql("select * from alldatatypestableFilter where empname like '%nand%'").queryExecution
       .sparkPlan
-    assert(df.asInstanceOf[CarbonDataSourceScan].metadata
-      .get("PushedFilters").get.contains("CarbonContainsWith"))
+    if (df.isInstanceOf[CarbonDataSourceScan]) {
+      assert(df.asInstanceOf[CarbonDataSourceScan].metadata
+        .get("PushedFilters").get.contains("CarbonContainsWith"))
+    } else {
+      assert(df.children.head.asInstanceOf[CarbonDataSourceScan].metadata
+        .get("PushedFilters").get.contains("CarbonContainsWith"))
+    }
   }
   
   override def afterAll {

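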
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 5121027..a605134 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -29,7 +29,9 @@ import org.apache.spark.sql.types.Decimal;
 
 class ColumnarVectorWrapper implements CarbonColumnVector {
 
-  private CarbonVectorProxy sparkColumnVectorProxy;
+  private CarbonVectorProxy.ColumnVectorProxy sparkColumnVectorProxy;
+
+  private CarbonVectorProxy carbonVectorProxy;
 
   private boolean[] filteredRows;
 
@@ -47,8 +49,9 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   ColumnarVectorWrapper(CarbonVectorProxy writableColumnVector,
       boolean[] filteredRows, int ordinal) {
-    this.sparkColumnVectorProxy = writableColumnVector;
+    this.sparkColumnVectorProxy = writableColumnVector.getColumnVector(ordinal);
     this.filteredRows = filteredRows;
+    this.carbonVectorProxy = writableColumnVector;
     this.ordinal = ordinal;
   }
 
@@ -167,7 +170,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     }
   }
 
-  @Override public void putBytes(int rowId, byte[] value) {
+  @Override public void putByteArray(int rowId, byte[] value) {
     if (!filteredRows[rowId]) {
       sparkColumnVectorProxy.putByteArray(counter++, value, ordinal);
     }
@@ -182,7 +185,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     }
   }
 
-  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+  @Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
     if (!filteredRows[rowId]) {
       sparkColumnVectorProxy.putByteArray(counter++, value, offset, length, ordinal);
     }
@@ -276,12 +279,67 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
   }
 
   public void reserveDictionaryIds() {
-    sparkColumnVectorProxy.reserveDictionaryIds(sparkColumnVectorProxy.numRows(), ordinal);
-    dictionaryVector = new ColumnarVectorWrapper(sparkColumnVectorProxy, filteredRows, ordinal);
+    sparkColumnVectorProxy.reserveDictionaryIds(carbonVectorProxy.numRows(), ordinal);
+    dictionaryVector = new ColumnarVectorWrapper(carbonVectorProxy, filteredRows, ordinal);
     ((ColumnarVectorWrapper) dictionaryVector).isDictionary = true;
   }
 
   @Override public CarbonColumnVector getDictionaryVector() {
     return dictionaryVector;
   }
+
+  @Override public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        sparkColumnVectorProxy.putFloat(counter++, src[i], ordinal);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        sparkColumnVectorProxy.putShort(counter++, src[i], ordinal);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putInts(int rowId, int count, int[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        sparkColumnVectorProxy.putInt(counter++, src[i], ordinal);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        sparkColumnVectorProxy.putLong(counter++, src[i], ordinal);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        sparkColumnVectorProxy.putDouble(counter++, src[i], ordinal);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    for (int i = srcIndex; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        sparkColumnVectorProxy.putByte(counter++, src[i], ordinal);
+      }
+      rowId++;
+    }
+  }
+
 }

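The wrapper now binds its ColumnVectorProxy once in the constructor instead of holding only the batch-level proxy, removing a column(ordinal) lookup from every put; the batch proxy is retained for batch-wide calls such as numRows() in reserveDictionaryIds(). The binding, as in the constructor above:

    this.sparkColumnVectorProxy = writableColumnVector.getColumnVector(ordinal);  // bind once
    this.carbonVectorProxy = writableColumnVector;  // kept for batch-level operations
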
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
new file mode 100644
index 0000000..b55749e
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapperDirect.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.vectorreader;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+import org.apache.spark.sql.CarbonVectorProxy;
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
+import org.apache.spark.sql.types.Decimal;
+
+/**
+ * Fills the vector directly without considering any deleted rows.
+ */
+class ColumnarVectorWrapperDirect implements CarbonColumnVector {
+
+  /**
+   * Adapter for a single column vector.
+   */
+  protected CarbonVectorProxy.ColumnVectorProxy sparkColumnVectorProxy;
+
+  /**
+   * Adapter for the complete ColumnarBatch.
+   */
+  protected CarbonVectorProxy carbonVectorProxy;
+
+  protected int ordinal;
+
+  protected boolean isDictionary;
+
+  private DataType blockDataType;
+
+  private CarbonColumnVector dictionaryVector;
+
+  ColumnarVectorWrapperDirect(CarbonVectorProxy writableColumnVector, int ordinal) {
+    this.sparkColumnVectorProxy = writableColumnVector.getColumnVector(ordinal);
+    this.carbonVectorProxy = writableColumnVector;
+    this.ordinal = ordinal;
+  }
+
+  @Override public void putBoolean(int rowId, boolean value) {
+    sparkColumnVectorProxy.putBoolean(rowId, value, ordinal);
+  }
+
+  @Override public void putFloat(int rowId, float value) {
+    sparkColumnVectorProxy.putFloat(rowId, value, ordinal);
+  }
+
+  @Override public void putShort(int rowId, short value) {
+    sparkColumnVectorProxy.putShort(rowId, value, ordinal);
+  }
+
+  @Override public void putShorts(int rowId, int count, short value) {
+    sparkColumnVectorProxy.putShorts(rowId, count, value, ordinal);
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    if (isDictionary) {
+      sparkColumnVectorProxy.putDictionaryInt(rowId, value, ordinal);
+    } else {
+      sparkColumnVectorProxy.putInt(rowId, value, ordinal);
+    }
+  }
+
+  @Override public void putInts(int rowId, int count, int value) {
+    sparkColumnVectorProxy.putInts(rowId, count, value, ordinal);
+  }
+
+  @Override public void putLong(int rowId, long value) {
+    sparkColumnVectorProxy.putLong(rowId, value, ordinal);
+  }
+
+  @Override public void putLongs(int rowId, int count, long value) {
+    sparkColumnVectorProxy.putLongs(rowId, count, value, ordinal);
+  }
+
+  @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
+    Decimal toDecimal = Decimal.apply(value);
+    sparkColumnVectorProxy.putDecimal(rowId, toDecimal, precision, ordinal);
+  }
+
+  @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
+    Decimal decimal = Decimal.apply(value);
+    for (int i = 0; i < count; i++) {
+      sparkColumnVectorProxy.putDecimal(rowId, decimal, precision, ordinal);
+      rowId++;
+    }
+  }
+
+  @Override public void putDouble(int rowId, double value) {
+    sparkColumnVectorProxy.putDouble(rowId, value, ordinal);
+  }
+
+  @Override public void putDoubles(int rowId, int count, double value) {
+    sparkColumnVectorProxy.putDoubles(rowId, count, value, ordinal);
+  }
+
+  @Override public void putByteArray(int rowId, byte[] value) {
+    sparkColumnVectorProxy.putByteArray(rowId, value, ordinal);
+  }
+
+  @Override
+  public void putBytes(int rowId, int count, byte[] value) {
+    for (int i = 0; i < count; i++) {
+      sparkColumnVectorProxy.putByteArray(rowId, value, ordinal);
+      rowId++;
+    }
+  }
+
+  @Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
+    sparkColumnVectorProxy.putByteArray(rowId, value, offset, length, ordinal);
+  }
+
+  @Override public void putNull(int rowId) {
+    sparkColumnVectorProxy.putNull(rowId, ordinal);
+  }
+
+  @Override public void putNulls(int rowId, int count) {
+    sparkColumnVectorProxy.putNulls(rowId, count, ordinal);
+  }
+
+  @Override public void putNotNull(int rowId) {
+    sparkColumnVectorProxy.putNotNull(rowId, ordinal);
+  }
+
+  @Override public void putNotNull(int rowId, int count) {
+    sparkColumnVectorProxy.putNotNulls(rowId, count, ordinal);
+  }
+
+  @Override public boolean isNull(int rowId) {
+    return sparkColumnVectorProxy.isNullAt(rowId, ordinal);
+  }
+
+  @Override public void putObject(int rowId, Object obj) {
+    //TODO handle complex types
+  }
+
+  @Override public Object getData(int rowId) {
+    //TODO handle complex types
+    return null;
+  }
+
+  @Override public void reset() {
+    if (null != dictionaryVector) {
+      dictionaryVector.reset();
+    }
+  }
+
+  @Override public DataType getType() {
+    return CarbonSparkDataSourceUtil
+        .convertSparkToCarbonDataType(sparkColumnVectorProxy.dataType(ordinal));
+  }
+
+  @Override public DataType getBlockDataType() {
+    return blockDataType;
+  }
+
+  @Override public void setBlockDataType(DataType blockDataType) {
+    this.blockDataType = blockDataType;
+  }
+
+  @Override public void setDictionary(CarbonDictionary dictionary) {
+    sparkColumnVectorProxy.setDictionary(dictionary, ordinal);
+  }
+
+  @Override public boolean hasDictionary() {
+    return sparkColumnVectorProxy.hasDictionary(ordinal);
+  }
+
+  public void reserveDictionaryIds() {
+    sparkColumnVectorProxy.reserveDictionaryIds(carbonVectorProxy.numRows(), ordinal);
+    dictionaryVector = new ColumnarVectorWrapperDirect(carbonVectorProxy, ordinal);
+    ((ColumnarVectorWrapperDirect) dictionaryVector).isDictionary = true;
+  }
+
+  @Override public CarbonColumnVector getDictionaryVector() {
+    return dictionaryVector;
+  }
+
+  @Override public void putByte(int rowId, byte value) {
+    sparkColumnVectorProxy.putByte(rowId, value, ordinal);
+  }
+
+  @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
+
+  }
+
+  @Override public void putFloats(int rowId, int count, float[] src, int srcIndex) {
+    sparkColumnVectorProxy.putFloats(rowId, count, src, srcIndex);
+  }
+
+  @Override public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+    sparkColumnVectorProxy.putShorts(rowId, count, src, srcIndex);
+  }
+
+  @Override public void putInts(int rowId, int count, int[] src, int srcIndex) {
+    sparkColumnVectorProxy.putInts(rowId, count, src, srcIndex);
+  }
+
+  @Override public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+    sparkColumnVectorProxy.putLongs(rowId, count, src, srcIndex);
+  }
+
+  @Override public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+    sparkColumnVectorProxy.putDoubles(rowId, count, src, srcIndex);
+  }
+
+  @Override public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    sparkColumnVectorProxy.putBytes(rowId, count, src, srcIndex);
+  }
+}

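ColumnarVectorWrapperDirect is the filter-free counterpart of ColumnarVectorWrapper: rowId positions are written verbatim and the bulk puts delegate straight to the Spark-side proxy with no per-row branch. An illustrative use from a page decoder (the constructor is package-private; names as in the diff):

    CarbonColumnVector direct = new ColumnarVectorWrapperDirect(vectorProxy, ordinal);
    direct.putInts(0, pageSize, decodedPage, 0);  // single delegated bulk call
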
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 839a8a0..45686ea 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.log4j.Logger;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -281,7 +282,12 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
         schema = schema.add(field);
       }
     }
-    vectorProxy = new CarbonVectorProxy(DEFAULT_MEMORY_MODE,schema,DEFAULT_BATCH_SIZE);
+    short batchSize = DEFAULT_BATCH_SIZE;
+    if (queryModel.isDirectVectorFill()) {
+      batchSize = CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
+    }
+    vectorProxy = new CarbonVectorProxy(DEFAULT_MEMORY_MODE, schema, batchSize);
+
     if (partitionColumns != null) {
       int partitionIdx = fields.length;
       for (int i = 0; i < partitionColumns.fields().length; i++) {
@@ -290,12 +296,22 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
       }
     }
     CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
-    boolean[] filteredRows = new boolean[vectorProxy.numRows()];
-    for (int i = 0; i < fields.length; i++) {
-      vectors[i] = new ColumnarVectorWrapper(vectorProxy, filteredRows, i);
-      if (isNoDictStringField[i]) {
-        if (vectors[i] instanceof ColumnarVectorWrapper) {
-          ((ColumnarVectorWrapper) vectors[i]).reserveDictionaryIds();
+    boolean[] filteredRows = null;
+    if (queryModel.isDirectVectorFill()) {
+      for (int i = 0; i < fields.length; i++) {
+        vectors[i] = new ColumnarVectorWrapperDirect(vectorProxy, i);
+        if (isNoDictStringField[i]) {
+          ((ColumnarVectorWrapperDirect) vectors[i]).reserveDictionaryIds();
+        }
+      }
+    } else {
+      filteredRows = new boolean[vectorProxy.numRows()];
+      for (int i = 0; i < fields.length; i++) {
+        vectors[i] = new ColumnarVectorWrapper(vectorProxy, filteredRows, i);
+        if (isNoDictStringField[i]) {
+          if (vectors[i] instanceof ColumnarVectorWrapper) {
+            ((ColumnarVectorWrapper) vectors[i]).reserveDictionaryIds();
+          }
         }
       }
     }

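In direct-fill mode the batch is sized to NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT (32000 rows, matching the "usually default is 32000" comment in IncludeFilterExecuterImplTest above), so one blocklet column page maps onto one vector batch and pages can be copied without re-batching:

    short batchSize = DEFAULT_BATCH_SIZE;
    if (queryModel.isDirectVectorFill()) {
      batchSize = CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
    }
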
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
index 80e6dbd..03466cc 100644
--- a/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.1andspark2.2/org/apache/spark/sql/CarbonVectorProxy.java
@@ -45,6 +45,8 @@ public class CarbonVectorProxy {
 
     private ColumnarBatch columnarBatch;
 
+    private ColumnVectorProxy[] columnVectorProxies;
+
     /**
      * Adapter class which handles the columnar vector reading of the carbondata
      * based on the spark ColumnVector and ColumnarBatch API. This proxy class
@@ -57,14 +59,22 @@ public class CarbonVectorProxy {
      */
     public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
         columnarBatch = ColumnarBatch.allocate(new StructType(structFileds), memMode, rowNum);
+        columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
+        for (int i = 0; i < columnVectorProxies.length; i++) {
+            columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch, i);
+        }
     }
 
     public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
         columnarBatch = ColumnarBatch.allocate(outputSchema, memMode, rowNum);
+        columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
+        for (int i = 0; i < columnVectorProxies.length; i++) {
+            columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch, i);
+        }
     }
 
-    public ColumnVector getColumnVector(int ordinal) {
-        return columnarBatch.column(ordinal);
+    public ColumnVectorProxy getColumnVector(int ordinal) {
+        return columnVectorProxies[ordinal];
     }
 
     /**
@@ -74,9 +84,6 @@ public class CarbonVectorProxy {
         columnarBatch.setNumRows(numRows);
     }
 
-    public Object reserveDictionaryIds(int capacity , int ordinal) {
-        return columnarBatch.column(ordinal).reserveDictionaryIds(capacity);
-    }
 
     /**
      * Returns the number of rows for read, including filtered rows.
@@ -85,22 +92,6 @@ public class CarbonVectorProxy {
         return columnarBatch.capacity();
     }
 
-    public void setDictionary(CarbonDictionary dictionary, int ordinal) {
-        if (null != dictionary) {
-            columnarBatch.column(ordinal)
-                .setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary));
-        } else {
-            columnarBatch.column(ordinal).setDictionary(null);
-        }
-    }
-
-    public void putNull(int rowId, int ordinal) {
-        columnarBatch.column(ordinal).putNull(rowId);
-    }
-
-    public void putNulls(int rowId, int count, int ordinal) {
-        columnarBatch.column(ordinal).putNulls(rowId, count);
-    }
 
     /**
      * Called to close all the columns in this batch. It is not valid to access the data after
@@ -139,9 +130,7 @@ public class CarbonVectorProxy {
         return columnarBatch.column(ordinal);
     }
 
-    public boolean hasDictionary(int ordinal) {
-        return columnarBatch.column(ordinal).hasDictionary();
-    }
+
 
     /**
      * Resets this column for writing. The currently stored values are no longer accessible.
@@ -150,127 +139,187 @@ public class CarbonVectorProxy {
         columnarBatch.reset();
     }
 
-    public void putRowToColumnBatch(int rowId, Object value, int offset) {
-        org.apache.spark.sql.types.DataType t = dataType(offset);
-        if (null == value) {
-            putNull(rowId, offset);
-        } else {
-            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-                putBoolean(rowId, (boolean) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-                putByte(rowId, (byte) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-                putShort(rowId, (short) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-                putInt(rowId, (int) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-                putLong(rowId, (long) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-                putFloat(rowId, (float) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-                putDouble(rowId, (double) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-                UTF8String v = (UTF8String) value;
-                putByteArray(rowId, v.getBytes(), offset);
-            } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
-                DecimalType dt = (DecimalType) t;
-                Decimal d = Decimal.fromDecimal(value);
-                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-                    putInt(rowId, (int) d.toUnscaledLong(), offset);
-                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-                    putLong(rowId, d.toUnscaledLong(), offset);
-                } else {
-                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-                    byte[] bytes = integer.toByteArray();
-                    putByteArray(rowId, bytes, 0, bytes.length, offset);
+
+    public static class ColumnVectorProxy {
+
+        private ColumnVector vector;
+
+        public ColumnVectorProxy(ColumnarBatch columnarBatch, int ordinal) {
+            this.vector = columnarBatch.column(ordinal);
+        }
+
+        public void putRowToColumnBatch(int rowId, Object value, int offset) {
+            org.apache.spark.sql.types.DataType t = dataType(offset);
+            if (null == value) {
+                putNull(rowId, offset);
+            } else {
+                if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
+                    putBoolean(rowId, (boolean) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
+                    putByte(rowId, (byte) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
+                    putShort(rowId, (short) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
+                    putInt(rowId, (int) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
+                    putLong(rowId, (long) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
+                    putFloat(rowId, (float) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
+                    putDouble(rowId, (double) value, offset);
+                } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
+                    UTF8String v = (UTF8String) value;
+                    putByteArray(rowId, v.getBytes(), offset);
+                } else if (t instanceof org.apache.spark.sql.types.DecimalType) {
+                    DecimalType dt = (DecimalType) t;
+                    Decimal d = Decimal.fromDecimal(value);
+                    if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+                        putInt(rowId, (int) d.toUnscaledLong(), offset);
+                    } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+                        putLong(rowId, d.toUnscaledLong(), offset);
+                    } else {
+                        final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+                        byte[] bytes = integer.toByteArray();
+                        putByteArray(rowId, bytes, 0, bytes.length, offset);
+                    }
+                } else if (t instanceof CalendarIntervalType) {
+                    CalendarInterval c = (CalendarInterval) value;
+                    vector.getChildColumn(0).putInt(rowId, c.months);
+                    vector.getChildColumn(1).putLong(rowId, c.microseconds);
+                } else if (t instanceof org.apache.spark.sql.types.DateType) {
+                    putInt(rowId, (int) value, offset);
+                } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
+                    putLong(rowId, (long) value, offset);
                 }
-            } else if (t instanceof CalendarIntervalType) {
-                CalendarInterval c = (CalendarInterval) value;
-                columnarBatch.column(offset).getChildColumn(0).putInt(rowId, c.months);
-                columnarBatch.column(offset).getChildColumn(1).putLong(rowId, c.microseconds);
-            } else if (t instanceof org.apache.spark.sql.types.DateType) {
-                putInt(rowId, (int) value, offset);
-            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-                putLong(rowId, (long) value, offset);
             }
         }
-    }
 
-    public void putBoolean(int rowId, boolean value, int ordinal) {
-        columnarBatch.column(ordinal).putBoolean(rowId, (boolean) value);
-    }
+        public void putBoolean(int rowId, boolean value, int ordinal) {
+            vector.putBoolean(rowId, value);
+        }
 
-    public void putByte(int rowId, byte value, int ordinal) {
-        columnarBatch.column(ordinal).putByte(rowId, (byte) value);
-    }
+        public void putByte(int rowId, byte value, int ordinal) {
+            vector.putByte(rowId, value);
+        }
 
-    public void putShort(int rowId, short value, int ordinal) {
-        columnarBatch.column(ordinal).putShort(rowId, (short) value);
-    }
+        public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+            vector.putBytes(rowId, count, src, srcIndex);
+        }
 
-    public void putInt(int rowId, int value, int ordinal) {
-        columnarBatch.column(ordinal).putInt(rowId, (int) value);
-    }
+        public void putShort(int rowId, short value, int ordinal) {
+            vector.putShort(rowId, value);
+        }
 
-    public void putFloat(int rowId, float value, int ordinal) {
-        columnarBatch.column(ordinal).putFloat(rowId, (float) value);
-    }
+        public void putInt(int rowId, int value, int ordinal) {
+            vector.putInt(rowId, value);
+        }
 
-    public void putLong(int rowId, long value, int ordinal) {
-        columnarBatch.column(ordinal).putLong(rowId, (long) value);
-    }
+        public void putFloat(int rowId, float value, int ordinal) {
+            vector.putFloat(rowId, value);
+        }
 
-    public void putDouble(int rowId, double value, int ordinal) {
-        columnarBatch.column(ordinal).putDouble(rowId, (double) value);
-    }
+        public void putFloats(int rowId, int count, float[] src, int srcIndex)  {
+            vector.putFloats(rowId, count, src, srcIndex);
+        }
 
-    public void putByteArray(int rowId, byte[] value, int ordinal) {
-        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value);
-    }
+        public void putLong(int rowId, long value, int ordinal) {
+            vector.putLong(rowId, value);
+        }
 
-    public void putInts(int rowId, int count, int value, int ordinal) {
-        columnarBatch.column(ordinal).putInts(rowId, count, value);
-    }
+        public void putDouble(int rowId, double value, int ordinal) {
+            vector.putDouble(rowId, value);
+        }
 
-    public void putShorts(int rowId, int count, short value, int ordinal) {
-        columnarBatch.column(ordinal).putShorts(rowId, count, value);
-    }
+        public void putByteArray(int rowId, byte[] value, int ordinal) {
+            vector.putByteArray(rowId, value);
+        }
 
-    public void putLongs(int rowId, int count, long value, int ordinal) {
-        columnarBatch.column(ordinal).putLongs(rowId, count, value);
-    }
+        public void putInts(int rowId, int count, int value, int ordinal) {
+            vector.putInts(rowId, count, value);
+        }
 
-    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
-        columnarBatch.column(ordinal).putDecimal(rowId, value, precision);
+        public void putInts(int rowId, int count, int[] src, int srcIndex) {
+            vector.putInts(rowId, count, src, srcIndex);
+        }
 
-    }
+        public void putShorts(int rowId, int count, short value, int ordinal) {
+            vector.putShorts(rowId, count, value);
+        }
 
-    public void putDoubles(int rowId, int count, double value, int ordinal) {
-        columnarBatch.column(ordinal).putDoubles(rowId, count, value);
-    }
+        public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+            vector.putShorts(rowId, count, src, srcIndex);
+        }
 
-    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
-        columnarBatch.column(ordinal).putByteArray(rowId, (byte[]) value, offset, length);
-    }
+        public void putLongs(int rowId, int count, long value, int ordinal) {
+            vector.putLongs(rowId, count, value);
+        }
 
-    public boolean isNullAt(int rowId, int ordinal) {
-        return columnarBatch
-                .column(ordinal).isNullAt(rowId);
-    }
+        public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+            vector.putLongs(rowId, count, src, srcIndex);
+        }
 
-    public DataType dataType(int ordinal) {
-        return columnarBatch.column(ordinal).dataType();
-    }
+        public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
+            vector.putDecimal(rowId, value, precision);
 
-    public void putNotNull(int rowId, int ordinal) {
-        columnarBatch.column(ordinal).putNotNull(rowId);
-    }
+        }
 
-    public void putNotNulls(int rowId, int count, int ordinal) {
-        columnarBatch.column(ordinal).putNotNulls(rowId, count);
-    }
+        public void putDoubles(int rowId, int count, double value, int ordinal) {
+            vector.putDoubles(rowId, count, value);
+        }
+
+        public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+            vector.putDoubles(rowId, count, src, srcIndex);
+        }
+
+        public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
+            vector.putByteArray(rowId, value, offset, length);
+        }
+
+        public boolean isNullAt(int rowId, int ordinal) {
+            return vector.isNullAt(rowId);
+        }
+
+        public DataType dataType(int ordinal) {
+            return vector.dataType();
+        }
+
+        public void putNotNull(int rowId, int ordinal) {
+            vector.putNotNull(rowId);
+        }
+
+        public void putNotNulls(int rowId, int count, int ordinal) {
+            vector.putNotNulls(rowId, count);
+        }
+
+        public void putDictionaryInt(int rowId, int value, int ordinal) {
+            vector.getDictionaryIds().putInt(rowId, value);
+        }
+
+      public void setDictionary(CarbonDictionary dictionary, int ordinal) {
+        if (null != dictionary) {
+          vector.setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary));
+        } else {
+          vector.setDictionary(null);
+        }
+      }
+
+        public void putNull(int rowId, int ordinal) {
+            vector.putNull(rowId);
+        }
+
+        public void putNulls(int rowId, int count, int ordinal) {
+            vector.putNulls(rowId, count);
+        }
+
+        public boolean hasDictionary(int ordinal) {
+            return vector.hasDictionary();
+        }
+
+        public Object reserveDictionaryIds(int capacity , int ordinal) {
+            return vector.reserveDictionaryIds(capacity);
+        }
+
+     
 
-    public void putDictionaryInt(int rowId, int value, int ordinal) {
-        columnarBatch.column(ordinal).getDictionaryIds().putInt(rowId, (int) value);
     }
 }

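For Spark 2.1/2.2 the per-column put methods move into an inner ColumnVectorProxy that binds columnarBatch.column(ordinal) once at construction. The methods keep their trailing ordinal parameter for signature compatibility, but it is no longer consulted. Typical use:

    CarbonVectorProxy.ColumnVectorProxy col = vectorProxy.getColumnVector(ordinal);
    col.putLong(rowId, value, ordinal);  // writes to the pre-bound vector; ordinal is ignored
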
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
index 5a99c68..bd8c57c 100644
--- a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
+++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonDictionaryWrapper.java
@@ -28,10 +28,7 @@ public class CarbonDictionaryWrapper implements Dictionary {
   private byte[][] binaries;
 
   CarbonDictionaryWrapper(CarbonDictionary dictionary) {
-    binaries = new byte[dictionary.getDictionarySize()][];
-    for (int i = 0; i < binaries.length; i++) {
-      binaries[i] = dictionary.getDictionaryValue(i);
-    }
+    binaries = dictionary.getAllDictionaryValues();
   }
 
   @Override public int decodeToInt(int id) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
index 4a0fb9e..bd74b05 100644
--- a/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
+++ b/integration/spark-datasource/src/main/spark2.3plus/org/apache/spark/sql/CarbonVectorProxy.java
@@ -22,7 +22,6 @@ import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.execution.vectorized.Dictionary;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -38,7 +37,7 @@ import org.apache.spark.unsafe.types.UTF8String;
 public class CarbonVectorProxy {
 
     private ColumnarBatch columnarBatch;
-    private WritableColumnVector[] columnVectors;
+    private ColumnVectorProxy[] columnVectorProxies;
 
     /**
      * Adapter class which handles the columnar vector reading of the carbondata
@@ -51,17 +50,25 @@ public class CarbonVectorProxy {
      * @param structFileds, metadata related to current schema of table.
      */
     public CarbonVectorProxy(MemoryMode memMode, int rowNum, StructField[] structFileds) {
-        columnVectors = ColumnVectorFactory
-                .getColumnVector(memMode, new StructType(structFileds), rowNum);
+        WritableColumnVector[] columnVectors =
+            ColumnVectorFactory.getColumnVector(memMode, new StructType(structFileds), rowNum);
         columnarBatch = new ColumnarBatch(columnVectors);
         columnarBatch.setNumRows(rowNum);
+        columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
+        for (int i = 0; i < columnVectorProxies.length; i++) {
+            columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch, i);
+        }
     }
 
     public CarbonVectorProxy(MemoryMode memMode, StructType outputSchema, int rowNum) {
-        columnVectors = ColumnVectorFactory
+        WritableColumnVector[] columnVectors = ColumnVectorFactory
                 .getColumnVector(memMode, outputSchema, rowNum);
         columnarBatch = new ColumnarBatch(columnVectors);
         columnarBatch.setNumRows(rowNum);
+        columnVectorProxies = new ColumnVectorProxy[columnarBatch.numCols()];
+        for (int i = 0; i < columnVectorProxies.length; i++) {
+            columnVectorProxies[i] = new ColumnVectorProxy(columnarBatch, i);
+        }
     }
 
     /**
@@ -71,10 +78,6 @@ public class CarbonVectorProxy {
         return columnarBatch.numRows();
     }
 
-    public Object reserveDictionaryIds(int capacity, int ordinal) {
-        return columnVectors[ordinal].reserveDictionaryIds(capacity);
-    }
-
     /**
      * This API will return a columnvector from a batch of column vector rows
      * based on the ordinal
@@ -86,21 +89,20 @@ public class CarbonVectorProxy {
         return (WritableColumnVector) columnarBatch.column(ordinal);
     }
 
-    public WritableColumnVector getColumnVector(int ordinal) {
-        return columnVectors[ordinal];
+    public ColumnVectorProxy getColumnVector(int ordinal) {
+        return columnVectorProxies[ordinal];
     }
-
     /**
      * Resets this column for writing. The currently stored values are no longer accessible.
      */
     public void reset() {
-        for (WritableColumnVector col : columnVectors) {
-            col.reset();
+        for (int i = 0; i < columnarBatch.numCols(); i++) {
+            ((WritableColumnVector)columnarBatch.column(i)).reset();
         }
     }
 
     public void resetDictionaryIds(int ordinal) {
-        columnVectors[ordinal].getDictionaryIds().reset();
+        ((WritableColumnVector)columnarBatch.column(ordinal)).getDictionaryIds().reset();
     }
 
     /**
@@ -133,146 +135,189 @@ public class CarbonVectorProxy {
         columnarBatch.setNumRows(numRows);
     }
 
-    public void putRowToColumnBatch(int rowId, Object value, int offset) {
-        org.apache.spark.sql.types.DataType t = dataType(offset);
-        if (null == value) {
-            putNull(rowId, offset);
-        } else {
-            if (t == org.apache.spark.sql.types.DataTypes.BooleanType) {
-                putBoolean(rowId, (boolean) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.ByteType) {
-                putByte(rowId, (byte) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.ShortType) {
-                putShort(rowId, (short) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.IntegerType) {
-                putInt(rowId, (int) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.LongType) {
-                putLong(rowId, (long) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.FloatType) {
-                putFloat(rowId, (float) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.DoubleType) {
-                putDouble(rowId, (double) value, offset);
-            } else if (t == org.apache.spark.sql.types.DataTypes.StringType) {
-                UTF8String v = (UTF8String) value;
-                putByteArray(rowId, v.getBytes(), offset);
-            } else if (t instanceof DecimalType) {
-                DecimalType dt = (DecimalType) t;
-                Decimal d = Decimal.fromDecimal(value);
-                if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-                    putInt(rowId, (int) d.toUnscaledLong(), offset);
-                } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-                    putLong(rowId, d.toUnscaledLong(), offset);
-                } else {
-                    final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
-                    byte[] bytes = integer.toByteArray();
-                    putByteArray(rowId, bytes, 0, bytes.length, offset);
+
+    public DataType dataType(int ordinal) {
+        return columnarBatch.column(ordinal).dataType();
+    }
+
+    public static class ColumnVectorProxy {
+
+        private WritableColumnVector vector;
+
+        public ColumnVectorProxy(ColumnarBatch columnarBatch, int ordinal) {
+            vector = (WritableColumnVector) columnarBatch.column(ordinal);
+        }
+
+        public void putRowToColumnBatch(int rowId, Object value, int offset) {
+            DataType t = dataType(offset);
+            if (null == value) {
+                putNull(rowId, offset);
+            } else {
+                if (t == DataTypes.BooleanType) {
+                    putBoolean(rowId, (boolean) value, offset);
+                } else if (t == DataTypes.ByteType) {
+                    putByte(rowId, (byte) value, offset);
+                } else if (t == DataTypes.ShortType) {
+                    putShort(rowId, (short) value, offset);
+                } else if (t == DataTypes.IntegerType) {
+                    putInt(rowId, (int) value, offset);
+                } else if (t == DataTypes.LongType) {
+                    putLong(rowId, (long) value, offset);
+                } else if (t == DataTypes.FloatType) {
+                    putFloat(rowId, (float) value, offset);
+                } else if (t == DataTypes.DoubleType) {
+                    putDouble(rowId, (double) value, offset);
+                } else if (t == DataTypes.StringType) {
+                    UTF8String v = (UTF8String) value;
+                    putByteArray(rowId, v.getBytes(), offset);
+                } else if (t instanceof DecimalType) {
+                    DecimalType dt = (DecimalType) t;
+                    Decimal d = Decimal.fromDecimal(value);
+                    if (dt.precision() <= Decimal.MAX_INT_DIGITS()) {
+                        putInt(rowId, (int) d.toUnscaledLong(), offset);
+                    } else if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
+                        putLong(rowId, d.toUnscaledLong(), offset);
+                    } else {
+                        final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
+                        byte[] bytes = integer.toByteArray();
+                        putByteArray(rowId, bytes, 0, bytes.length, offset);
+                    }
+                } else if (t instanceof CalendarIntervalType) {
+                    CalendarInterval c = (CalendarInterval) value;
+                    vector.getChild(0).putInt(rowId, c.months);
+                    vector.getChild(1).putLong(rowId, c.microseconds);
+                } else if (t instanceof DateType) {
+                    putInt(rowId, (int) value, offset);
+                } else if (t instanceof TimestampType) {
+                    putLong(rowId, (long) value, offset);
                 }
-            } else if (t instanceof CalendarIntervalType) {
-                CalendarInterval c = (CalendarInterval) value;
-                columnVectors[offset].getChild(0).putInt(rowId, c.months);
-                columnVectors[offset].getChild(1).putLong(rowId, c.microseconds);
-            } else if (t instanceof org.apache.spark.sql.types.DateType) {
-                putInt(rowId, (int) value, offset);
-            } else if (t instanceof org.apache.spark.sql.types.TimestampType) {
-                putLong(rowId, (long) value, offset);
             }
         }
-    }
 
-    public void putBoolean(int rowId, boolean value, int ordinal) {
-        columnVectors[ordinal].putBoolean(rowId, (boolean) value);
-    }
+        public void putBoolean(int rowId, boolean value, int ordinal) {
+            vector.putBoolean(rowId, value);
+        }
 
-    public void putByte(int rowId, byte value, int ordinal) {
-        columnVectors[ordinal].putByte(rowId, (byte) value);
-    }
+        public void putByte(int rowId, byte value, int ordinal) {
+            vector.putByte(rowId, value);
+        }
 
-    public void putShort(int rowId, short value, int ordinal) {
-        columnVectors[ordinal].putShort(rowId, (short) value);
-    }
+        public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+            vector.putBytes(rowId, count, src, srcIndex);
+        }
 
-    public void putInt(int rowId, int value, int ordinal) {
-        columnVectors[ordinal].putInt(rowId, (int) value);
-    }
+        public void putShort(int rowId, short value, int ordinal) {
+            vector.putShort(rowId, value);
+        }
 
-    public void putDictionaryInt(int rowId, int value, int ordinal) {
-        columnVectors[ordinal].getDictionaryIds().putInt(rowId, (int) value);
-    }
+        public void putInt(int rowId, int value, int ordinal) {
+            vector.putInt(rowId, value);
+        }
 
-    public void putFloat(int rowId, float value, int ordinal) {
-        columnVectors[ordinal].putFloat(rowId, (float) value);
-    }
+        public void putFloat(int rowId, float value, int ordinal) {
+            vector.putFloat(rowId, value);
+        }
 
-    public void putLong(int rowId, long value, int ordinal) {
-        columnVectors[ordinal].putLong(rowId, (long) value);
-    }
+        public void putFloats(int rowId, int count, float[] src, int srcIndex)  {
+            vector.putFloats(rowId, count, src, srcIndex);
+        }
 
-    public void putDouble(int rowId, double value, int ordinal) {
-        columnVectors[ordinal].putDouble(rowId, (double) value);
-    }
+        public void putLong(int rowId, long value, int ordinal) {
+            vector.putLong(rowId, value);
+        }
 
-    public void putByteArray(int rowId, byte[] value, int ordinal) {
-        columnVectors[ordinal].putByteArray(rowId, (byte[]) value);
-    }
+        public void putDouble(int rowId, double value, int ordinal) {
+            vector.putDouble(rowId, value);
+        }
 
-    public void putInts(int rowId, int count, int value, int ordinal) {
-        columnVectors[ordinal].putInts(rowId, count, value);
-    }
+        public void putByteArray(int rowId, byte[] value, int ordinal) {
+            vector.putByteArray(rowId, value);
+        }
 
-    public void putShorts(int rowId, int count, short value, int ordinal) {
-        columnVectors[ordinal].putShorts(rowId, count, value);
-    }
+        public void putInts(int rowId, int count, int value, int ordinal) {
+            vector.putInts(rowId, count, value);
+        }
 
-    public void putLongs(int rowId, int count, long value, int ordinal) {
-        columnVectors[ordinal].putLongs(rowId, count, value);
-    }
+        public void putInts(int rowId, int count, int[] src, int srcIndex) {
+            vector.putInts(rowId, count, src, srcIndex);
+        }
 
-    public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
-        columnVectors[ordinal].putDecimal(rowId, value, precision);
+        public void putShorts(int rowId, int count, short value, int ordinal) {
+            vector.putShorts(rowId, count, value);
+        }
 
-    }
+        public void putShorts(int rowId, int count, short[] src, int srcIndex) {
+            vector.putShorts(rowId, count, src, srcIndex);
+        }
 
-    public void putDoubles(int rowId, int count, double value, int ordinal) {
-        columnVectors[ordinal].putDoubles(rowId, count, value);
-    }
+        public void putLongs(int rowId, int count, long value, int ordinal) {
+            vector.putLongs(rowId, count, value);
+        }
 
-    public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
-        columnVectors[ordinal].putByteArray(rowId, (byte[]) value, offset, length);
-    }
+        public void putLongs(int rowId, int count, long[] src, int srcIndex) {
+            vector.putLongs(rowId, count, src, srcIndex);
+        }
 
-    public void putNull(int rowId, int ordinal) {
-        columnVectors[ordinal].putNull(rowId);
-    }
+        public void putDecimal(int rowId, Decimal value, int precision, int ordinal) {
+            vector.putDecimal(rowId, value, precision);
 
-    public void putNulls(int rowId, int count, int ordinal) {
-        columnVectors[ordinal].putNulls(rowId, count);
-    }
+        }
 
-    public void putNotNull(int rowId, int ordinal) {
-        columnVectors[ordinal].putNotNull(rowId);
-    }
+        public void putDoubles(int rowId, int count, double value, int ordinal) {
+            vector.putDoubles(rowId, count, value);
+        }
 
-    public void putNotNulls(int rowId, int count, int ordinal) {
-        columnVectors[ordinal].putNotNulls(rowId, count);
-    }
+        public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+            vector.putDoubles(rowId, count, src, srcIndex);
+        }
 
-    public boolean isNullAt(int rowId, int ordinal) {
-        return columnVectors[ordinal].isNullAt(rowId);
-    }
+        public void putByteArray(int rowId, byte[] value, int offset, int length, int ordinal) {
+            vector.putByteArray(rowId, value, offset, length);
+        }
 
-    public boolean hasDictionary(int ordinal) {
-        return columnVectors[ordinal].hasDictionary();
-    }
+        public boolean isNullAt(int rowId, int ordinal) {
+            return vector.isNullAt(rowId);
+        }
 
-    public void setDictionary(CarbonDictionary dictionary, int ordinal) {
+        public DataType dataType(int ordinal) {
+            return vector.dataType();
+        }
+
+        public void putNotNull(int rowId, int ordinal) {
+            vector.putNotNull(rowId);
+        }
+
+        public void putNotNulls(int rowId, int count, int ordinal) {
+            vector.putNotNulls(rowId, count);
+        }
+
+        public void putDictionaryInt(int rowId, int value, int ordinal) {
+            vector.getDictionaryIds().putInt(rowId, value);
+        }
+
+      public void setDictionary(CarbonDictionary dictionary, int ordinal) {
         if (null != dictionary) {
-            columnVectors[ordinal].setDictionary(new CarbonDictionaryWrapper(dictionary));
+          vector.setDictionary(new CarbonDictionaryWrapper(dictionary));
         } else {
-            columnVectors[ordinal].setDictionary(null);
+          vector.setDictionary(null);
+        }
+      }
+
+        public void putNull(int rowId, int ordinal) {
+            vector.putNull(rowId);
+        }
+
+        public void putNulls(int rowId, int count, int ordinal) {
+            vector.putNulls(rowId, count);
+        }
+
+        public boolean hasDictionary(int ordinal) {
+            return vector.hasDictionary();
+        }
+
+        public Object reserveDictionaryIds(int capacity, int ordinal) {
+            return vector.reserveDictionaryIds(capacity);
         }
-    }
 
-    public DataType dataType(int ordinal) {
-        return columnVectors[ordinal].dataType();
     }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
index 6c65285..3330e8b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
@@ -705,7 +705,7 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     private void putRowToColumnBatch(int rowId) {
         for (int i = 0; i < projection.length; i++) {
             Object value = outputValues[i];
-            vectorProxy.putRowToColumnBatch(rowId,value,i);
+            vectorProxy.getColumnVector(i).putRowToColumnBatch(rowId,value,i);
 
         }
     }


[3/3] carbondata git commit: [CARBONDATA-3012] Added support for full scan queries for vector direct fill.

Posted by ku...@apache.org.
[CARBONDATA-3012] Added support for full scan queries for vector direct fill.

After decompressing the page in our V3 reader, we can immediately fill the data into a vector without any condition checks inside loops.
So the complete column page data is set into the column vector in a single batch and handed back to Spark/Presto.
For this purpose, a new method is added to ColumnPageDecoder:

ColumnPage decodeAndFillVector(byte[] input, int offset, int length, ColumnVectorInfo vectorInfo,
      BitSet nullBits, boolean isLVEncoded)
The above method takes the vector and fills it in a single loop, without any checks inside the loop.
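
As a rough illustration of the idea (a minimal sketch only, assuming a plain
big-endian encoding of 4-byte ints; not the actual CarbonData decoder code):

    import java.util.BitSet;
    import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;

    // Sketch: decode a page of 4-byte ints and fill the vector in one pass.
    void decodeAndFillIntVector(byte[] input, int offset, int pageSize,
        CarbonColumnVector vector, BitSet nullBits) {
      // Single tight loop: every row is written directly, no per-row branching.
      for (int rowId = 0; rowId < pageSize; rowId++) {
        int base = offset + rowId * 4;
        int value = ((input[base] & 0xFF) << 24) | ((input[base + 1] & 0xFF) << 16)
            | ((input[base + 2] & 0xFF) << 8) | (input[base + 3] & 0xFF);
        vector.putInt(rowId, value);
      }
      // Nulls are applied once after the fill loop instead of checking per row.
      for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
        vector.putNull(i);
      }
    }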

A new method is also added inside DimensionDataChunkStore:

 void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
      ColumnVectorInfo vectorInfo);
This method likewise takes the vector and fills it in a single loop, without any per-row checks.
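
A similar sketch for the chunk-store side (illustrative only; the tiny vector
interface below is hypothetical, and the per-row byte copy is for clarity;
the real store writes into the vector without copying):

    // Hypothetical minimal vector interface, standing in for the real
    // CarbonColumnVector / ColumnVectorInfo plumbing.
    interface ByteVector {
      void putByteArray(int rowId, byte[] value);
    }

    // Sketch: fill a fixed-length dimension page into the vector in one loop.
    void fillVector(int[] invertedIndex, byte[] data, int columnValueSize,
        int numberOfRows, ByteVector vector) {
      if (invertedIndex != null && invertedIndex.length > 0) {
        // Explicitly sorted page: invertedIndex maps stored order to row id.
        for (int i = 0; i < numberOfRows; i++) {
          byte[] value = new byte[columnValueSize];
          System.arraycopy(data, i * columnValueSize, value, 0, columnValueSize);
          vector.putByteArray(invertedIndex[i], value);
        }
      } else {
        // Unsorted page: rows are written sequentially, no checks in the loop.
        for (int i = 0; i < numberOfRows; i++) {
          byte[] value = new byte[columnValueSize];
          System.arraycopy(data, i * columnValueSize, value, 0, columnValueSize);
          vector.putByteArray(i, value);
        }
      }
    }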

This closes #2818


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3d3b6ff1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3d3b6ff1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3d3b6ff1

Branch: refs/heads/master
Commit: 3d3b6ff1615e08131f6bcaea23dec0116a18081d
Parents: e0baa9b
Author: ravipesala <ra...@gmail.com>
Authored: Tue Oct 16 11:30:43 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Thu Oct 25 22:24:24 2018 +0530

----------------------------------------------------------------------
 .../chunk/impl/DimensionRawColumnChunk.java     |  17 ++
 .../impl/FixedLengthDimensionColumnPage.java    |  29 +-
 .../chunk/impl/MeasureRawColumnChunk.java       |  17 ++
 .../impl/VariableLengthDimensionColumnPage.java |  29 +-
 .../reader/DimensionColumnChunkReader.java      |   7 +
 .../chunk/reader/MeasureColumnChunkReader.java  |   7 +
 .../reader/dimension/AbstractChunkReader.java   |  11 +
 ...essedDimChunkFileBasedPageLevelReaderV3.java |   2 +-
 ...mpressedDimensionChunkFileBasedReaderV3.java |  78 +++--
 .../measure/AbstractMeasureChunkReader.java     |  12 +
 ...CompressedMeasureChunkFileBasedReaderV3.java |  45 ++-
 ...essedMsrChunkFileBasedPageLevelReaderV3.java |   6 +-
 .../chunk/store/DimensionChunkStoreFactory.java |  16 +-
 .../chunk/store/DimensionDataChunkStore.java    |   7 +
 .../impl/LocalDictDimensionDataChunkStore.java  |  25 ++
 .../safe/AbstractNonDictionaryVectorFiller.java | 282 ++++++++++++++++++
 .../SafeFixedLengthDimensionDataChunkStore.java |  51 +++-
 ...feVariableLengthDimensionDataChunkStore.java |  17 +-
 .../UnsafeAbstractDimensionDataChunkStore.java  |   6 +
 .../datastore/columnar/BlockIndexerStorage.java |   5 +-
 .../BlockIndexerStorageForNoDictionary.java     |   3 +-
 .../columnar/BlockIndexerStorageForShort.java   |   3 +-
 .../core/datastore/columnar/UnBlockIndexer.java |   3 +
 .../core/datastore/impl/FileReaderImpl.java     |   1 +
 .../core/datastore/page/ColumnPage.java         | 130 ++++----
 .../page/ColumnPageValueConverter.java          |   3 +
 .../datastore/page/SafeDecimalColumnPage.java   |  25 ++
 .../datastore/page/VarLengthColumnPageBase.java |  17 +-
 .../page/encoding/ColumnPageDecoder.java        |   8 +
 .../page/encoding/ColumnPageEncoderMeta.java    |  11 +
 .../page/encoding/EncodingFactory.java          |  44 ++-
 .../adaptive/AdaptiveDeltaFloatingCodec.java    |  82 +++++
 .../adaptive/AdaptiveDeltaIntegralCodec.java    | 194 +++++++++++-
 .../adaptive/AdaptiveFloatingCodec.java         |  84 +++++-
 .../adaptive/AdaptiveIntegralCodec.java         | 157 ++++++++++
 .../encoding/compress/DirectCompressCodec.java  | 170 ++++++++++-
 .../datastore/page/encoding/rle/RLECodec.java   |   9 +
 .../DateDirectDictionaryGenerator.java          |   2 +-
 .../datatype/DecimalConverterFactory.java       |  91 +++++-
 .../carbondata/core/mutate/DeleteDeltaVo.java   |   4 +
 .../DictionaryBasedVectorResultCollector.java   | 112 +++++--
 .../executor/impl/AbstractQueryExecutor.java    |  13 +
 .../scan/executor/infos/BlockExecutionInfo.java |  13 +
 .../core/scan/executor/util/QueryUtil.java      |   2 +-
 .../carbondata/core/scan/model/QueryModel.java  |   6 +-
 .../core/scan/result/BlockletScannedResult.java |  76 ++++-
 .../scan/result/vector/CarbonColumnVector.java  |  18 +-
 .../scan/result/vector/CarbonColumnarBatch.java |   4 +-
 .../scan/result/vector/CarbonDictionary.java    |   2 +
 .../scan/result/vector/ColumnVectorInfo.java    |   5 +
 .../vector/impl/CarbonColumnVectorImpl.java     |  67 ++++-
 .../vector/impl/CarbonDictionaryImpl.java       |   3 +
 .../scan/scanner/impl/BlockletFullScanner.java  |   4 +-
 .../core/stats/QueryStatisticsModel.java        |  13 +
 .../apache/carbondata/core/util/ByteUtil.java   |   8 +
 .../executer/IncludeFilterExecuterImplTest.java |   6 +-
 .../carbondata/core/util/CarbonUtilTest.java    |   2 +-
 .../presto/CarbonColumnVectorWrapper.java       |  65 +++-
 .../presto/readers/SliceStreamReader.java       |   4 +-
 .../filterexpr/AllDataTypesTestCaseFilter.scala |  18 +-
 .../vectorreader/ColumnarVectorWrapper.java     |  70 ++++-
 .../ColumnarVectorWrapperDirect.java            | 229 ++++++++++++++
 .../VectorizedCarbonRecordReader.java           |  30 +-
 .../org/apache/spark/sql/CarbonVectorProxy.java | 295 ++++++++++--------
 .../spark/sql/CarbonDictionaryWrapper.java      |   5 +-
 .../org/apache/spark/sql/CarbonVectorProxy.java | 297 +++++++++++--------
 .../stream/CarbonStreamRecordReader.java        |   2 +-
 67 files changed, 2616 insertions(+), 463 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
index 7b1aca1..d84434e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.scan.result.vector.impl.CarbonDictionaryImpl;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.format.Encoding;
@@ -121,6 +122,22 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
     }
   }
 
+  /**
+   * Converts the raw data of the specified page number to a DimensionColumnDataChunk and fills
+   * the vector
+   *
+   * @param pageNumber page number to decode and fill the vector
+   * @param vectorInfo vector to be filled with column page
+   */
+  public void convertToDimColDataChunkAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
+    assert pageNumber < pagesCount;
+    try {
+      chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override public void freeMemory() {
     super.freeMemory();
     if (null != dataChunks) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
index c815e4d..e650e0e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
@@ -46,11 +46,38 @@ public class FixedLengthDimensionColumnPage extends AbstractDimensionColumnPage
         dataChunk.length;
     dataChunkStore = DimensionChunkStoreFactory.INSTANCE
         .getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
-            DimensionStoreType.FIXED_LENGTH, null);
+            DimensionStoreType.FIXED_LENGTH, null, false);
     dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
   }
 
   /**
+   * Constructor
+   *
+   * @param dataChunk            data chunk
+   * @param invertedIndex        inverted index
+   * @param invertedIndexReverse reverse inverted index
+   * @param numberOfRows         number of rows
+   * @param columnValueSize      size of each column value
+   * @param vectorInfo           vector to be filled with decoded column page.
+   */
+  public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
+      int[] invertedIndexReverse, int numberOfRows, int columnValueSize,
+      ColumnVectorInfo vectorInfo) {
+    boolean isExplicitSorted = isExplicitSorted(invertedIndex);
+    long totalSize = isExplicitSorted ?
+        dataChunk.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) :
+        dataChunk.length;
+    dataChunkStore = DimensionChunkStoreFactory.INSTANCE
+        .getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
+            DimensionStoreType.FIXED_LENGTH, null, vectorInfo != null);
+    if (vectorInfo == null) {
+      dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
+    } else {
+      dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunk, vectorInfo);
+    }
+  }
+
+  /**
    * Below method will be used to fill the data based on offset and row id
    *
    * @param rowId            row id of the chunk

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
index 9448f30..6a90569 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Contains raw measure data
@@ -105,6 +106,22 @@ public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
     }
   }
 
+  /**
+   * Converts the raw data of the specified page number to a ColumnPage and fills the
+   * vector
+   *
+   * @param pageNumber page number to decode and fill the vector
+   * @param vectorInfo vector to be filled with column page
+   */
+  public void convertToColumnPageAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
+    assert pageNumber < pagesCount;
+    try {
+      chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
+    } catch (IOException | MemoryException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override public void freeMemory() {
     super.freeMemory();
     if (null != columnPages) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
index a404ff7..6cb8174 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
@@ -30,10 +30,31 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa
 
   /**
    * Constructor for this class
+   * @param dataChunks           data chunk
+   * @param invertedIndex        inverted index
+   * @param invertedIndexReverse reverse inverted index
+   * @param numberOfRows         number of rows
+   * @param dictionary           carbon local dictionary for string column.
    */
   public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
       int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
       CarbonDictionary dictionary) {
+    this(dataChunks, invertedIndex, invertedIndexReverse, numberOfRows, dimStoreType, dictionary,
+        null);
+  }
+
+  /**
+   * Constructor for this class
+   * @param dataChunks           data chunk
+   * @param invertedIndex        inverted index
+   * @param invertedIndexReverse reverse inverted index
+   * @param numberOfRows         number of rows
+   * @param dictionary           carbon local dictionary for string column.
+   * @param vectorInfo           vector to be filled with decoded column page.
+   */
+  public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
+      int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
+      CarbonDictionary dictionary, ColumnVectorInfo vectorInfo) {
     boolean isExplicitSorted = isExplicitSorted(invertedIndex);
     long totalSize = 0;
     switch (dimStoreType) {
@@ -54,8 +75,12 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa
     }
     dataChunkStore = DimensionChunkStoreFactory.INSTANCE
         .getDimensionChunkStore(0, isExplicitSorted, numberOfRows, totalSize, dimStoreType,
-            dictionary);
-    dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
+            dictionary, vectorInfo != null);
+    if (vectorInfo != null) {
+      dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunks, vectorInfo);
+    } else {
+      dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
index fd81973..e2d6be7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Interface for reading the data chunk
@@ -60,4 +61,10 @@ public interface DimensionColumnChunkReader {
    */
   DimensionColumnPage decodeColumnPage(DimensionRawColumnChunk dimensionRawColumnChunk,
       int pageNumber) throws IOException, MemoryException;
+
+  /**
+   * Decodes the raw data chunk of the given page number and fills the vector with decoded data.
+   */
+  void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
+      int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
index f1392d0..0fbbe6b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Reader interface for reading the measure blocks from file
@@ -58,4 +59,10 @@ public interface MeasureColumnChunkReader {
   ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
       int pageNumber) throws IOException, MemoryException;
 
+  /**
+   * Decodes raw data and fills the vector
+   */
+  void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
+      int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
index b08f9ed..2c42abe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
@@ -16,10 +16,15 @@
  */
 package org.apache.carbondata.core.datastore.chunk.reader.dimension;
 
+import java.io.IOException;
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
@@ -79,4 +84,10 @@ public abstract class AbstractChunkReader implements DimensionColumnChunkReader
     this.numberOfRows = numberOfRows;
   }
 
+  @Override
+  public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
+      int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+    throw new UnsupportedOperationException(
+        "This operation is not supported in this reader " + this.getClass().getName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
index 6efaf8a..86a4334 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
@@ -171,6 +171,6 @@ public class CompressedDimChunkFileBasedPageLevelReaderV3
     ByteBuffer rawData = dimensionRawColumnChunk.getFileReader()
         .readByteBuffer(filePath, offset, length);
 
-    return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0);
+    return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0, null);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index b96e52e..a9f9338 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.dimension.v3;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -39,6 +40,7 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
@@ -207,6 +209,12 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
    */
   @Override public DimensionColumnPage decodeColumnPage(
       DimensionRawColumnChunk rawColumnPage, int pageNumber) throws IOException, MemoryException {
+    return decodeColumnPage(rawColumnPage, pageNumber, null);
+  }
+
+  private DimensionColumnPage decodeColumnPage(
+      DimensionRawColumnChunk rawColumnPage, int pageNumber,
+      ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
     // data chunk of blocklet column
     DataChunk3 dataChunk3 = rawColumnPage.getDataChunkV3();
     // get the data buffer
@@ -221,49 +229,65 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
     int offset = (int) rawColumnPage.getOffSet() + dimensionChunksLength
         .get(rawColumnPage.getColumnIndex()) + dataChunk3.getPage_offset().get(pageNumber);
     // first read the data and uncompressed it
-    return decodeDimension(rawColumnPage, rawData, pageMetadata, offset);
+    return decodeDimension(rawColumnPage, rawData, pageMetadata, offset, vectorInfo);
+  }
+
+  @Override
+  public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
+      int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+    DimensionColumnPage columnPage =
+        decodeColumnPage(dimensionRawColumnChunk, pageNumber, vectorInfo);
+    columnPage.freeMemory();
   }
 
-  private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata,
-      ByteBuffer pageData, int offset, boolean isLocalDictEncodedPage)
+  private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata, ByteBuffer pageData, int offset,
+      boolean isLocalDictEncodedPage, ColumnVectorInfo vectorInfo, BitSet nullBitSet)
       throws IOException, MemoryException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
     String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
         pageMetadata.getChunk_meta());
     ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas,
-        compressorName);
-    return decoder
-        .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
+        compressorName, vectorInfo != null);
+    if (vectorInfo != null) {
+      decoder
+          .decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
+              nullBitSet, isLocalDictEncodedPage);
+      return null;
+    } else {
+      return decoder
+          .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
+    }
   }
 
   protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnPage,
-      ByteBuffer pageData, DataChunk2 pageMetadata, int offset)
+      ByteBuffer pageData, DataChunk2 pageMetadata, int offset, ColumnVectorInfo vectorInfo)
       throws IOException, MemoryException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     if (CarbonUtil.isEncodedWithMeta(encodings)) {
-      ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset,
-          null != rawColumnPage.getLocalDictionary());
-      decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
       int[] invertedIndexes = new int[0];
       int[] invertedIndexesReverse = new int[0];
       // in case of no dictionary measure data types, if it is included in sort columns
       // then inverted index to be uncompressed
-      if (encodings.contains(Encoding.INVERTED_INDEX)) {
+      boolean isExplicitSorted =
+          CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX);
+      int dataOffset = offset;
+      if (isExplicitSorted) {
         offset += pageMetadata.data_page_length;
-        if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
-          invertedIndexes = CarbonUtil
-              .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
-          // get the reverse index
-          invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
-        }
+        invertedIndexes = CarbonUtil
+            .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
+        // get the reverse index
+        invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
       }
+      BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
+      ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, dataOffset,
+          null != rawColumnPage.getLocalDictionary(), vectorInfo, nullBitSet);
+      decodedPage.setNullBits(nullBitSet);
       return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), invertedIndexes,
-          invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata),
-          CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX));
+          invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata), isExplicitSorted);
     } else {
       // following code is for backward compatibility
-      return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset);
+      return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset, vectorInfo);
     }
   }
 
@@ -283,8 +307,8 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
   }
 
   private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage,
-      ByteBuffer pageData, DataChunk2 pageMetadata, int offset) throws IOException,
-      MemoryException {
+      ByteBuffer pageData, DataChunk2 pageMetadata, int offset, ColumnVectorInfo vectorInfo)
+      throws IOException, MemoryException {
     byte[] dataPage;
     int[] rlePage;
     int[] invertedIndexes = new int[0];
@@ -296,8 +320,10 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
       invertedIndexes = CarbonUtil
           .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
       offset += pageMetadata.rowid_page_length;
-      // get the reverse index
-      invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
+      if (vectorInfo == null) {
+        // get the reverse index
+        invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
+      }
     }
     // if rle is applied then read the rle block chunk and then uncompress
     //then actual data based on rle block
@@ -324,13 +350,13 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
       columnDataChunk =
           new VariableLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
               pageMetadata.getNumberOfRowsInpage(), dimStoreType,
-              rawColumnPage.getLocalDictionary());
+              rawColumnPage.getLocalDictionary(), vectorInfo);
     } else {
       // to store fixed length column chunk values
       columnDataChunk =
           new FixedLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
               pageMetadata.getNumberOfRowsInpage(),
-              eachColumnValueSize[rawColumnPage.getColumnIndex()]);
+              eachColumnValueSize[rawColumnPage.getColumnIndex()], vectorInfo);
     }
     return columnDataChunk;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
index 6774fcb..cd233d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
@@ -16,10 +16,15 @@
  */
 package org.apache.carbondata.core.datastore.chunk.reader.measure;
 
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Measure block reader abstract class
@@ -48,4 +53,11 @@ public abstract class AbstractMeasureChunkReader implements MeasureColumnChunkRe
     this.filePath = filePath;
     this.numberOfRows = numberOfRows;
   }
+
+  @Override
+  public void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
+      int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+    throw new UnsupportedOperationException(
+        "This operation is not supported in this class " + getClass().getName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index 240771a..8394029 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileReader;
@@ -29,6 +30,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
@@ -190,6 +192,18 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
   public ColumnPage decodeColumnPage(
       MeasureRawColumnChunk rawColumnChunk, int pageNumber)
       throws IOException, MemoryException {
+    return decodeColumnPage(rawColumnChunk, pageNumber, null);
+  }
+
+  @Override
+  public void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
+      int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+    decodeColumnPage(measureRawColumnChunk, pageNumber, vectorInfo);
+  }
+
+  private ColumnPage decodeColumnPage(
+      MeasureRawColumnChunk rawColumnChunk, int pageNumber, ColumnVectorInfo vectorInfo)
+      throws IOException, MemoryException {
     // data chunk of blocklet column
     DataChunk3 dataChunk3 = rawColumnChunk.getDataChunkV3();
     // data chunk of page
@@ -203,23 +217,34 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
     int offset = (int) rawColumnChunk.getOffSet() +
         measureColumnChunkLength.get(rawColumnChunk.getColumnIndex()) +
         dataChunk3.getPage_offset().get(pageNumber);
-    ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset);
-    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
+    BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
+    ColumnPage decodedPage =
+        decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset, vectorInfo, nullBitSet);
+    if (decodedPage == null) {
+      return null;
+    }
+    decodedPage.setNullBits(nullBitSet);
     return decodedPage;
   }
 
   /**
    * Decode measure column page with page header and raw data starting from offset
    */
-  protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData, int offset)
-      throws MemoryException, IOException {
+  protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData, int offset,
+      ColumnVectorInfo vectorInfo, BitSet nullBitSet) throws MemoryException, IOException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
-    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
-        pageMetadata.getChunk_meta());
-    ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas,
-        compressorName);
-    return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
+    String compressorName =
+        CarbonMetadataUtil.getCompressorNameFromChunkMeta(pageMetadata.getChunk_meta());
+    ColumnPageDecoder codec =
+        encodingFactory.createDecoder(encodings, encoderMetas, compressorName, vectorInfo != null);
+    if (vectorInfo != null) {
+      codec
+          .decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
+              nullBitSet, false);
+      return null;
+    } else {
+      return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
+    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
index 924a206..b092350 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
@@ -151,8 +152,9 @@ public class CompressedMsrChunkFileBasedPageLevelReaderV3
     ByteBuffer buffer = rawColumnPage.getFileReader()
         .readByteBuffer(filePath, offset, pageMetadata.data_page_length);
 
-    ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0);
-    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
+    BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
+    ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0, null, nullBitSet);
+    decodedPage.setNullBits(nullBitSet);
     return decodedPage;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
index c7bcef1..5346f35 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
@@ -65,8 +65,8 @@ public class DimensionChunkStoreFactory {
    */
   public DimensionDataChunkStore getDimensionChunkStore(int columnValueSize,
       boolean isInvertedIndex, int numberOfRows, long totalSize, DimensionStoreType storeType,
-      CarbonDictionary dictionary) {
-    if (isUnsafe) {
+      CarbonDictionary dictionary, boolean fillDirectVector) {
+    if (isUnsafe && !fillDirectVector) {
       switch (storeType) {
         case FIXED_LENGTH:
           return new UnsafeFixedLengthDimensionDataChunkStore(totalSize, columnValueSize,
@@ -79,24 +79,24 @@ public class DimensionChunkStoreFactory {
               numberOfRows);
         case LOCAL_DICT:
           return new LocalDictDimensionDataChunkStore(
-              new UnsafeFixedLengthDimensionDataChunkStore(totalSize,
-                  3, isInvertedIndex, numberOfRows),
-              dictionary);
+              new UnsafeFixedLengthDimensionDataChunkStore(totalSize, 3, isInvertedIndex,
+                  numberOfRows), dictionary);
         default:
           throw new UnsupportedOperationException("Invalid dimension store type");
       }
     } else {
       switch (storeType) {
         case FIXED_LENGTH:
-          return new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, columnValueSize);
+          return new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, columnValueSize,
+              numberOfRows);
         case VARIABLE_SHORT_LENGTH:
           return new SafeVariableShortLengthDimensionDataChunkStore(isInvertedIndex, numberOfRows);
         case VARIABLE_INT_LENGTH:
           return new SafeVariableIntLengthDimensionDataChunkStore(isInvertedIndex, numberOfRows);
         case LOCAL_DICT:
           return new LocalDictDimensionDataChunkStore(
-              new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex,
-                  3), dictionary);
+              new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, 3, numberOfRows),
+              dictionary);
         default:
           throw new UnsupportedOperationException("Invalid dimension store type");
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
index 28aed5b..8972ddb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.chunk.store;
 
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Interface responsibility is to store dimension data in memory.
@@ -35,6 +36,12 @@ public interface DimensionDataChunkStore {
   void putArray(int[] invertedIndex, int[] invertedIndexReverse, byte[] data);
 
   /**
+   * Fill the vector with decoded data.
+   */
+  void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+      ColumnVectorInfo vectorInfo);
+
+  /**
    * Below method will be used to get the row
    * based on row id passed
    *

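The new fillVector contract lets a chunk store decode its bytes straight into the destination vector instead of materializing rows first. For illustration, a minimal sketch of that one-pass idea, ignoring inverted indexes and assuming a hypothetical IntSink in place of CarbonColumnVector and fixed-width big-endian surrogates:

    // Sketch only: IntSink stands in for CarbonColumnVector.
    interface IntSink {
      void putInt(int rowId, int value);
    }

    class FixedWidthFillSketch {
      // Decode each fixed-width value and write it to the sink in one pass,
      // avoiding the intermediate row-wise copy that putArray() implies.
      static void fillVector(byte[] data, int valueSize, int numberOfRows, IntSink sink) {
        for (int i = 0; i < numberOfRows; i++) {
          int offset = i * valueSize;
          int surrogate = 0;
          for (int j = 0; j < valueSize; j++) {
            surrogate = (surrogate << 8) | (data[offset + j] & 0xFF);
          }
          sink.putInt(i, surrogate);
        }
      }
    }
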
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
index 0d06f61..e70424f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
@@ -21,6 +21,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Dimension chunk store for local dictionary encoded data.
@@ -49,6 +51,29 @@ public class LocalDictDimensionDataChunkStore implements DimensionDataChunkStore
     this.dimensionDataChunkStore.putArray(invertedIndex, invertedIndexReverse, data);
   }
 
+  @Override
+  public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+      ColumnVectorInfo vectorInfo) {
+    int columnValueSize = dimensionDataChunkStore.getColumnValueSize();
+    int rowsNum = data.length / columnValueSize;
+    CarbonColumnVector vector = vectorInfo.vector;
+    if (!dictionary.isDictionaryUsed()) {
+      vector.setDictionary(dictionary);
+      dictionary.setDictionaryUsed();
+    }
+    for (int i = 0; i < rowsNum; i++) {
+      int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
+      if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
+        vector.putNull(i);
+        vector.getDictionaryVector().putNull(i);
+      } else {
+        vector.putNotNull(i);
+        vector.getDictionaryVector().putInt(i, surrogate);
+      }
+    }
+  }
+
   @Override public byte[] getRow(int rowId) {
     return dictionary.getDictionaryValue(dimensionDataChunkStore.getSurrogate(rowId));
   }

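Note that the local-dictionary path above never copies dictionary bytes per row: it attaches the CarbonDictionary once and writes only the integer surrogate into the dictionary vector, so values are materialized lazily on read. A self-contained sketch of that late-lookup pattern (all names hypothetical):

    import java.nio.charset.StandardCharsets;

    // Store int codes during fill; resolve bytes only when a row is actually read.
    class LateDictLookupSketch {
      private final byte[][] dictionaryValues;  // surrogate -> encoded bytes
      private final int[] codes;                // per-row surrogate keys

      LateDictLookupSketch(byte[][] dictionaryValues, int[] codes) {
        this.dictionaryValues = dictionaryValues;
        this.codes = codes;
      }

      String valueAt(int rowId) {
        return new String(dictionaryValues[codes[rowId]], StandardCharsets.UTF_8);
      }
    }
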
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
new file mode 100644
index 0000000..ddfa470
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+@InterfaceAudience.Internal
+@InterfaceStability.Stable
+public abstract class AbstractNonDictionaryVectorFiller {
+
+  protected int lengthSize;
+  protected int numberOfRows;
+
+  public AbstractNonDictionaryVectorFiller(int lengthSize, int numberOfRows) {
+    this.lengthSize = lengthSize;
+    this.numberOfRows = numberOfRows;
+  }
+
+  public abstract void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer);
+
+  public int getLengthFromBuffer(ByteBuffer buffer) {
+    return buffer.getShort();
+  }
+}
+
+class NonDictionaryVectorFillerFactory {
+
+  public static AbstractNonDictionaryVectorFiller getVectorFiller(DataType type, int lengthSize,
+      int numberOfRows) {
+    if (type == DataTypes.STRING) {
+      return new StringVectorFiller(lengthSize, numberOfRows);
+    } else if (type == DataTypes.VARCHAR) {
+      return new LongStringVectorFiller(lengthSize, numberOfRows);
+    } else if (type == DataTypes.TIMESTAMP) {
+      return new TimeStampVectorFiller(lengthSize, numberOfRows);
+    } else if (type == DataTypes.BOOLEAN) {
+      return new BooleanVectorFiller(lengthSize, numberOfRows);
+    } else if (type == DataTypes.SHORT) {
+      return new ShortVectorFiller(lengthSize, numberOfRows);
+    } else if (type == DataTypes.INT) {
+      return new IntVectorFiller(lengthSize, numberOfRows);
+    } else if (type == DataTypes.LONG) {
+      return new LongVectorFiller(lengthSize, numberOfRows);
+    } else {
+      throw new UnsupportedOperationException("Not supported datatype : " + type);
+    }
+  }
+
+}
+
+class StringVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+  public StringVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+    // start position will be used to store the current data position
+    int startOffset = 0;
+    // each value is stored as [length][data], so the first value's bytes begin after its
+    // length field; we therefore skip the leading lengthSize bytes (two for SHORT lengths,
+    // four for INT lengths) before reading the actual data
+    int currentOffset = lengthSize;
+    ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
+    for (int i = 0; i < numberOfRows - 1; i++) {
+      buffer.position(startOffset);
+      startOffset += getLengthFromBuffer(buffer) + lengthSize;
+      int length = startOffset - (currentOffset);
+      if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentOffset, length)) {
+        vector.putNull(i);
+      } else {
+        vector.putByteArray(i, currentOffset, length, data);
+      }
+      currentOffset = startOffset + lengthSize;
+    }
+    // Handle last row
+    int length = (data.length - currentOffset);
+    if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+        CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentOffset, length)) {
+      vector.putNull(numberOfRows - 1);
+    } else {
+      vector.putByteArray(numberOfRows - 1, currentOffset, length, data);
+    }
+  }
+}
+
+class LongStringVectorFiller extends StringVectorFiller {
+  public LongStringVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public int getLengthFromBuffer(ByteBuffer buffer) {
+    return buffer.getInt();
+  }
+}
+
+class BooleanVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+  public BooleanVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+    // start position will be used to store the current data position
+    int startOffset = 0;
+    int currentOffset = lengthSize;
+    for (int i = 0; i < numberOfRows - 1; i++) {
+      buffer.position(startOffset);
+      startOffset += getLengthFromBuffer(buffer) + lengthSize;
+      int length = startOffset - (currentOffset);
+      if (length == 0) {
+        vector.putNull(i);
+      } else {
+        vector.putBoolean(i, ByteUtil.toBoolean(data[currentOffset]));
+      }
+      currentOffset = startOffset + lengthSize;
+    }
+    int length = (data.length - currentOffset);
+    if (length == 0) {
+      vector.putNull(numberOfRows - 1);
+    } else {
+      vector.putBoolean(numberOfRows - 1, ByteUtil.toBoolean(data[currentOffset]));
+    }
+  }
+}
+
+class ShortVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+  public ShortVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+    // start position will be used to store the current data position
+    int startOffset = 0;
+    int currentOffset = lengthSize;
+    for (int i = 0; i < numberOfRows - 1; i++) {
+      buffer.position(startOffset);
+      startOffset += getLengthFromBuffer(buffer) + lengthSize;
+      int length = startOffset - (currentOffset);
+      if (length == 0) {
+        vector.putNull(i);
+      } else {
+        vector.putShort(i, ByteUtil.toXorShort(data, currentOffset, length));
+      }
+      currentOffset = startOffset + lengthSize;
+    }
+    int length = (data.length - currentOffset);
+    if (length == 0) {
+      vector.putNull(numberOfRows - 1);
+    } else {
+      vector.putShort(numberOfRows - 1, ByteUtil.toXorShort(data, currentOffset, length));
+    }
+  }
+}
+
+class IntVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+  public IntVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+    // start position will be used to store the current data position
+    int startOffset = 0;
+    int currentOffset = lengthSize;
+    for (int i = 0; i < numberOfRows - 1; i++) {
+      buffer.position(startOffset);
+      startOffset += getLengthFromBuffer(buffer) + lengthSize;
+      int length = startOffset - (currentOffset);
+      if (length == 0) {
+        vector.putNull(i);
+      } else {
+        vector.putInt(i, ByteUtil.toXorInt(data, currentOffset, length));
+      }
+      currentOffset = startOffset + lengthSize;
+    }
+    int length = (data.length - currentOffset);
+    if (length == 0) {
+      vector.putNull(numberOfRows - 1);
+    } else {
+      vector.putInt(numberOfRows - 1, ByteUtil.toXorInt(data, currentOffset, length));
+    }
+  }
+}
+
+class LongVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+  public LongVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+    // start position will be used to store the current data position
+    int startOffset = 0;
+    int currentOffset = lengthSize;
+    for (int i = 0; i < numberOfRows - 1; i++) {
+      buffer.position(startOffset);
+      startOffset += getLengthFromBuffer(buffer) + lengthSize;
+      int length = startOffset - (currentOffset);
+      if (length == 0) {
+        vector.putNull(i);
+      } else {
+        vector.putLong(i, DataTypeUtil
+            .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentOffset,
+                length));
+      }
+      currentOffset = startOffset + lengthSize;
+    }
+    int length = (data.length - currentOffset);
+    if (length == 0) {
+      vector.putNull(numberOfRows - 1);
+    } else {
+      vector.putLong(numberOfRows - 1, DataTypeUtil
+          .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentOffset,
+              length));
+    }
+  }
+}
+
+class TimeStampVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+  public TimeStampVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+    // start position will be used to store the current data position
+    int startOffset = 0;
+    int currentOffset = lengthSize;
+    for (int i = 0; i < numberOfRows - 1; i++) {
+      buffer.position(startOffset);
+      startOffset += getLengthFromBuffer(buffer) + lengthSize;
+      int length = startOffset - (currentOffset);
+      if (length == 0) {
+        vector.putNull(i);
+      } else {
+        vector.putLong(i, ByteUtil.toXorLong(data, currentOffset, length) * 1000L);
+      }
+      currentOffset = startOffset + lengthSize;
+    }
+    int length = (data.length - currentOffset);
+    if (length == 0) {
+      vector.putNull(numberOfRows - 1);
+    } else {
+      vector.putLong(numberOfRows - 1, ByteUtil.toXorLong(data, currentOffset, length) * 1000L);
+    }
+  }
+}

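All of the fillers above walk the same length-value layout: every value is prefixed by its length (2 bytes by default, 4 bytes for VARCHAR via the getLengthFromBuffer override), and a zero-length value marks null for the primitive types. A minimal self-contained sketch of that walk, assuming 2-byte lengths:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    // Parses [len][bytes][len][bytes]... where len is an unsigned short;
    // a zero length is interpreted as null, mirroring the fillers above.
    class LvWalkerSketch {
      static List<byte[]> parse(byte[] data, int numberOfRows) {
        List<byte[]> rows = new ArrayList<>(numberOfRows);
        ByteBuffer buffer = ByteBuffer.wrap(data);
        for (int i = 0; i < numberOfRows; i++) {
          int length = buffer.getShort() & 0xFFFF;
          if (length == 0) {
            rows.add(null);
          } else {
            byte[] value = new byte[length];
            buffer.get(value);
            rows.add(value);
          }
        }
        return rows;
      }
    }
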
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
index 41218d0..d30650d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
@@ -17,6 +17,12 @@
 
 package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -30,9 +36,52 @@ public class SafeFixedLengthDimensionDataChunkStore extends SafeAbsractDimension
    */
   private int columnValueSize;
 
-  public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize) {
+  private int numOfRows;
+
+  public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize,
+      int numOfRows) {
     super(isInvertedIndex);
     this.columnValueSize = columnValueSize;
+    this.numOfRows = numOfRows;
+  }
+
+  @Override
+  public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+      ColumnVectorInfo vectorInfo) {
+    CarbonColumnVector vector = vectorInfo.vector;
+    fillVector(data, vectorInfo, vector);
+  }
+
+  private void fillVector(byte[] data, ColumnVectorInfo vectorInfo, CarbonColumnVector vector) {
+    DataType dataType = vectorInfo.vector.getBlockDataType();
+    if (dataType == DataTypes.DATE) {
+      for (int i = 0; i < numOfRows; i++) {
+        int surrogateInternal =
+            CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
+        if (surrogateInternal == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
+          vector.putNull(i);
+        } else {
+          vector.putInt(i, surrogateInternal - DateDirectDictionaryGenerator.cutOffDate);
+        }
+      }
+    } else if (dataType == DataTypes.TIMESTAMP) {
+      for (int i = 0; i < numOfRows; i++) {
+        int surrogateInternal =
+            CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
+        if (surrogateInternal == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
+          vector.putNull(i);
+        } else {
+          Object valueFromSurrogate =
+              vectorInfo.directDictionaryGenerator.getValueFromSurrogate(surrogateInternal);
+          vector.putLong(i, (long)valueFromSurrogate);
+        }
+      }
+    } else {
+      for (int i = 0; i < numOfRows; i++) {
+        vector.putInt(i,
+            CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize));
+      }
+    }
   }
 
   /**

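The DATE branch above turns the stored surrogate into a day offset with a single subtraction, while TIMESTAMP goes through the direct-dictionary generator. A tiny sketch of the DATE conversion, with stand-in constants rather than CarbonData's real values:

    // Sketch only: constants are illustrative, not the real CarbonData values.
    class DateSurrogateSketch {
      static final int NULL_SURROGATE = 1;  // stand-in for MEMBER_DEFAULT_VAL_SURROGATE_KEY
      static final int CUT_OFF = 719528;    // stand-in for DateDirectDictionaryGenerator.cutOffDate

      // Returns days relative to the cutoff, or null for the null surrogate.
      static Integer toDays(int surrogate) {
        return surrogate == NULL_SURROGATE ? null : surrogate - CUT_OFF;
      }
    }
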
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 8553506..0fb4854 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -91,6 +92,20 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
     }
   }
 
+  @Override
+  public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+      ColumnVectorInfo vectorInfo) {
+    this.invertedIndexReverse = invertedIndex;
+    int lengthSize = getLengthSize();
+    CarbonColumnVector vector = vectorInfo.vector;
+    DataType dt = vector.getType();
+    // wrap the data in a byte buffer so the per-row length fields can be read from it
+    ByteBuffer buffer = ByteBuffer.wrap(data);
+    AbstractNonDictionaryVectorFiller vectorFiller =
+        NonDictionaryVectorFillerFactory.getVectorFiller(dt, lengthSize, numberOfRows);
+    vectorFiller.fillVector(data, vector, buffer);
+  }
+
   protected abstract int getLengthSize();
   protected abstract int getLengthFromBuffer(ByteBuffer buffer);
 
@@ -150,7 +165,7 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
       vector.putNull(vectorRow);
     } else {
       if (dt == DataTypes.STRING) {
-        vector.putBytes(vectorRow, currentDataOffset, length, data);
+        vector.putByteArray(vectorRow, currentDataOffset, length, data);
       } else if (dt == DataTypes.BOOLEAN) {
         vector.putBoolean(vectorRow, ByteUtil.toBoolean(data[currentDataOffset]));
       } else if (dt == DataTypes.SHORT) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
index 89bce2d..57e9de5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
 /**
@@ -115,6 +116,11 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension
     }
   }
 
+  @Override public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+      ColumnVectorInfo vectorInfo) {
+    throw new UnsupportedOperationException("This method not supposed to be called here");
+  }
+
   /**
    * Below method will be used to free the memory occupied by the column chunk
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
index 6f3f139..44b3c12 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
@@ -50,8 +50,9 @@ public abstract class BlockIndexerStorage<T> {
    *
    * @param rowIds
    */
-  protected Map<String, short[]> rleEncodeOnRowId(short[] rowIds, short[] rowIdPage,
-      short[] rowIdRlePage) {
+  protected Map<String, short[]> rleEncodeOnRowId(short[] rowIds) {
+    short[] rowIdPage;
+    short[] rowIdRlePage;
     List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     int k = 0;

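rleEncodeOnRowId now allocates its own output and returns both pages in a map keyed by "rowIdPage" and "rowRlePage", instead of taking output arrays as parameters. The underlying idea is run-length encoding of consecutive row ids; a simplified sketch of the run detection (the real encoding mixes literal and RLE segments, so this is only the core idea):

    import java.util.ArrayList;
    import java.util.List;

    // Collapse consecutive ascending row ids into (start, length) pairs.
    class RowIdRleSketch {
      static List<short[]> encode(short[] rowIds) {
        List<short[]> runs = new ArrayList<>();
        int start = 0;
        for (int i = 1; i <= rowIds.length; i++) {
          boolean endOfRun = (i == rowIds.length) || (rowIds[i] != rowIds[i - 1] + 1);
          if (endOfRun) {
            runs.add(new short[] { rowIds[start], (short) (i - start) });
            start = i;
          }
        }
        return runs;
      }
    }
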
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
index b3e25d3..bcf5432 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
@@ -39,8 +39,7 @@ public class BlockIndexerStorageForNoDictionary extends BlockIndexerStorage<Obje
       Arrays.sort(dataWithRowId);
     }
     short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
-    Map<String, short[]> rowIdAndRleRowIdPages =
-        rleEncodeOnRowId(rowIds, getRowIdPage(), getRowIdRlePage());
+    Map<String, short[]> rowIdAndRleRowIdPages = rleEncodeOnRowId(rowIds);
     rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
     rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
index f1b9af2..b30396c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
@@ -43,8 +43,7 @@ public class BlockIndexerStorageForShort extends BlockIndexerStorage<byte[][]> {
       Arrays.sort(dataWithRowId);
     }
     short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
-    Map<String, short[]> rowIdAndRleRowIdPages =
-        rleEncodeOnRowId(rowIds, getRowIdPage(), getRowIdRlePage());
+    Map<String, short[]> rowIdAndRleRowIdPages = rleEncodeOnRowId(rowIds);
     rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
     rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
     if (rleOnData) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
index a7f38cd..48484ce 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
@@ -28,6 +28,9 @@ public final class UnBlockIndexer {
   public static int[] uncompressIndex(int[] indexData, int[] indexMap) {
     int actualSize = indexData.length;
     int mapLength = indexMap.length;
+    if (indexMap.length == 0) {
+      return indexData;
+    }
     for (int i = 0; i < mapLength; i++) {
       actualSize += indexData[indexMap[i] + 1] - indexData[indexMap[i]] - 1;
     }

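uncompressIndex is the reverse operation: it expands RLE runs back into explicit ids, and the new guard simply short-circuits when indexMap carries no runs to expand. A hedged sketch of the expansion matching the encoder idea above:

    import java.util.ArrayList;
    import java.util.List;

    // Expand (start, length) pairs back into explicit row ids.
    class RowIdRleDecodeSketch {
      static List<Short> decode(List<short[]> runs) {
        List<Short> rowIds = new ArrayList<>();
        for (short[] run : runs) {
          for (int i = 0; i < run[1]; i++) {
            rowIds.add((short) (run[0] + i));
          }
        }
        return rowIds;
      }
    }
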
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
index 6fef278..9f0abd9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
@@ -76,6 +76,7 @@ public class FileReaderImpl implements FileReader {
         channel.close();
       }
     }
+    fileNameAndStreamCache.clear();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index e8097da..e5312f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -37,14 +37,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
 
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE_ARRAY;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.FLOAT;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.INT;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.LONG;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT_INT;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.*;
 
 public abstract class ColumnPage {
 
@@ -90,7 +83,7 @@ public abstract class ColumnPage {
 
   private static ColumnPage createDecimalPage(ColumnPageEncoderMeta columnPageEncoderMeta,
       int pageSize) {
-    if (unsafe) {
+    if (isUnsafeEnabled(columnPageEncoderMeta)) {
       try {
         return new UnsafeDecimalColumnPage(columnPageEncoderMeta, pageSize);
       } catch (MemoryException e) {
@@ -103,7 +96,7 @@ public abstract class ColumnPage {
 
   private static ColumnPage createVarLengthPage(ColumnPageEncoderMeta columnPageEncoderMeta,
       int pageSize) {
-    if (unsafe) {
+    if (isUnsafeEnabled(columnPageEncoderMeta)) {
       try {
         return new UnsafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
       } catch (MemoryException e) {
@@ -116,7 +109,7 @@ public abstract class ColumnPage {
 
   private static ColumnPage createFixLengthPage(
       ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
-    if (unsafe) {
+    if (isUnsafeEnabled(columnPageEncoderMeta)) {
       try {
         return new UnsafeFixLengthColumnPage(columnPageEncoderMeta, pageSize);
       } catch (MemoryException e) {
@@ -129,7 +122,7 @@ public abstract class ColumnPage {
 
   private static ColumnPage createFixLengthByteArrayPage(
       ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize, int eachValueSize) {
-    if (unsafe) {
+    if (isUnsafeEnabled(columnPageEncoderMeta)) {
       try {
         return new UnsafeFixLengthColumnPage(columnPageEncoderMeta, pageSize, eachValueSize);
       } catch (MemoryException e) {
@@ -163,7 +156,7 @@ public abstract class ColumnPage {
             CarbonCommonConstants.LOCAL_DICTIONARY_DECODER_BASED_FALLBACK_DEFAULT));
     ColumnPage actualPage;
     ColumnPage encodedPage;
-    if (unsafe) {
+    if (isUnsafeEnabled(columnPageEncoderMeta)) {
       actualPage = new UnsafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
       encodedPage = new UnsafeFixLengthColumnPage(
           new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), DataTypes.BYTE_ARRAY,
@@ -190,7 +183,7 @@ public abstract class ColumnPage {
     DataType dataType = columnPageEncoderMeta.getStoreDataType();
     TableSpec.ColumnSpec columnSpec = columnPageEncoderMeta.getColumnSpec();
     String compressorName = columnPageEncoderMeta.getCompressorName();
-    if (unsafe) {
+    if (isUnsafeEnabled(columnPageEncoderMeta)) {
       if (dataType == DataTypes.BOOLEAN) {
         instance = new UnsafeFixLengthColumnPage(
             new ColumnPageEncoderMeta(columnSpec, BYTE, compressorName), pageSize);
@@ -219,21 +212,23 @@ public abstract class ColumnPage {
       }
     } else {
       if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
-        instance = newBytePage(columnSpec, new byte[pageSize], compressorName);
+        instance = newBytePage(columnPageEncoderMeta, new byte[pageSize]);
       } else if (dataType == DataTypes.SHORT) {
-        instance = newShortPage(columnSpec, new short[pageSize], compressorName);
+        instance = newShortPage(columnPageEncoderMeta, new short[pageSize]);
       } else if (dataType == DataTypes.SHORT_INT) {
-        instance = newShortIntPage(columnSpec, new byte[pageSize * 3], compressorName);
+        instance = newShortIntPage(columnPageEncoderMeta, new byte[pageSize * 3]);
       } else if (dataType == DataTypes.INT) {
-        instance = newIntPage(columnSpec, new int[pageSize], compressorName);
+        instance = newIntPage(columnPageEncoderMeta, new int[pageSize]);
       } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
-        instance = newLongPage(columnSpec, new long[pageSize], compressorName);
+        instance = newLongPage(
+            new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), LONG,
+                columnPageEncoderMeta.getCompressorName()), new long[pageSize]);
       } else if (dataType == DataTypes.FLOAT) {
-        instance = newFloatPage(columnSpec, new float[pageSize], compressorName);
+        instance = newFloatPage(columnPageEncoderMeta, new float[pageSize]);
       } else if (dataType == DataTypes.DOUBLE) {
-        instance = newDoublePage(columnSpec, new double[pageSize], compressorName);
+        instance = newDoublePage(columnPageEncoderMeta, new double[pageSize]);
       } else if (DataTypes.isDecimal(dataType)) {
-        instance = newDecimalPage(columnSpec, new byte[pageSize][], compressorName);
+        instance = newDecimalPage(columnPageEncoderMeta, new byte[pageSize][]);
       } else if (dataType == DataTypes.STRING
           || dataType == DataTypes.BYTE_ARRAY
           || dataType == DataTypes.VARCHAR) {
@@ -253,75 +248,67 @@ public abstract class ColumnPage {
     return columnPage;
   }
 
-  private static ColumnPage newBytePage(TableSpec.ColumnSpec columnSpec, byte[] byteData,
-      String compressorName) {
+  private static ColumnPage newBytePage(ColumnPageEncoderMeta meta, byte[] byteData) {
+    ColumnPageEncoderMeta encoderMeta =
+        new ColumnPageEncoderMeta(meta.getColumnSpec(), BYTE, meta.getCompressorName());
+    encoderMeta.setFillCompleteVector(meta.isFillCompleteVector());
     ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, BYTE, compressorName), byteData.length);
+        encoderMeta, byteData.length);
     columnPage.setBytePage(byteData);
     return columnPage;
   }
 
-  private static ColumnPage newShortPage(TableSpec.ColumnSpec columnSpec, short[] shortData,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, SHORT, compressorName), shortData.length);
+  private static ColumnPage newShortPage(ColumnPageEncoderMeta meta, short[] shortData) {
+    ColumnPage columnPage = createPage(meta, shortData.length);
     columnPage.setShortPage(shortData);
     return columnPage;
   }
 
-  private static ColumnPage newShortIntPage(TableSpec.ColumnSpec columnSpec, byte[] shortIntData,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, SHORT_INT, compressorName), shortIntData.length / 3);
+  private static ColumnPage newShortIntPage(ColumnPageEncoderMeta meta, byte[] shortIntData) {
+    ColumnPage columnPage = createPage(meta, shortIntData.length / 3);
     columnPage.setShortIntPage(shortIntData);
     return columnPage;
   }
 
-  private static ColumnPage newIntPage(TableSpec.ColumnSpec columnSpec, int[] intData,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, INT, compressorName), intData.length);
+  private static ColumnPage newIntPage(ColumnPageEncoderMeta meta, int[] intData) {
+    ColumnPage columnPage = createPage(meta, intData.length);
     columnPage.setIntPage(intData);
     return columnPage;
   }
 
-  private static ColumnPage newLongPage(TableSpec.ColumnSpec columnSpec, long[] longData,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, LONG, compressorName), longData.length);
+  private static ColumnPage newLongPage(ColumnPageEncoderMeta meta, long[] longData) {
+    ColumnPage columnPage = createPage(meta, longData.length);
     columnPage.setLongPage(longData);
     return columnPage;
   }
 
-  private static ColumnPage newFloatPage(TableSpec.ColumnSpec columnSpec, float[] floatData,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, FLOAT, compressorName), floatData.length);
+  private static ColumnPage newFloatPage(ColumnPageEncoderMeta meta, float[] floatData) {
+    ColumnPage columnPage = createPage(meta, floatData.length);
     columnPage.setFloatPage(floatData);
     return columnPage;
   }
 
-  private static ColumnPage newDoublePage(TableSpec.ColumnSpec columnSpec, double[] doubleData,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, DOUBLE, compressorName), doubleData.length);
+  private static ColumnPage newDoublePage(ColumnPageEncoderMeta meta, double[] doubleData) {
+    ColumnPage columnPage = createPage(meta, doubleData.length);
     columnPage.setDoublePage(doubleData);
     return columnPage;
   }
 
-  private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray,
-      String compressorName) {
+  private static ColumnPage newDecimalPage(ColumnPageEncoderMeta meta, byte[][] byteArray) {
+    ColumnPageEncoderMeta encoderMeta =
+        new ColumnPageEncoderMeta(meta.getColumnSpec(), meta.getColumnSpec().getSchemaDataType(),
+            meta.getCompressorName());
+    encoderMeta.setFillCompleteVector(meta.isFillCompleteVector());
     ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+        encoderMeta,
         byteArray.length);
     columnPage.setByteArrayPage(byteArray);
     return columnPage;
   }
 
-  private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedByteArray, String compressorName) throws MemoryException {
-    return VarLengthColumnPageBase.newDecimalColumnPage(
-        columnSpec, lvEncodedByteArray, compressorName);
+  private static ColumnPage newDecimalPage(ColumnPageEncoderMeta meta,
+      byte[] lvEncodedByteArray) throws MemoryException {
+    return VarLengthColumnPageBase.newDecimalColumnPage(meta, lvEncodedByteArray);
   }
 
   private static ColumnPage newLVBytesPage(TableSpec.ColumnSpec columnSpec,
@@ -813,25 +800,25 @@ public abstract class ColumnPage {
     DataType storeDataType = meta.getStoreDataType();
     if (storeDataType == DataTypes.BOOLEAN || storeDataType == DataTypes.BYTE) {
       byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
-      return newBytePage(columnSpec, byteData, meta.getCompressorName());
+      return newBytePage(meta, byteData);
     } else if (storeDataType == DataTypes.SHORT) {
       short[] shortData = compressor.unCompressShort(compressedData, offset, length);
-      return newShortPage(columnSpec, shortData, meta.getCompressorName());
+      return newShortPage(meta, shortData);
     } else if (storeDataType == DataTypes.SHORT_INT) {
       byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
-      return newShortIntPage(columnSpec, shortIntData, meta.getCompressorName());
+      return newShortIntPage(meta, shortIntData);
     } else if (storeDataType == DataTypes.INT) {
       int[] intData = compressor.unCompressInt(compressedData, offset, length);
-      return newIntPage(columnSpec, intData, meta.getCompressorName());
+      return newIntPage(meta, intData);
     } else if (storeDataType == DataTypes.LONG) {
       long[] longData = compressor.unCompressLong(compressedData, offset, length);
-      return newLongPage(columnSpec, longData, meta.getCompressorName());
+      return newLongPage(meta, longData);
     } else if (storeDataType == DataTypes.FLOAT) {
       float[] floatData = compressor.unCompressFloat(compressedData, offset, length);
-      return newFloatPage(columnSpec, floatData, meta.getCompressorName());
+      return newFloatPage(meta, floatData);
     } else if (storeDataType == DataTypes.DOUBLE) {
       double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
-      return newDoublePage(columnSpec, doubleData, meta.getCompressorName());
+      return newDoublePage(meta, doubleData);
     } else if (!isLVEncoded && storeDataType == DataTypes.BYTE_ARRAY && (
         columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE
             || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
@@ -873,8 +860,7 @@ public abstract class ColumnPage {
   public static ColumnPage decompressDecimalPage(ColumnPageEncoderMeta meta, byte[] compressedData,
       int offset, int length) throws MemoryException {
     Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
-    TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
-    ColumnPage decimalPage = null;
+    ColumnPage decimalPage;
     DataType storeDataType = meta.getStoreDataType();
     if (storeDataType == DataTypes.BYTE) {
       byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
@@ -888,7 +874,7 @@ public abstract class ColumnPage {
       return decimalPage;
     } else if (storeDataType == DataTypes.SHORT_INT) {
       byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
-      decimalPage = createDecimalPage(meta, shortIntData.length);
+      decimalPage = createDecimalPage(meta, shortIntData.length / 3);
       decimalPage.setShortIntPage(shortIntData);
       return decimalPage;
     }  else if (storeDataType == DataTypes.INT) {
@@ -903,10 +889,20 @@ public abstract class ColumnPage {
       return decimalPage;
     } else {
       byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newDecimalPage(columnSpec, lvEncodedBytes, meta.getCompressorName());
+      return newDecimalPage(meta, lvEncodedBytes);
     }
   }
 
+  /**
+   * Whether unsafe is enabled for this page. When the complete vector is filled during decode
+   * there is no need for the unsafe flow, because the data is not kept in memory for long.
+   * @param meta ColumnPageEncoderMeta
+   * @return true if the unsafe flow should be used
+   */
+  protected static boolean isUnsafeEnabled(ColumnPageEncoderMeta meta) {
+    return unsafe && !meta.isFillCompleteVector();
+  }
+
   public BitSet getNullBits() {
     return nullBitSet;
   }

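The helper encodes a simple policy: the global unsafe switch applies only when the decoded page outlives the decode step; pages drained straight into a vector stay on-heap. A compact sketch of the same guard, with a hypothetical config flag:

    // Sketch: a global switch plus a per-call opt-out for short-lived pages.
    class UnsafeChoiceSketch {
      static final boolean UNSAFE_ENABLED = true;  // hypothetical global setting

      static boolean useUnsafe(boolean fillCompleteVector) {
        return UNSAFE_ENABLED && !fillCompleteVector;
      }
    }
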
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
index 53ad956..82ccd22 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datastore.page;
 
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+
 // Transformation type that can be applied to ColumnPage
 public interface ColumnPageValueConverter {
   void encode(int rowId, byte value);
@@ -35,4 +37,5 @@ public interface ColumnPageValueConverter {
   double decodeDouble(long value);
   double decodeDouble(float value);
   double decodeDouble(double value);
+  void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
index d3e945d..1867354 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
@@ -194,6 +194,31 @@ public class SafeDecimalColumnPage extends DecimalColumnPage {
   }
 
   @Override
+  public byte[] getBytePage() {
+    return byteData;
+  }
+
+  @Override public short[] getShortPage() {
+    return shortData;
+  }
+
+  @Override public byte[] getShortIntPage() {
+    return shortIntData;
+  }
+
+  @Override public int[] getIntPage() {
+    return intData;
+  }
+
+  @Override public long[] getLongPage() {
+    return longData;
+  }
+
+  @Override public byte[][] getByteArrayPage() {
+    return byteArrayData;
+  }
+
+  @Override
   public void freeMemory() {
     byteArrayData = null;
     super.freeMemory();


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 39b8282..a760b64 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -124,8 +124,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   /**
    * Create a new column page for decimal page
    */
-  static ColumnPage newDecimalColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
-      String compressorName) throws MemoryException {
+  static ColumnPage newDecimalColumnPage(ColumnPageEncoderMeta meta,
+      byte[] lvEncodedBytes) throws MemoryException {
+    TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
     DecimalConverterFactory.DecimalConverter decimalConverter =
         DecimalConverterFactory.INSTANCE.getDecimalConverter(columnSpec.getPrecision(),
             columnSpec.getScale());
@@ -133,10 +134,10 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     if (size < 0) {
       return getLVBytesColumnPage(columnSpec, lvEncodedBytes,
           DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()),
-          CarbonCommonConstants.INT_SIZE_IN_BYTE, compressorName);
+          CarbonCommonConstants.INT_SIZE_IN_BYTE, meta.getCompressorName());
     } else {
       // Here the size is always fixed.
-      return getDecimalColumnPage(columnSpec, lvEncodedBytes, size, compressorName);
+      return getDecimalColumnPage(meta, lvEncodedBytes, size);
     }
   }
 
@@ -158,8 +159,10 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
         lvLength, compressorName);
   }
 
-  private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, int size, String compressorName) throws MemoryException {
+  private static ColumnPage getDecimalColumnPage(ColumnPageEncoderMeta meta,
+      byte[] lvEncodedBytes, int size) throws MemoryException {
+    TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
+    String compressorName = meta.getCompressorName();
     TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
         .newInstance(columnSpec.getFieldName(), DataTypes.INT, ColumnType.MEASURE);
     ColumnPage rowOffset = ColumnPage.newPage(
@@ -176,7 +179,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     rowOffset.putInt(counter, offset);
 
     VarLengthColumnPageBase page;
-    if (unsafe) {
+    if (isUnsafeEnabled(meta)) {
       page = new UnsafeDecimalColumnPage(
           new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
           rowId);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
index 4e491c5..d82a873 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageDecoder.java
@@ -18,9 +18,11 @@
 package org.apache.carbondata.core.datastore.page.encoding;
 
 import java.io.IOException;
+import java.util.BitSet;
 
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 public interface ColumnPageDecoder {
 
@@ -29,6 +31,12 @@ public interface ColumnPageDecoder {
    */
   ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, IOException;
 
+  /**
+   * Apply the decoding algorithm on the input byte array and fill the vector directly.
+   */
+  void decodeAndFillVector(byte[] input, int offset, int length, ColumnVectorInfo vectorInfo,
+      BitSet nullBits, boolean isLVEncoded) throws MemoryException, IOException;
+
   ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
       throws MemoryException, IOException;
 }

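decodeAndFillVector merges decode and fill into a single pass, so no intermediate ColumnPage has to survive the call. A hedged sketch of a decoder honoring that contract, with DoubleSink as a stand-in for the CarbonColumnVector reached via ColumnVectorInfo:

    import java.util.BitSet;

    // Sketch: decompress-and-fill in one pass, with nulls taken from the bitset.
    interface DoubleSink {
      void putDouble(int rowId, double value);
      void putNull(int rowId);
    }

    class OnePassDoubleDecoderSketch {
      static void decodeAndFillVector(double[] decompressed, BitSet nullBits, DoubleSink sink) {
        for (int i = 0; i < decompressed.length; i++) {
          if (nullBits.get(i)) {
            sink.putNull(i);
          } else {
            sink.putDouble(i, decompressed[i]);
          }
        }
      }
    }
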
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
index e6aafa0..03a43f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoderMeta.java
@@ -49,6 +49,9 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   // Make it protected for RLEEncoderMeta
   protected String compressorName;
 
+  // Whether the decode flow should fill the complete vector while decoding the page.
+  private transient boolean fillCompleteVector;
+
   public ColumnPageEncoderMeta() {
   }
 
@@ -284,4 +287,12 @@ public class ColumnPageEncoderMeta extends ValueEncoderMeta implements Writable
   public DataType getSchemaDataType() {
     return columnSpec.getSchemaDataType();
   }
+
+  public boolean isFillCompleteVector() {
+    return fillCompleteVector;
+  }
+
+  public void setFillCompleteVector(boolean fillCompleteVector) {
+    this.fillCompleteVector = fillCompleteVector;
+  }
 }

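fillCompleteVector is transient because it is a per-query decode hint rather than persisted page metadata: callers set it right after constructing the meta and before readFields. A sketch of that plumbing, with hypothetical names:

    // Sketch: a per-query hint carried on otherwise persisted metadata;
    // transient keeps the hint out of any serialized or written form.
    class PageMetaSketch implements java.io.Serializable {
      private String compressorName;                 // persisted field
      private transient boolean fillCompleteVector;  // decode-time hint only

      void setFillCompleteVector(boolean fill) { this.fillCompleteVector = fill; }
      boolean isFillCompleteVector() { return fillCompleteVector; }
    }
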
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index 920a516..d3070b4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -66,6 +66,21 @@ public abstract class EncodingFactory {
    */
   public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
       String compressor) throws IOException {
+    return createDecoder(encodings, encoderMetas, compressor, false);
+  }
+
+  /**
+   * Return new decoder based on encoder metadata read from file
+   * @param encodings encodings used to decode the page
+   * @param encoderMetas metadata of encodings to decode the data
+   * @param compressor Compressor name which will be used to decode data.
+   * @param fullVectorFill whether the decoder should fill the given vector completely
+   *                       while decoding the data itself
+   * @return decoder to decode the page
+   * @throws IOException if reading the encoder metadata fails
+   */
+  public ColumnPageDecoder createDecoder(List<Encoding> encodings, List<ByteBuffer> encoderMetas,
+      String compressor, boolean fullVectorFill) throws IOException {
     assert (encodings.size() >= 1);
     assert (encoderMetas.size() == 1);
     Encoding encoding = encodings.get(0);
@@ -74,16 +89,19 @@ public abstract class EncodingFactory {
     DataInputStream in = new DataInputStream(stream);
     if (encoding == DIRECT_COMPRESS || encoding == DIRECT_COMPRESS_VARCHAR) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
       return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);
     } else if (encoding == ADAPTIVE_INTEGRAL) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveIntegralCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
           stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata);
     } else if (encoding == ADAPTIVE_DELTA_INTEGRAL) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveDeltaIntegralCodec(metadata.getSchemaDataType(),
@@ -91,12 +109,14 @@ public abstract class EncodingFactory {
           .createDecoder(metadata);
     } else if (encoding == ADAPTIVE_FLOATING) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveFloatingCodec(metadata.getSchemaDataType(), metadata.getStoreDataType(),
           stats, encodings.contains(Encoding.INVERTED_INDEX)).createDecoder(metadata);
     } else if (encoding == ADAPTIVE_DELTA_FLOATING) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
       SimpleStatsResult stats = PrimitivePageStatsCollector.newInstance(metadata);
       return new AdaptiveDeltaFloatingCodec(metadata.getSchemaDataType(),
@@ -108,12 +128,13 @@ public abstract class EncodingFactory {
       return new RLECodec().createDecoder(metadata);
     } else if (encoding == BOOL_BYTE) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
+      metadata.setFillCompleteVector(fullVectorFill);
       metadata.readFields(in);
       return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);
     } else {
       // for backward compatibility
       ValueEncoderMeta metadata = CarbonUtil.deserializeEncoderMetaV3(encoderMeta);
-      return createDecoderLegacy(metadata, compressor);
+      return createDecoderLegacy(metadata, compressor, fullVectorFill);
     }
   }
 
@@ -121,6 +142,14 @@ public abstract class EncodingFactory {
    * Old way of creating decoder, based on algorithm
    */
   public ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor) {
+    return createDecoderLegacy(metadata, compressor, false);
+  }
+
+  /**
+   * Old way of creating decoder, based on algorithm
+   */
+  private ColumnPageDecoder createDecoderLegacy(ValueEncoderMeta metadata, String compressor,
+      boolean fullVectorFill) {
     if (null == metadata) {
       throw new RuntimeException("internal error");
     }
@@ -139,16 +168,19 @@ public abstract class EncodingFactory {
         AdaptiveIntegralCodec adaptiveCodec = (AdaptiveIntegralCodec) codec;
         ColumnPageEncoderMeta meta =
             new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+        meta.setFillCompleteVector(fullVectorFill);
         return codec.createDecoder(meta);
       } else if (codec instanceof AdaptiveDeltaIntegralCodec) {
         AdaptiveDeltaIntegralCodec adaptiveCodec = (AdaptiveDeltaIntegralCodec) codec;
         ColumnPageEncoderMeta meta =
             new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+        meta.setFillCompleteVector(fullVectorFill);
         return codec.createDecoder(meta);
       } else if (codec instanceof DirectCompressCodec) {
         ColumnPageEncoderMeta meta =
             new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats,
                 compressor);
+        meta.setFillCompleteVector(fullVectorFill);
         return codec.createDecoder(meta);
       } else {
         throw new RuntimeException("internal error");
@@ -161,30 +193,36 @@ public abstract class EncodingFactory {
         AdaptiveFloatingCodec adaptiveCodec = (AdaptiveFloatingCodec) codec;
         ColumnPageEncoderMeta meta =
             new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+        meta.setFillCompleteVector(fullVectorFill);
         return codec.createDecoder(meta);
       } else if (codec instanceof DirectCompressCodec) {
         ColumnPageEncoderMeta meta =
             new ColumnPageEncoderMeta(spec, DataType.getDataType(metadata.getType()), stats,
                 compressor);
+        meta.setFillCompleteVector(fullVectorFill);
         return codec.createDecoder(meta);
       } else if (codec instanceof AdaptiveDeltaFloatingCodec) {
         AdaptiveDeltaFloatingCodec adaptiveCodec = (AdaptiveDeltaFloatingCodec) codec;
         ColumnPageEncoderMeta meta =
             new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+        meta.setFillCompleteVector(fullVectorFill);
         return codec.createDecoder(meta);
       } else {
         throw new RuntimeException("internal error");
       }
     } else if (DataTypes.isDecimal(dataType) || dataType == DataTypes.BYTE_ARRAY) {
       // no dictionary dimension
-      return new DirectCompressCodec(stats.getDataType())
-          .createDecoder(new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor));
+      ColumnPageEncoderMeta meta =
+          new ColumnPageEncoderMeta(spec, stats.getDataType(), stats, compressor);
+      meta.setFillCompleteVector(fullVectorFill);
+      return new DirectCompressCodec(stats.getDataType()).createDecoder(meta);
     } else if (dataType == DataTypes.LEGACY_LONG) {
       // In case of older versions like in V1 format it has special datatype to handle
       AdaptiveIntegralCodec adaptiveCodec =
           new AdaptiveIntegralCodec(DataTypes.LONG, DataTypes.LONG, stats, false);
       ColumnPageEncoderMeta meta =
           new ColumnPageEncoderMeta(spec, adaptiveCodec.getTargetDataType(), stats, compressor);
+      meta.setFillCompleteVector(fullVectorFill);
       return adaptiveCodec.createDecoder(meta);
     } else {
       throw new RuntimeException("unsupported data type: " + stats.getDataType());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
index 9b0b574..1826798 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -35,6 +37,9 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
@@ -125,6 +130,18 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
         return LazyColumnPage.newPage(page, converter);
       }
 
+      @Override
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        page.setNullBits(nullBits);
+        if (page instanceof DecimalColumnPage) {
+          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+        }
+        converter.decodeAndFillVector(page, vectorInfo);
+      }
+
       @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
           throws MemoryException, IOException {
         return decode(input, offset, length);
@@ -226,6 +243,71 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec {
     }
 
     @Override
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector vector = vectorInfo.vector;
+      BitSet nullBits = columnPage.getNullBits();
+      DataType pageDataType = columnPage.getDataType();
+      int pageSize = columnPage.getPageSize();
+      BitSet deletedRows = vectorInfo.deletedRows;
+      DataType vectorDataType = vector.getType();
+      if (vectorDataType == DataTypes.FLOAT) {
+        float floatFactor = factor.floatValue();
+        if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+          byte[] byteData = columnPage.getBytePage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putFloat(i, (max - byteData[i]) / floatFactor);
+          }
+        } else if (pageDataType == DataTypes.SHORT) {
+          short[] shortData = columnPage.getShortPage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putFloat(i, (max - shortData[i]) / floatFactor);
+          }
+
+        } else if (pageDataType == DataTypes.SHORT_INT) {
+          byte[] shortIntPage = columnPage.getShortIntPage();
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putFloat(i, (max - shortInt) / floatFactor);
+          }
+        } else {
+          throw new RuntimeException("internal error: " + this.toString());
+        }
+      } else {
+        if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+          byte[] byteData = columnPage.getBytePage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (max - byteData[i]) / factor);
+          }
+        } else if (pageDataType == DataTypes.SHORT) {
+          short[] shortData = columnPage.getShortPage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (max - shortData[i]) / factor);
+          }
+
+        } else if (pageDataType == DataTypes.SHORT_INT) {
+          byte[] shortIntPage = columnPage.getShortIntPage();
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putDouble(i, (max - shortInt) / factor);
+          }
+        } else if (pageDataType == DataTypes.INT) {
+          int[] intData = columnPage.getIntPage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (max - intData[i]) / factor);
+          }
+        } else {
+          throw new RuntimeException("Unsupported datatype : " + pageDataType);
+        }
+      }
+
+      if (deletedRows == null || deletedRows.isEmpty()) {
+        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+          vector.putNull(i);
+        }
+      }
+    }
+
+    @Override
     public double decodeDouble(float value) {
       throw new RuntimeException("internal error: " + debugInfo());
     }
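
The converter above reconstructs each stored integral value v as
(max - v) / factor. A minimal plain-Java sketch, with max and factor assumed
to come from the page statistics captured at encode time:

    public class DeltaFloatingDemo {
      public static void main(String[] args) {
        long max = 12345;           // assumed page maximum (already scaled)
        double factor = 100.0;      // assumed 10^decimalCount
        byte[] stored = {0, 5, 45}; // stored deltas (max - value)
        for (byte s : stored) {
          // mirrors vector.putDouble(i, (max - stored) / factor)
          System.out.println((max - s) / factor); // 123.45, 123.4, 123.0
        }
      }
    }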

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
index 0e61b33..0d7ad8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -27,6 +28,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -35,6 +37,10 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
@@ -119,9 +125,11 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
     };
   }
 
-  @Override public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
+  @Override
+  public ColumnPageDecoder createDecoder(final ColumnPageEncoderMeta meta) {
     return new ColumnPageDecoder() {
-      @Override public ColumnPage decode(byte[] input, int offset, int length)
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length)
           throws MemoryException, IOException {
         ColumnPage page = null;
         if (DataTypes.isDecimal(meta.getSchemaDataType())) {
@@ -132,7 +140,23 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
         return LazyColumnPage.newPage(page, converter);
       }
 
-      @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
+      @Override
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        ColumnPage page = null;
+        if (DataTypes.isDecimal(meta.getSchemaDataType())) {
+          page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+        } else {
+          page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        }
+        page.setNullBits(nullBits);
+        converter.decodeAndFillVector(page, vectorInfo);
+      }
+
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
           throws MemoryException, IOException {
         return decode(input, offset, length);
       }
@@ -272,5 +296,169 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec {
       // this codec is for integer type only
       throw new RuntimeException("internal error");
     }
+
+    @Override
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector vector = vectorInfo.vector;
+      BitSet nullBits = columnPage.getNullBits();
+      DataType vectorDataType = vector.getType();
+      DataType pageDataType = columnPage.getDataType();
+      int pageSize = columnPage.getPageSize();
+      BitSet deletedRows = vectorInfo.deletedRows;
+      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      if (deletedRows == null || deletedRows.isEmpty()) {
+        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+          vector.putNull(i);
+        }
+      }
+    }
+
+    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+      if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+        byte[] byteData = columnPage.getBytePage();
+        if (vectorDataType == DataTypes.SHORT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putShort(i, (short) (max - byteData[i]));
+          }
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) (max - byteData[i]));
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - byteData[i]));
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - byteData[i]) * 1000);
+          }
+        } else if (vectorDataType == DataTypes.BOOLEAN) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putByte(i, (byte) (max - byteData[i]));
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int precision = vectorInfo.measure.getMeasure().getPrecision();
+          for (int i = 0; i < pageSize; i++) {
+            BigDecimal decimal = decimalConverter.getDecimal(max - byteData[i]);
+            vector.putDecimal(i, decimal, precision);
+          }
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (max - byteData[i]));
+          }
+        }
+      } else if (pageDataType == DataTypes.SHORT) {
+        short[] shortData = columnPage.getShortPage();
+        if (vectorDataType == DataTypes.SHORT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putShort(i, (short) (max - shortData[i]));
+          }
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) (max - shortData[i]));
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - shortData[i]));
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - shortData[i]) * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int precision = vectorInfo.measure.getMeasure().getPrecision();
+          for (int i = 0; i < pageSize; i++) {
+            BigDecimal decimal = decimalConverter.getDecimal(max - shortData[i]);
+            vector.putDecimal(i, decimal, precision);
+          }
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (max - shortData[i]));
+          }
+        }
+
+      } else if (pageDataType == DataTypes.SHORT_INT) {
+        byte[] shortIntPage = columnPage.getShortIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putInt(i, (int) (max - shortInt));
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, (max - shortInt));
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, (max - shortInt) * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int precision = vectorInfo.measure.getMeasure().getPrecision();
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            BigDecimal decimal = decimalConverter.getDecimal(max - shortInt);
+            vector.putDecimal(i, decimal, precision);
+          }
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putDouble(i, (max - shortInt));
+          }
+        }
+      } else if (pageDataType == DataTypes.INT) {
+        int[] intData = columnPage.getIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) (max - intData[i]));
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - intData[i]));
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - intData[i]) * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int precision = vectorInfo.measure.getMeasure().getPrecision();
+          for (int i = 0; i < pageSize; i++) {
+            BigDecimal decimal = decimalConverter.getDecimal(max - intData[i]);
+            vector.putDecimal(i, decimal, precision);
+          }
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (max - intData[i]));
+          }
+        }
+      } else if (pageDataType == DataTypes.LONG) {
+        long[] longData = columnPage.getLongPage();
+        if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - longData[i]));
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, (max - longData[i]) * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int precision = vectorInfo.measure.getMeasure().getPrecision();
+          for (int i = 0; i < pageSize; i++) {
+            BigDecimal decimal = decimalConverter.getDecimal(max - longData[i]);
+            vector.putDecimal(i, decimal, precision);
+          }
+        }
+      } else {
+        throw new RuntimeException("Unsupported datatype : " + pageDataType);
+      }
+    }
+
   };
 }
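
The delta-integral decode follows the same pattern for every physical page
type: the page stores (max - value) in the narrowest type that fits, the
loops reconstruct value = max - stored, and the TIMESTAMP branches widen the
result to milliseconds (the * 1000, which presumes the stored values are
seconds). A standalone sketch under those assumptions:

    public class DeltaIntegralDemo {
      public static void main(String[] args) {
        long max = 1_600_000_000L;      // assumed page maximum from stats
        short[] stored = {0, 86, 172};  // stored deltas (max - value)
        for (short s : stored) {
          long seconds = max - s;        // reconstructed value
          long millis = seconds * 1000L; // TIMESTAMP widening, as in the loops above
          System.out.println(seconds + " -> " + millis);
        }
      }
    }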

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
index 836af26..38bf9b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -34,6 +36,9 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
@@ -113,7 +118,20 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
         return LazyColumnPage.newPage(page, converter);
       }
 
-      @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
+      @Override
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        ColumnPage page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        page.setNullBits(nullBits);
+        if (page instanceof DecimalColumnPage) {
+          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+        }
+        converter.decodeAndFillVector(page, vectorInfo);
+      }
+
+      @Override
+      public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
           throws MemoryException, IOException {
         return decode(input, offset, length);
       }
@@ -226,5 +244,69 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec {
     public double decodeDouble(double value) {
       throw new RuntimeException("internal error: " + debugInfo());
     }
+
+    @Override
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector vector = vectorInfo.vector;
+      BitSet nullBits = columnPage.getNullBits();
+      DataType pageDataType = columnPage.getDataType();
+      int pageSize = columnPage.getPageSize();
+      BitSet deletedRows = vectorInfo.deletedRows;
+      DataType vectorDataType = vector.getType();
+      if (vectorDataType == DataTypes.FLOAT) {
+        if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+          byte[] byteData = columnPage.getBytePage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putFloat(i, (byteData[i] / floatFactor));
+          }
+        } else if (pageDataType == DataTypes.SHORT) {
+          short[] shortData = columnPage.getShortPage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putFloat(i, (shortData[i] / floatFactor));
+          }
+
+        } else if (pageDataType == DataTypes.SHORT_INT) {
+          byte[] shortIntPage = columnPage.getShortIntPage();
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putFloat(i, (shortInt / floatFactor));
+          }
+        } else {
+          throw new RuntimeException("internal error: " + this.toString());
+        }
+      } else {
+        if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+          byte[] byteData = columnPage.getBytePage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (byteData[i] / factor));
+          }
+        } else if (pageDataType == DataTypes.SHORT) {
+          short[] shortData = columnPage.getShortPage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (shortData[i] / factor));
+          }
+
+        } else if (pageDataType == DataTypes.SHORT_INT) {
+          byte[] shortIntPage = columnPage.getShortIntPage();
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putDouble(i, (shortInt / factor));
+          }
+        } else if (pageDataType == DataTypes.INT) {
+          int[] intData = columnPage.getIntPage();
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, (intData[i] / factor));
+          }
+        } else {
+          throw new RuntimeException("Unsupported datatype : " + pageDataType);
+        }
+      }
+
+      if (deletedRows == null || deletedRows.isEmpty()) {
+        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+          vector.putNull(i);
+        }
+      }
+    }
   };
 }
\ No newline at end of file
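
AdaptiveFloatingCodec differs from the delta variant only in that no max is
involved: the stored integral value is divided by the factor (assumed here to
be 10^decimalCount from the page statistics). A minimal sketch:

    public class FloatingDemo {
      public static void main(String[] args) {
        double factor = 100.0;          // assumed 10^decimalCount from stats
        int[] stored = {12345, -50, 0}; // values scaled to integers at encode time
        for (int s : stored) {
          // mirrors vector.putDouble(i, stored / factor)
          System.out.println(s / factor); // 123.45, -0.5, 0.0
        }
      }
    }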

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
index f1c0ea0..bdf5373 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.adaptive;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.Encoding;
 
@@ -111,6 +117,21 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
         return LazyColumnPage.newPage(page, converter);
       }
 
+      @Override
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        ColumnPage page = null;
+        if (DataTypes.isDecimal(meta.getSchemaDataType())) {
+          page = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+          vectorInfo.decimalConverter = ((DecimalColumnPage) page).getDecimalConverter();
+        } else {
+          page = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        }
+        page.setNullBits(nullBits);
+        converter.decodeAndFillVector(page, vectorInfo);
+      }
+
       @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
           throws MemoryException, IOException {
         return decode(input, offset, length);
@@ -248,6 +269,142 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec {
     public double decodeDouble(double value) {
       throw new RuntimeException("internal error: " + debugInfo());
     }
+
+    @Override
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector vector = vectorInfo.vector;
+      BitSet nullBits = columnPage.getNullBits();
+      DataType vectorDataType = vector.getType();
+      DataType pageDataType = columnPage.getDataType();
+      int pageSize = columnPage.getPageSize();
+      BitSet deletedRows = vectorInfo.deletedRows;
+      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      if (deletedRows == null || deletedRows.isEmpty()) {
+        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+          vector.putNull(i);
+        }
+      }
+    }
+
+    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+      if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+        byte[] byteData = columnPage.getBytePage();
+        if (vectorDataType == DataTypes.SHORT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putShort(i, (short) byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, byteData[i] * 1000);
+          }
+        } else if (vectorDataType == DataTypes.BOOLEAN) {
+          vector.putBytes(0, pageSize, byteData, 0);
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(byteData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, byteData[i]);
+          }
+        }
+      } else if (pageDataType == DataTypes.SHORT) {
+        short[] shortData = columnPage.getShortPage();
+        if (vectorDataType == DataTypes.SHORT) {
+          vector.putShorts(0, pageSize, shortData, 0);
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) shortData[i]);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, shortData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, shortData[i] * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(shortData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, shortData[i]);
+          }
+        }
+
+      } else if (pageDataType == DataTypes.SHORT_INT) {
+        byte[] shortIntPage = columnPage.getShortIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putInt(i, shortInt);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, shortInt);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, shortInt * 1000L); // long math avoids int overflow
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int[] shortIntData = ByteUtil.toIntArray(shortIntPage, pageSize);
+          decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putDouble(i, shortInt);
+          }
+        }
+      } else if (pageDataType == DataTypes.INT) {
+        int[] intData = columnPage.getIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          vector.putInts(0, pageSize, intData, 0);
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, intData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, intData[i] * 1000L); // long math avoids int overflow
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(intData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, intData[i]);
+          }
+        }
+      } else if (pageDataType == DataTypes.LONG) {
+        long[] longData = columnPage.getLongPage();
+        if (vectorDataType == DataTypes.LONG) {
+          vector.putLongs(0, pageSize, longData, 0);
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, longData[i] * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(longData, pageSize, vectorInfo, columnPage.getNullBits());
+        }
+      } else {
+        double[] doubleData = columnPage.getDoublePage();
+        vector.putDoubles(0, pageSize, doubleData, 0);
+      }
+    }
   };
 
 }
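
The SHORT_INT branches rely on ByteUtil.valueOf3Bytes to unpack a 3-byte
packed integer. A plain-Java sketch of one common layout (big-endian with a
sign-extended top byte); the real ByteUtil may differ in endianness or sign
handling:

    static int valueOf3Bytes(byte[] page, int offset) {
      return (page[offset] << 16)              // sign-extends the top byte
          | ((page[offset + 1] & 0xFF) << 8)
          | (page[offset + 2] & 0xFF);
    }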

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index aa03ec1..4d1e6e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.page.encoding.compress;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -26,6 +27,7 @@ import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ColumnPageValueConverter;
+import org.apache.carbondata.core.datastore.page.DecimalColumnPage;
 import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -34,6 +36,10 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalConverterFactory;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -95,10 +101,25 @@ public class DirectCompressCodec implements ColumnPageCodec {
         return LazyColumnPage.newPage(decodedPage, converter);
       }
 
+      @Override
+      public void decodeAndFillVector(byte[] input, int offset, int length,
+          ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+          throws MemoryException, IOException {
+        ColumnPage decodedPage;
+        if (DataTypes.isDecimal(dataType)) {
+          decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length);
+          vectorInfo.decimalConverter = ((DecimalColumnPage) decodedPage).getDecimalConverter();
+        } else {
+          decodedPage = ColumnPage.decompress(meta, input, offset, length, isLVEncoded);
+        }
+        decodedPage.setNullBits(nullBits);
+        converter.decodeAndFillVector(decodedPage, vectorInfo);
+      }
+
       @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
-        throws MemoryException, IOException {
-        return LazyColumnPage.newPage(
-            ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
+          throws MemoryException, IOException {
+        return LazyColumnPage
+            .newPage(ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter);
       }
     };
   }
@@ -178,6 +199,149 @@ public class DirectCompressCodec implements ColumnPageCodec {
     public double decodeDouble(double value) {
       return value;
     }
+
+    @Override
+    public void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo) {
+      CarbonColumnVector vector = vectorInfo.vector;
+      BitSet nullBits = columnPage.getNullBits();
+      DataType vectorDataType = vector.getType();
+      DataType pageDataType = columnPage.getDataType();
+      int pageSize = columnPage.getPageSize();
+      BitSet deletedRows = vectorInfo.deletedRows;
+      fillVector(columnPage, vector, vectorDataType, pageDataType, pageSize, vectorInfo);
+      if (deletedRows == null || deletedRows.isEmpty()) {
+        for (int i = nullBits.nextSetBit(0); i >= 0; i = nullBits.nextSetBit(i + 1)) {
+          vector.putNull(i);
+        }
+      }
+    }
+
+    private void fillVector(ColumnPage columnPage, CarbonColumnVector vector,
+        DataType vectorDataType, DataType pageDataType, int pageSize, ColumnVectorInfo vectorInfo) {
+      if (pageDataType == DataTypes.BOOLEAN || pageDataType == DataTypes.BYTE) {
+        byte[] byteData = columnPage.getBytePage();
+        if (vectorDataType == DataTypes.SHORT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putShort(i, (short) byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, byteData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, byteData[i] * 1000);
+          }
+        } else if (vectorDataType == DataTypes.BOOLEAN || vectorDataType == DataTypes.BYTE) {
+          vector.putBytes(0, pageSize, byteData, 0);
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(byteData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, byteData[i]);
+          }
+        }
+      } else if (pageDataType == DataTypes.SHORT) {
+        short[] shortData = columnPage.getShortPage();
+        if (vectorDataType == DataTypes.SHORT) {
+          vector.putShorts(0, pageSize, shortData, 0);
+        } else if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putInt(i, (int) shortData[i]);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, shortData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, shortData[i] * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(shortData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, shortData[i]);
+          }
+        }
+
+      } else if (pageDataType == DataTypes.SHORT_INT) {
+        byte[] shortIntPage = columnPage.getShortIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putInt(i, shortInt);
+          }
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, shortInt);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putLong(i, shortInt * 1000L); // long math avoids int overflow
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          int[] shortIntData = ByteUtil.toIntArray(shortIntPage, pageSize);
+          decimalConverter.fillVector(shortIntData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            int shortInt = ByteUtil.valueOf3Bytes(shortIntPage, i * 3);
+            vector.putDouble(i, shortInt);
+          }
+        }
+      } else if (pageDataType == DataTypes.INT) {
+        int[] intData = columnPage.getIntPage();
+        if (vectorDataType == DataTypes.INT) {
+          vector.putInts(0, pageSize, intData, 0);
+        } else if (vectorDataType == DataTypes.LONG) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, intData[i]);
+          }
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, intData[i] * 1000L); // long math avoids int overflow
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(intData, pageSize, vectorInfo, columnPage.getNullBits());
+        } else {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putDouble(i, intData[i]);
+          }
+        }
+      } else if (pageDataType == DataTypes.LONG) {
+        long[] longData = columnPage.getLongPage();
+        if (vectorDataType == DataTypes.LONG) {
+          // bulk copy; a per-element putLong loop here would duplicate this fill
+          vector.putLongs(0, pageSize, longData, 0);
+        } else if (vectorDataType == DataTypes.TIMESTAMP) {
+          for (int i = 0; i < pageSize; i++) {
+            vector.putLong(i, longData[i] * 1000);
+          }
+        } else if (DataTypes.isDecimal(vectorDataType)) {
+          DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+          decimalConverter.fillVector(longData, pageSize, vectorInfo, columnPage.getNullBits());
+        }
+      } else if (DataTypes.isDecimal(pageDataType)) {
+        DecimalConverterFactory.DecimalConverter decimalConverter = vectorInfo.decimalConverter;
+        decimalConverter.fillVector(columnPage.getByteArrayPage(), pageSize, vectorInfo,
+            columnPage.getNullBits());
+      } else {
+        double[] doubleData = columnPage.getDoublePage();
+        vector.putDoubles(0, pageSize, doubleData, 0);
+      }
+    }
   };
 
 }
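
The fillVector dispatch above always widens from the page's physical type to
the vector's logical type. A standalone mirror of the SHORT-page-into-LONG-
vector branch, with a plain array standing in for CarbonColumnVector:

    public class WideningFillDemo {
      public static void main(String[] args) {
        short[] page = {1, 2, 3};              // physical page type: SHORT
        long[] vector = new long[page.length]; // logical vector type: LONG
        for (int i = 0; i < page.length; i++) {
          vector[i] = page[i];                 // widening write, as in putLong(i, shortData[i])
        }
        System.out.println(java.util.Arrays.toString(vector));
      }
    }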

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
index e7d4118..c9b47db 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/rle/RLECodec.java
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
@@ -35,6 +36,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.format.Encoding;
 
 /**
@@ -314,6 +316,13 @@ public class RLECodec implements ColumnPageCodec {
       return resultPage;
     }
 
+    @Override
+    public void decodeAndFillVector(byte[] input, int offset, int length,
+        ColumnVectorInfo vectorInfo, BitSet nullBits, boolean isLVEncoded)
+        throws MemoryException, IOException {
+      throw new UnsupportedOperationException("Not supposed to be called here");
+    }
+
     @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded)
         throws MemoryException, IOException {
       return decode(input, offset, length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
index a49eced..67d70e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
@@ -35,7 +35,7 @@ import org.apache.log4j.Logger;
  */
 public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator {
 
-  private static final int cutOffDate = Integer.MAX_VALUE >> 1;
+  public static final int cutOffDate = Integer.MAX_VALUE >> 1;
   private static final long SECONDS_PER_DAY = 60 * 60 * 24L;
   public static final long MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L;
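
A hedged note on why cutOffDate becomes public: date surrogates are (assumed
here) stored as daysSinceEpoch + cutOffDate, so decode paths outside this
class can now undo the offset themselves, e.g.:

    int cutOffDate = Integer.MAX_VALUE >> 1;  // as defined above
    int surrogate = cutOffDate + 17000;       // hypothetical stored key
    long epochMillis = (surrogate - cutOffDate) * MILLIS_PER_DAY;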
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
index a8da6d4..89a3168 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DecimalConverterFactory.java
@@ -19,7 +19,10 @@ package org.apache.carbondata.core.metadata.datatype;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Arrays;
+import java.util.BitSet;
 
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
@@ -72,6 +75,8 @@ public final class DecimalConverterFactory {
 
     BigDecimal getDecimal(Object valueToBeConverted);
 
+    void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info, BitSet nullBitset);
+
     int getSize();
 
     DecimalConverterType getDecimalConverterType();
@@ -80,7 +85,7 @@ public final class DecimalConverterFactory {
 
   public static class DecimalIntConverter implements DecimalConverter {
 
-    private int scale;
+    protected int scale;
 
     DecimalIntConverter(int scale) {
       this.scale = scale;
@@ -95,6 +100,51 @@ public final class DecimalConverterFactory {
       return BigDecimal.valueOf((Long) valueToBeConverted, scale);
     }
 
+    @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
+        BitSet nullBitset) {
+      // TODO find a way to set values directly into the vector without this
+      // per-value conversion, which is very inefficient.
+      CarbonColumnVector vector = info.vector;
+      int precision = info.measure.getMeasure().getPrecision();
+      if (valuesToBeConverted instanceof byte[]) {
+        byte[] data = (byte[]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+          }
+        }
+      } else if (valuesToBeConverted instanceof short[]) {
+        short[] data = (short[]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+          }
+        }
+      } else if (valuesToBeConverted instanceof int[]) {
+        int[] data = (int[]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+          }
+        }
+      } else if (valuesToBeConverted instanceof long[]) {
+        long[] data = (long[]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            vector.putDecimal(i, BigDecimal.valueOf(data[i], scale), precision);
+          }
+        }
+      }
+    }
+
     @Override public int getSize() {
       return 4;
     }
@@ -104,12 +154,10 @@ public final class DecimalConverterFactory {
     }
   }
 
-  public static class DecimalLongConverter implements DecimalConverter {
-
-    private int scale;
+  public static class DecimalLongConverter extends DecimalIntConverter {
 
     DecimalLongConverter(int scale) {
-      this.scale = scale;
+      super(scale);
     }
 
     @Override public Object convert(BigDecimal decimal) {
@@ -173,6 +221,23 @@ public final class DecimalConverterFactory {
       return new BigDecimal(bigInteger, scale);
     }
 
+    @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
+        BitSet nullBitset) {
+      CarbonColumnVector vector = info.vector;
+      int precision = info.measure.getMeasure().getPrecision();
+      if (valuesToBeConverted instanceof byte[][]) {
+        byte[][] data = (byte[][]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            BigInteger bigInteger = new BigInteger(data[i]);
+            vector.putDecimal(i, new BigDecimal(bigInteger, scale), precision);
+          }
+        }
+      }
+    }
+
     @Override public int getSize() {
       return numBytes;
     }
@@ -194,6 +259,22 @@ public final class DecimalConverterFactory {
       return DataTypeUtil.byteToBigDecimal((byte[]) valueToBeConverted);
     }
 
+    @Override public void fillVector(Object valuesToBeConverted, int size, ColumnVectorInfo info,
+        BitSet nullBitset) {
+      CarbonColumnVector vector = info.vector;
+      int precision = info.measure.getMeasure().getPrecision();
+      if (valuesToBeConverted instanceof byte[][]) {
+        byte[][] data = (byte[][]) valuesToBeConverted;
+        for (int i = 0; i < size; i++) {
+          if (nullBitset.get(i)) {
+            vector.putNull(i);
+          } else {
+            vector.putDecimal(i, DataTypeUtil.byteToBigDecimal(data[i]), precision);
+          }
+        }
+      }
+    }
+
     @Override public int getSize() {
       return -1;
     }

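For reference, the new fillVector contract is intended to be invoked once per decoded
page rather than once per row. A minimal sketch of the call pattern, assuming
hypothetical locals (decodedLongs, pageSize, nullBits, vectorInfo, precision, scale)
standing in for what a column page decoder would supply:

    // Hedged sketch: decodedLongs, pageSize, nullBits and vectorInfo are hypothetical
    // stand-ins for the values a decoded column page would supply.
    DecimalConverterFactory.DecimalConverter converter =
        DecimalConverterFactory.INSTANCE.getDecimalConverter(precision, scale);
    // nullBits marks the rows of this page whose values are null
    converter.fillVector(decodedLongs, pageSize, vectorInfo, nullBits);
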
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
index d68e4e9..ac50d7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/DeleteDeltaVo.java
@@ -57,4 +57,8 @@ public class DeleteDeltaVo {
   public boolean containsRow(int counter) {
     return bitSet.get(counter);
   }
+
+  public BitSet getBitSet() {
+    return bitSet;
+  }
 }

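Exposing the backing BitSet lets callers work with the delete bitmap at page
granularity instead of probing row by row, as the collector change below does.
A small usage sketch (deltaVo is a hypothetical local):

    // Hedged sketch: count the deleted rows of a page in one call
    BitSet deleted = deltaVo.getBitSet();
    int deletedRows = deleted.cardinality();  // rows to subtract from the page row count
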
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
index 8695d90..430a555 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -18,10 +18,13 @@ package org.apache.carbondata.core.scan.collector.impl;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.List;
 
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.mutate.DeleteDeltaVo;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
 import org.apache.carbondata.core.scan.model.ProjectionMeasure;
@@ -30,11 +33,19 @@ import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.scan.result.vector.MeasureDataVectorProcessor;
 
+import org.apache.log4j.Logger;
+
 /**
  * It is not a collector; it is just a scanned result holder.
  */
 public class DictionaryBasedVectorResultCollector extends AbstractScannedResultCollector {
 
+  /**
+   * logger of this result collector
+   */
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(DictionaryBasedVectorResultCollector.class.getName());
+
   protected ProjectionDimension[] queryDimensions;
 
   protected ProjectionMeasure[] queryMeasures;
@@ -51,8 +62,14 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
 
   private ColumnVectorInfo[] implictColumnInfo;
 
+  private boolean isDirectVectorFill;
+
   public DictionaryBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
     super(blockExecutionInfos);
+    this.isDirectVectorFill = blockExecutionInfos.isDirectVectorFill();
+    if (this.isDirectVectorFill) {
+      LOGGER.info("Direct pagewise vector fill collector is used to scan and collect the data");
+    }
     // initialize only if the current block is not a restructured block else the initialization
     // will be taken care by RestructureBasedVectorResultCollector
     if (!blockExecutionInfos.isRestructuredBlock()) {
@@ -141,29 +158,33 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
   @Override
   public void collectResultInColumnarBatch(BlockletScannedResult scannedResult,
       CarbonColumnarBatch columnarBatch) {
-    int numberOfPages = scannedResult.numberOfpages();
-    int filteredRows = 0;
-    while (scannedResult.getCurrentPageCounter() < numberOfPages) {
-      int currentPageRowCount = scannedResult.getCurrentPageRowCount();
-      if (currentPageRowCount == 0) {
-        scannedResult.incrementPageCounter();
-        continue;
-      }
-      int rowCounter = scannedResult.getRowCounter();
-      int availableRows = currentPageRowCount - rowCounter;
-      // getRowCounter holds total number or rows being placed in Vector. Calculate the
-      // Left over space through getRowCounter only.
-      int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getRowCounter();
-      requiredRows = Math.min(requiredRows, availableRows);
-      if (requiredRows < 1) {
-        return;
+    if (isDirectVectorFill) {
+      collectResultInColumnarBatchDirect(scannedResult, columnarBatch);
+    } else {
+      int numberOfPages = scannedResult.numberOfpages();
+      int filteredRows = 0;
+      while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+        int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+        if (currentPageRowCount == 0) {
+          scannedResult.incrementPageCounter();
+          continue;
+        }
+        int rowCounter = scannedResult.getRowCounter();
+        int availableRows = currentPageRowCount - rowCounter;
+        // getRowCounter holds the total number of rows placed in the vector so far. Calculate
+        // the leftover space through getRowCounter only.
+        int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getRowCounter();
+        requiredRows = Math.min(requiredRows, availableRows);
+        if (requiredRows < 1) {
+          return;
+        }
+        fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
+        filteredRows = scannedResult.markFilteredRows(columnarBatch, rowCounter, requiredRows,
+            columnarBatch.getRowCounter());
+        fillResultToColumnarBatch(scannedResult, columnarBatch, rowCounter, availableRows,
+            requiredRows);
+        columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
       }
-      fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows);
-      filteredRows = scannedResult.markFilteredRows(
-          columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter());
-      fillResultToColumnarBatch(
-          scannedResult, columnarBatch, rowCounter, availableRows, requiredRows);
-      columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows);
     }
   }
 
@@ -198,4 +219,51 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC
     }
   }
 
+  /**
+   * Fill the vectors while decoding the page.
+   */
+  private void collectResultInColumnarBatchDirect(BlockletScannedResult scannedResult,
+      CarbonColumnarBatch columnarBatch) {
+    int numberOfPages = scannedResult.numberOfpages();
+    while (scannedResult.getCurrentPageCounter() < numberOfPages) {
+      int currentPageRowCount = scannedResult.getCurrentPageRowCount();
+      if (currentPageRowCount == 0) {
+        scannedResult.incrementPageCounter(null);
+        continue;
+      }
+      DeleteDeltaVo deltaVo = scannedResult.getCurrentDeleteDeltaVo();
+      BitSet bitSet = null;
+      int deletedRows = 0;
+      if (deltaVo != null) {
+        bitSet = deltaVo.getBitSet();
+        deletedRows = bitSet.cardinality();
+      }
+      fillColumnVectorDetails(columnarBatch, bitSet);
+      fillResultToColumnarBatch(scannedResult);
+      columnarBatch.setActualSize(currentPageRowCount - deletedRows);
+      scannedResult.setRowCounter(currentPageRowCount - deletedRows);
+      scannedResult.incrementPageCounter(null);
+      return;
+    }
+  }
+
+  private void fillResultToColumnarBatch(BlockletScannedResult scannedResult) {
+    scannedResult.fillDataChunks(dictionaryInfo, noDictionaryInfo, measureColumnInfo,
+        measureInfo.getMeasureOrdinals());
+
+  }
+
+  private void fillColumnVectorDetails(CarbonColumnarBatch columnarBatch,
+      BitSet deltaBitSet) {
+    for (int i = 0; i < allColumnInfo.length; i++) {
+      allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
+      allColumnInfo[i].vector = columnarBatch.columnVectors[i];
+      allColumnInfo[i].deletedRows = deltaBitSet;
+      if (null != allColumnInfo[i].dimension) {
+        allColumnInfo[i].vector.setBlockDataType(dimensionInfo.dataType[i]);
+      }
+    }
+  }
+
+
 }

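Note that collectResultInColumnarBatchDirect fills at most one decoded page per call
and then returns, so the caller drives exactly one page into each vector batch. A
hedged sketch of that consumption loop, assuming a hasNext-style guard on the scanned
result (the exact driver API may differ):

    // Hedged sketch: one decoded page per batch in the direct-fill flow.
    while (scannedResult.hasNext()) {  // hypothetical guard; actual iterator API may differ
      columnarBatch.reset();
      collector.collectResultInColumnarBatch(scannedResult, columnarBatch);
      // consume columnarBatch.getActualSize() rows; the size already excludes deleted rows
    }
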
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 6a6a929..fed0faf 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -478,6 +478,19 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     } else {
       blockExecutionInfo.setPrefetchBlocklet(queryModel.isPreFetchData());
     }
+    // In case of an FG datamap, the query should not go through direct fill.
+    boolean fgDataMapPathPresent = false;
+    for (TableBlockInfo blockInfo : queryModel.getTableBlockInfos()) {
+      fgDataMapPathPresent = blockInfo.getDataMapWriterPath() != null;
+      if (fgDataMapPathPresent) {
+        queryModel.setDirectVectorFill(false);
+        break;
+      }
+    }
+
+    blockExecutionInfo
+        .setDirectVectorFill(queryModel.isDirectVectorFill());
+
     blockExecutionInfo
         .setTotalNumberOfMeasureToRead(segmentProperties.getMeasuresOrdinalToChunkMapping().size());
     blockExecutionInfo.setComplexDimensionInfoMap(QueryUtil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
index e737b0e..f0ef23b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/infos/BlockExecutionInfo.java
@@ -217,6 +217,11 @@ public class BlockExecutionInfo {
   private QueryStatisticsModel queryStatisticsModel;
 
   /**
+   * It fills the vector directly from the decoded column page without any staging or conversions
+   */
+  private boolean isDirectVectorFill;
+
+  /**
    * @param blockIndex the tableBlock to set
    */
   public void setDataBlock(AbstractIndex blockIndex) {
@@ -625,4 +630,12 @@ public class BlockExecutionInfo {
   public void setQueryStatisticsModel(QueryStatisticsModel queryStatisticsModel) {
     this.queryStatisticsModel = queryStatisticsModel;
   }
+
+  public boolean isDirectVectorFill() {
+    return isDirectVectorFill && !isRestructuredBlock;
+  }
+
+  public void setDirectVectorFill(boolean directVectorFill) {
+    isDirectVectorFill = directVectorFill;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 22e1e72..49157f9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -760,7 +760,7 @@ public class QueryUtil {
       vector.putNull(vectorRow);
     } else {
       if (dt == DataTypes.STRING) {
-        vector.putBytes(vectorRow, 0, length, value);
+        vector.putByteArray(vectorRow, 0, length, value);
       } else if (dt == DataTypes.BOOLEAN) {
         vector.putBoolean(vectorRow, ByteUtil.toBoolean(value[0]));
       } else if (dt == DataTypes.BYTE) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
index 0951da0..d7dcee0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModel.java
@@ -125,7 +125,11 @@ public class QueryModel {
   private boolean preFetchData = true;
 
   /**
-   * It fills the vector directly from decoded column page with out any staging and conversions
+   * It fills the vector directly from the decoded column page without any staging or conversions.
+   * The execution engine can set this field to true in the vector flow. Note that the execution
+   * engine must ensure the vector batch size is greater than or equal to the column page size.
+   * In this flow only pages are pruned; each page is decoded and its full data is filled into
+   * the vector, so it is the execution engine's responsibility to filter rows at the row level.
    */
   private boolean isDirectVectorFill;
 

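Since the javadoc above places the filtering burden on the caller, an execution engine
would only enable this flow when its vector batch can hold a full column page. A hedged
sketch (vectorBatchSize and maxPageSize are hypothetical engine-side values):

    // Hedged sketch: enable direct fill only when a vector batch can hold a whole page.
    if (vectorBatchSize >= maxPageSize) {
      queryModel.setDirectVectorFill(true);  // the engine must then filter rows itself
    }
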
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index 9191d08..4963441 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -73,6 +73,11 @@ public abstract class BlockletScannedResult {
   private int[] pageFilteredRowCount;
 
   /**
+   * Ids of the filtered pages to be decoded and loaded into the vectors.
+   */
+  private int[] pageIdFiltered;
+
+  /**
    * to keep track of number of rows process
    */
   protected int rowCounter;
@@ -304,7 +309,7 @@ public abstract class BlockletScannedResult {
               j :
               pageFilteredRowId[pageCounter][j]);
         }
-        vector.putBytes(vectorOffset++,
+        vector.putByteArray(vectorOffset++,
             data.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
       }
     }
@@ -342,6 +347,19 @@ public abstract class BlockletScannedResult {
   }
 
   /**
+   * Just increment the page counter and reset the remaining counters.
+   */
+  public void incrementPageCounter(ColumnVectorInfo[] vectorInfos) {
+    rowCounter = 0;
+    currentRow = -1;
+    pageCounter++;
+    if (null != deletedRecordMap && pageCounter < pageIdFiltered.length) {
+      currentDeleteDeltaVo =
+          deletedRecordMap.get(blockletNumber + "_" + pageIdFiltered[pageCounter]);
+    }
+  }
+
+  /**
    * This method is used only during compaction, since compaction does not use the filter flow.
    */
   public void fillDataChunks() {
@@ -369,6 +387,36 @@ public abstract class BlockletScannedResult {
         pageUncompressTime.getCount() + (System.currentTimeMillis() - startTime));
   }
 
+  /**
+   * Fill all the vectors with data by decompressing/decoding the column page
+   */
+  public void fillDataChunks(ColumnVectorInfo[] dictionaryInfo, ColumnVectorInfo[] noDictionaryInfo,
+      ColumnVectorInfo[] msrVectorInfo, int[] measuresOrdinal) {
+    freeDataChunkMemory();
+    if (pageCounter >= pageFilteredRowCount.length) {
+      return;
+    }
+    long startTime = System.currentTimeMillis();
+
+    for (int i = 0; i < this.dictionaryColumnChunkIndexes.length; i++) {
+      dimRawColumnChunks[dictionaryColumnChunkIndexes[i]]
+          .convertToDimColDataChunkAndFillVector(pageIdFiltered[pageCounter], dictionaryInfo[i]);
+    }
+    for (int i = 0; i < this.noDictionaryColumnChunkIndexes.length; i++) {
+      dimRawColumnChunks[noDictionaryColumnChunkIndexes[i]]
+          .convertToDimColDataChunkAndFillVector(pageIdFiltered[pageCounter], noDictionaryInfo[i]);
+    }
+
+    for (int i = 0; i < measuresOrdinal.length; i++) {
+      msrRawColumnChunks[measuresOrdinal[i]]
+          .convertToColumnPageAndFillVector(pageIdFiltered[pageCounter], msrVectorInfo[i]);
+    }
+    QueryStatistic pageUncompressTime = queryStatisticsModel.getStatisticsTypeAndObjMap()
+        .get(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME);
+    pageUncompressTime.addCountStatistic(QueryStatisticsConstants.PAGE_UNCOMPRESS_TIME,
+        pageUncompressTime.getCount() + (System.currentTimeMillis() - startTime));
+  }
+
   // free the memory for the last page chunk
   private void freeDataChunkMemory() {
     for (int i = 0; i < dimensionColumnPages.length; i++) {
@@ -390,6 +438,14 @@ public abstract class BlockletScannedResult {
     return pageFilteredRowCount.length;
   }
 
+  public int[] getPageIdFiltered() {
+    return pageIdFiltered;
+  }
+
+  public void setPageIdFiltered(int[] pageIdFiltered) {
+    this.pageIdFiltered = pageIdFiltered;
+  }
+
   /**
    * Get total rows in the current page
    *
@@ -513,7 +569,13 @@ public abstract class BlockletScannedResult {
     // if deleted records map is present for this block
     // then get the first page deleted vo
     if (null != deletedRecordMap) {
-      currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + '_' + pageCounter);
+      String key;
+      if (pageIdFiltered != null) {
+        key = blockletNumber + '_' + pageIdFiltered[pageCounter];
+      } else {
+        key = blockletNumber + '_' + pageCounter;
+      }
+      currentDeleteDeltaVo = deletedRecordMap.get(key);
     }
   }
 
@@ -616,6 +678,12 @@ public abstract class BlockletScannedResult {
    */
   public void setPageFilteredRowCount(int[] pageFilteredRowCount) {
     this.pageFilteredRowCount = pageFilteredRowCount;
+    if (pageIdFiltered == null) {
+      pageIdFiltered = new int[pageFilteredRowCount.length];
+      for (int i = 0; i < pageIdFiltered.length; i++) {
+        pageIdFiltered[i] = i;
+      }
+    }
   }
 
   /**
@@ -714,6 +782,10 @@ public abstract class BlockletScannedResult {
     return rowsFiltered;
   }
 
+  public DeleteDeltaVo getCurrentDeleteDeltaVo() {
+    return currentDeleteDeltaVo;
+  }
+
   /**
    * Below method will be used to check row got deleted
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
index dd0e8b9..f670884 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/vector/CarbonColumnVector.java
@@ -27,18 +27,26 @@ public interface CarbonColumnVector {
 
   void putFloat(int rowId, float value);
 
+  void putFloats(int rowId, int count, float[] src, int srcIndex);
+
   void putShort(int rowId, short value);
 
   void putShorts(int rowId, int count, short value);
 
+  void putShorts(int rowId, int count, short[] src, int srcIndex);
+
   void putInt(int rowId, int value);
 
   void putInts(int rowId, int count, int value);
 
+  void putInts(int rowId, int count, int[] src, int srcIndex);
+
   void putLong(int rowId, long value);
 
   void putLongs(int rowId, int count, long value);
 
+  void putLongs(int rowId, int count, long[] src, int srcIndex);
+
   void putDecimal(int rowId, BigDecimal value, int precision);
 
   void putDecimals(int rowId, int count, BigDecimal value, int precision);
@@ -47,14 +55,18 @@ public interface CarbonColumnVector {
 
   void putDoubles(int rowId, int count, double value);
 
-  void putBytes(int rowId, byte[] value);
+  void putDoubles(int rowId, int count, double[] src, int srcIndex);
 
-  void putBytes(int rowId, int count, byte[] value);
+  void putByteArray(int rowId, byte[] value);
 
-  void putBytes(int rowId, int offset, int length, byte[] value);
+  void putByteArray(int rowId, int offset, int length, byte[] value);
 
   void putByte(int rowId, byte value);
 
+  void putBytes(int rowId, int count, byte[] value);
+
+  void putBytes(int rowId, int count, byte[] src, int srcIndex);
+
   void putNull(int rowId);
 
   void putNulls(int rowId, int count);