You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/25 16:55:39 UTC

[3/3] carbondata git commit: [CARBONDATA-3012] Added support for full scan queries for vector direct fill.

[CARBONDATA-3012] Added support for full scan queries for vector direct fill.

After decompressing a page in the V3 reader, we can immediately fill its data into a vector without any condition checks inside the loops.
The complete column page data is therefore set into the column vector in a single batch and handed back to Spark/Presto.
For this purpose, a new method is added to ColumnPageDecoder:

ColumnPage decodeAndFillVector(byte[] input, int offset, int length, ColumnVectorInfo vectorInfo,
      BitSet nullBits, boolean isLVEncoded)
The above method takes a vector and fills it in a single loop, without any checks inside the loop.

A new method is also added to DimensionDataChunkStore:

 void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
      ColumnVectorInfo vectorInfo);
The above method takes a vector and fills it in a single loop, without any checks inside the loop.

This closes #2818


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3d3b6ff1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3d3b6ff1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3d3b6ff1

Branch: refs/heads/master
Commit: 3d3b6ff1615e08131f6bcaea23dec0116a18081d
Parents: e0baa9b
Author: ravipesala <ra...@gmail.com>
Authored: Tue Oct 16 11:30:43 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Thu Oct 25 22:24:24 2018 +0530

----------------------------------------------------------------------
 .../chunk/impl/DimensionRawColumnChunk.java     |  17 ++
 .../impl/FixedLengthDimensionColumnPage.java    |  29 +-
 .../chunk/impl/MeasureRawColumnChunk.java       |  17 ++
 .../impl/VariableLengthDimensionColumnPage.java |  29 +-
 .../reader/DimensionColumnChunkReader.java      |   7 +
 .../chunk/reader/MeasureColumnChunkReader.java  |   7 +
 .../reader/dimension/AbstractChunkReader.java   |  11 +
 ...essedDimChunkFileBasedPageLevelReaderV3.java |   2 +-
 ...mpressedDimensionChunkFileBasedReaderV3.java |  78 +++--
 .../measure/AbstractMeasureChunkReader.java     |  12 +
 ...CompressedMeasureChunkFileBasedReaderV3.java |  45 ++-
 ...essedMsrChunkFileBasedPageLevelReaderV3.java |   6 +-
 .../chunk/store/DimensionChunkStoreFactory.java |  16 +-
 .../chunk/store/DimensionDataChunkStore.java    |   7 +
 .../impl/LocalDictDimensionDataChunkStore.java  |  25 ++
 .../safe/AbstractNonDictionaryVectorFiller.java | 282 ++++++++++++++++++
 .../SafeFixedLengthDimensionDataChunkStore.java |  51 +++-
 ...feVariableLengthDimensionDataChunkStore.java |  17 +-
 .../UnsafeAbstractDimensionDataChunkStore.java  |   6 +
 .../datastore/columnar/BlockIndexerStorage.java |   5 +-
 .../BlockIndexerStorageForNoDictionary.java     |   3 +-
 .../columnar/BlockIndexerStorageForShort.java   |   3 +-
 .../core/datastore/columnar/UnBlockIndexer.java |   3 +
 .../core/datastore/impl/FileReaderImpl.java     |   1 +
 .../core/datastore/page/ColumnPage.java         | 130 ++++----
 .../page/ColumnPageValueConverter.java          |   3 +
 .../datastore/page/SafeDecimalColumnPage.java   |  25 ++
 .../datastore/page/VarLengthColumnPageBase.java |  17 +-
 .../page/encoding/ColumnPageDecoder.java        |   8 +
 .../page/encoding/ColumnPageEncoderMeta.java    |  11 +
 .../page/encoding/EncodingFactory.java          |  44 ++-
 .../adaptive/AdaptiveDeltaFloatingCodec.java    |  82 +++++
 .../adaptive/AdaptiveDeltaIntegralCodec.java    | 194 +++++++++++-
 .../adaptive/AdaptiveFloatingCodec.java         |  84 +++++-
 .../adaptive/AdaptiveIntegralCodec.java         | 157 ++++++++++
 .../encoding/compress/DirectCompressCodec.java  | 170 ++++++++++-
 .../datastore/page/encoding/rle/RLECodec.java   |   9 +
 .../DateDirectDictionaryGenerator.java          |   2 +-
 .../datatype/DecimalConverterFactory.java       |  91 +++++-
 .../carbondata/core/mutate/DeleteDeltaVo.java   |   4 +
 .../DictionaryBasedVectorResultCollector.java   | 112 +++++--
 .../executor/impl/AbstractQueryExecutor.java    |  13 +
 .../scan/executor/infos/BlockExecutionInfo.java |  13 +
 .../core/scan/executor/util/QueryUtil.java      |   2 +-
 .../carbondata/core/scan/model/QueryModel.java  |   6 +-
 .../core/scan/result/BlockletScannedResult.java |  76 ++++-
 .../scan/result/vector/CarbonColumnVector.java  |  18 +-
 .../scan/result/vector/CarbonColumnarBatch.java |   4 +-
 .../scan/result/vector/CarbonDictionary.java    |   2 +
 .../scan/result/vector/ColumnVectorInfo.java    |   5 +
 .../vector/impl/CarbonColumnVectorImpl.java     |  67 ++++-
 .../vector/impl/CarbonDictionaryImpl.java       |   3 +
 .../scan/scanner/impl/BlockletFullScanner.java  |   4 +-
 .../core/stats/QueryStatisticsModel.java        |  13 +
 .../apache/carbondata/core/util/ByteUtil.java   |   8 +
 .../executer/IncludeFilterExecuterImplTest.java |   6 +-
 .../carbondata/core/util/CarbonUtilTest.java    |   2 +-
 .../presto/CarbonColumnVectorWrapper.java       |  65 +++-
 .../presto/readers/SliceStreamReader.java       |   4 +-
 .../filterexpr/AllDataTypesTestCaseFilter.scala |  18 +-
 .../vectorreader/ColumnarVectorWrapper.java     |  70 ++++-
 .../ColumnarVectorWrapperDirect.java            | 229 ++++++++++++++
 .../VectorizedCarbonRecordReader.java           |  30 +-
 .../org/apache/spark/sql/CarbonVectorProxy.java | 295 ++++++++++--------
 .../spark/sql/CarbonDictionaryWrapper.java      |   5 +-
 .../org/apache/spark/sql/CarbonVectorProxy.java | 297 +++++++++++--------
 .../stream/CarbonStreamRecordReader.java        |   2 +-
 67 files changed, 2616 insertions(+), 463 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
index 7b1aca1..d84434e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.scan.result.vector.impl.CarbonDictionaryImpl;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.format.Encoding;
@@ -121,6 +122,22 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
     }
   }
 
+  /**
+   * Convert raw data with specified page number processed to DimensionColumnDataChunk and fill
+   * the vector
+   *
+   * @param pageNumber page number to decode and fill the vector
+   * @param vectorInfo vector to be filled with column page
+   */
+  public void convertToDimColDataChunkAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
+    assert pageNumber < pagesCount;
+    try {
+      chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override public void freeMemory() {
     super.freeMemory();
     if (null != dataChunks) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
index c815e4d..e650e0e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
@@ -46,11 +46,38 @@ public class FixedLengthDimensionColumnPage extends AbstractDimensionColumnPage
         dataChunk.length;
     dataChunkStore = DimensionChunkStoreFactory.INSTANCE
         .getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
-            DimensionStoreType.FIXED_LENGTH, null);
+            DimensionStoreType.FIXED_LENGTH, null, false);
     dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
   }
 
   /**
+   * Constructor
+   *
+   * @param dataChunk            data chunk
+   * @param invertedIndex        inverted index
+   * @param invertedIndexReverse reverse inverted index
+   * @param numberOfRows         number of rows
+   * @param columnValueSize      size of each column value
+   * @param vectorInfo           vector to be filled with decoded column page.
+   */
+  public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
+      int[] invertedIndexReverse, int numberOfRows, int columnValueSize,
+      ColumnVectorInfo vectorInfo) {
+    boolean isExplicitSorted = isExplicitSorted(invertedIndex);
+    long totalSize = isExplicitSorted ?
+        dataChunk.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) :
+        dataChunk.length;
+    dataChunkStore = DimensionChunkStoreFactory.INSTANCE
+        .getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
+            DimensionStoreType.FIXED_LENGTH, null, vectorInfo != null);
+    if (vectorInfo == null) {
+      dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
+    } else {
+      dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunk, vectorInfo);
+    }
+  }
+
+  /**
    * Below method will be used to fill the data based on offset and row id
    *
    * @param rowId            row id of the chunk

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
index 9448f30..6a90569 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Contains raw measure data
@@ -105,6 +106,22 @@ public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
     }
   }
 
+  /**
+   * Convert raw data with specified page number processed to DimensionColumnDataChunk and fill the
+   * vector
+   *
+   * @param pageNumber page number to decode and fill the vector
+   * @param vectorInfo vector to be filled with column page
+   */
+  public void convertToColumnPageAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
+    assert pageNumber < pagesCount;
+    try {
+      chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
+    } catch (IOException | MemoryException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override public void freeMemory() {
     super.freeMemory();
     if (null != columnPages) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
index a404ff7..6cb8174 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
@@ -30,10 +30,31 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa
 
   /**
    * Constructor for this class
+   * @param dataChunks           data chunk
+   * @param invertedIndex        inverted index
+   * @param invertedIndexReverse reverse inverted index
+   * @param numberOfRows         number of rows
+   * @param dictionary           carbon local dictionary for string column.
    */
   public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
       int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
       CarbonDictionary dictionary) {
+    this(dataChunks, invertedIndex, invertedIndexReverse, numberOfRows, dimStoreType, dictionary,
+        null);
+  }
+
+  /**
+   * Constructor for this class
+   * @param dataChunks           data chunk
+   * @param invertedIndex        inverted index
+   * @param invertedIndexReverse reverse inverted index
+   * @param numberOfRows         number of rows
+   * @param dictionary           carbon local dictionary for string column.
+   * @param vectorInfo           vector to be filled with decoded column page.
+   */
+  public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
+      int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
+      CarbonDictionary dictionary, ColumnVectorInfo vectorInfo) {
     boolean isExplicitSorted = isExplicitSorted(invertedIndex);
     long totalSize = 0;
     switch (dimStoreType) {
@@ -54,8 +75,12 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa
     }
     dataChunkStore = DimensionChunkStoreFactory.INSTANCE
         .getDimensionChunkStore(0, isExplicitSorted, numberOfRows, totalSize, dimStoreType,
-            dictionary);
-    dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
+            dictionary, vectorInfo != null);
+    if (vectorInfo != null) {
+      dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunks, vectorInfo);
+    } else {
+      dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
index fd81973..e2d6be7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Interface for reading the data chunk
@@ -60,4 +61,10 @@ public interface DimensionColumnChunkReader {
    */
   DimensionColumnPage decodeColumnPage(DimensionRawColumnChunk dimensionRawColumnChunk,
       int pageNumber) throws IOException, MemoryException;
+
+  /**
+   * Decodes the raw data chunk of given page number and fill the vector with decoded data.
+   */
+  void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
+      int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
index f1392d0..0fbbe6b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Reader interface for reading the measure blocks from file
@@ -58,4 +59,10 @@ public interface MeasureColumnChunkReader {
   ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
       int pageNumber) throws IOException, MemoryException;
 
+  /**
+   * Decode raw data and fill the vector
+   */
+  void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
+      int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
index b08f9ed..2c42abe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
@@ -16,10 +16,15 @@
  */
 package org.apache.carbondata.core.datastore.chunk.reader.dimension;
 
+import java.io.IOException;
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
@@ -79,4 +84,10 @@ public abstract class AbstractChunkReader implements DimensionColumnChunkReader
     this.numberOfRows = numberOfRows;
   }
 
+  @Override
+  public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
+      int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+    throw new UnsupportedOperationException(
+        "This operation is not supported in this reader " + this.getClass().getName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
index 6efaf8a..86a4334 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
@@ -171,6 +171,6 @@ public class CompressedDimChunkFileBasedPageLevelReaderV3
     ByteBuffer rawData = dimensionRawColumnChunk.getFileReader()
         .readByteBuffer(filePath, offset, length);
 
-    return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0);
+    return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0, null);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index b96e52e..a9f9338 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.dimension.v3;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -39,6 +40,7 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
@@ -207,6 +209,12 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
    */
   @Override public DimensionColumnPage decodeColumnPage(
       DimensionRawColumnChunk rawColumnPage, int pageNumber) throws IOException, MemoryException {
+    return decodeColumnPage(rawColumnPage, pageNumber, null);
+  }
+
+  private DimensionColumnPage decodeColumnPage(
+      DimensionRawColumnChunk rawColumnPage, int pageNumber,
+      ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
     // data chunk of blocklet column
     DataChunk3 dataChunk3 = rawColumnPage.getDataChunkV3();
     // get the data buffer
@@ -221,49 +229,65 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
     int offset = (int) rawColumnPage.getOffSet() + dimensionChunksLength
         .get(rawColumnPage.getColumnIndex()) + dataChunk3.getPage_offset().get(pageNumber);
     // first read the data and uncompressed it
-    return decodeDimension(rawColumnPage, rawData, pageMetadata, offset);
+    return decodeDimension(rawColumnPage, rawData, pageMetadata, offset, vectorInfo);
+  }
+
+  @Override
+  public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
+      int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+    DimensionColumnPage columnPage =
+        decodeColumnPage(dimensionRawColumnChunk, pageNumber, vectorInfo);
+    columnPage.freeMemory();
   }
 
-  private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata,
-      ByteBuffer pageData, int offset, boolean isLocalDictEncodedPage)
+  private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata, ByteBuffer pageData, int offset,
+      boolean isLocalDictEncodedPage, ColumnVectorInfo vectorInfo, BitSet nullBitSet)
       throws IOException, MemoryException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
     String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
         pageMetadata.getChunk_meta());
     ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas,
-        compressorName);
-    return decoder
-        .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
+        compressorName, vectorInfo != null);
+    if (vectorInfo != null) {
+      decoder
+          .decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
+              nullBitSet, isLocalDictEncodedPage);
+      return null;
+    } else {
+      return decoder
+          .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
+    }
   }
 
   protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnPage,
-      ByteBuffer pageData, DataChunk2 pageMetadata, int offset)
+      ByteBuffer pageData, DataChunk2 pageMetadata, int offset, ColumnVectorInfo vectorInfo)
       throws IOException, MemoryException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     if (CarbonUtil.isEncodedWithMeta(encodings)) {
-      ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset,
-          null != rawColumnPage.getLocalDictionary());
-      decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
       int[] invertedIndexes = new int[0];
       int[] invertedIndexesReverse = new int[0];
       // in case of no dictionary measure data types, if it is included in sort columns
       // then inverted index to be uncompressed
-      if (encodings.contains(Encoding.INVERTED_INDEX)) {
+      boolean isExplicitSorted =
+          CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX);
+      int dataOffset = offset;
+      if (isExplicitSorted) {
         offset += pageMetadata.data_page_length;
-        if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
-          invertedIndexes = CarbonUtil
-              .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
-          // get the reverse index
-          invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
-        }
+        invertedIndexes = CarbonUtil
+            .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
+        // get the reverse index
+        invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
       }
+      BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
+      ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, dataOffset,
+          null != rawColumnPage.getLocalDictionary(), vectorInfo, nullBitSet);
+      decodedPage.setNullBits(nullBitSet);
       return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), invertedIndexes,
-          invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata),
-          CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX));
+          invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata), isExplicitSorted);
     } else {
       // following code is for backward compatibility
-      return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset);
+      return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset, vectorInfo);
     }
   }
 
@@ -283,8 +307,8 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
   }
 
   private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage,
-      ByteBuffer pageData, DataChunk2 pageMetadata, int offset) throws IOException,
-      MemoryException {
+      ByteBuffer pageData, DataChunk2 pageMetadata, int offset, ColumnVectorInfo vectorInfo)
+      throws IOException, MemoryException {
     byte[] dataPage;
     int[] rlePage;
     int[] invertedIndexes = new int[0];
@@ -296,8 +320,10 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
       invertedIndexes = CarbonUtil
           .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
       offset += pageMetadata.rowid_page_length;
-      // get the reverse index
-      invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
+      if (vectorInfo == null) {
+        // get the reverse index
+        invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
+      }
     }
     // if rle is applied then read the rle block chunk and then uncompress
     //then actual data based on rle block
@@ -324,13 +350,13 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
       columnDataChunk =
           new VariableLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
               pageMetadata.getNumberOfRowsInpage(), dimStoreType,
-              rawColumnPage.getLocalDictionary());
+              rawColumnPage.getLocalDictionary(), vectorInfo);
     } else {
       // to store fixed length column chunk values
       columnDataChunk =
           new FixedLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
               pageMetadata.getNumberOfRowsInpage(),
-              eachColumnValueSize[rawColumnPage.getColumnIndex()]);
+              eachColumnValueSize[rawColumnPage.getColumnIndex()], vectorInfo);
     }
     return columnDataChunk;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
index 6774fcb..cd233d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
@@ -16,10 +16,15 @@
  */
 package org.apache.carbondata.core.datastore.chunk.reader.measure;
 
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Measure block reader abstract class
@@ -48,4 +53,11 @@ public abstract class AbstractMeasureChunkReader implements MeasureColumnChunkRe
     this.filePath = filePath;
     this.numberOfRows = numberOfRows;
   }
+
+  @Override
+  public void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
+      int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+    throw new UnsupportedOperationException(
+        "This operation is not supported in this class " + getClass().getName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index 240771a..8394029 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileReader;
@@ -29,6 +30,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
@@ -190,6 +192,18 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
   public ColumnPage decodeColumnPage(
       MeasureRawColumnChunk rawColumnChunk, int pageNumber)
       throws IOException, MemoryException {
+    return decodeColumnPage(rawColumnChunk, pageNumber, null);
+  }
+
+  @Override
+  public void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
+      int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+    decodeColumnPage(measureRawColumnChunk, pageNumber, vectorInfo);
+  }
+
+  private ColumnPage decodeColumnPage(
+      MeasureRawColumnChunk rawColumnChunk, int pageNumber, ColumnVectorInfo vectorInfo)
+      throws IOException, MemoryException {
     // data chunk of blocklet column
     DataChunk3 dataChunk3 = rawColumnChunk.getDataChunkV3();
     // data chunk of page
@@ -203,23 +217,34 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
     int offset = (int) rawColumnChunk.getOffSet() +
         measureColumnChunkLength.get(rawColumnChunk.getColumnIndex()) +
         dataChunk3.getPage_offset().get(pageNumber);
-    ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset);
-    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
+    BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
+    ColumnPage decodedPage =
+        decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset, vectorInfo, nullBitSet);
+    if (decodedPage == null) {
+      return null;
+    }
+    decodedPage.setNullBits(nullBitSet);
     return decodedPage;
   }
 
   /**
    * Decode measure column page with page header and raw data starting from offset
    */
-  protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData, int offset)
-      throws MemoryException, IOException {
+  protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData, int offset,
+      ColumnVectorInfo vectorInfo, BitSet nullBitSet) throws MemoryException, IOException {
     List<Encoding> encodings = pageMetadata.getEncoders();
     List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
-    String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
-        pageMetadata.getChunk_meta());
-    ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas,
-        compressorName);
-    return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
+    String compressorName =
+        CarbonMetadataUtil.getCompressorNameFromChunkMeta(pageMetadata.getChunk_meta());
+    ColumnPageDecoder codec =
+        encodingFactory.createDecoder(encodings, encoderMetas, compressorName, vectorInfo != null);
+    if (vectorInfo != null) {
+      codec
+          .decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
+              nullBitSet, false);
+      return null;
+    } else {
+      return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
+    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
index 924a206..b092350 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 
 import org.apache.carbondata.core.datastore.FileReader;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
@@ -151,8 +152,9 @@ public class CompressedMsrChunkFileBasedPageLevelReaderV3
     ByteBuffer buffer = rawColumnPage.getFileReader()
         .readByteBuffer(filePath, offset, pageMetadata.data_page_length);
 
-    ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0);
-    decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
+    BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
+    ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0, null, nullBitSet);
+    decodedPage.setNullBits(nullBitSet);
     return decodedPage;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
index c7bcef1..5346f35 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
@@ -65,8 +65,8 @@ public class DimensionChunkStoreFactory {
    */
   public DimensionDataChunkStore getDimensionChunkStore(int columnValueSize,
       boolean isInvertedIndex, int numberOfRows, long totalSize, DimensionStoreType storeType,
-      CarbonDictionary dictionary) {
-    if (isUnsafe) {
+      CarbonDictionary dictionary, boolean fillDirectVector) {
+    if (isUnsafe && !fillDirectVector) {
       switch (storeType) {
         case FIXED_LENGTH:
           return new UnsafeFixedLengthDimensionDataChunkStore(totalSize, columnValueSize,
@@ -79,24 +79,24 @@ public class DimensionChunkStoreFactory {
               numberOfRows);
         case LOCAL_DICT:
           return new LocalDictDimensionDataChunkStore(
-              new UnsafeFixedLengthDimensionDataChunkStore(totalSize,
-                  3, isInvertedIndex, numberOfRows),
-              dictionary);
+              new UnsafeFixedLengthDimensionDataChunkStore(totalSize, 3, isInvertedIndex,
+                  numberOfRows), dictionary);
         default:
           throw new UnsupportedOperationException("Invalid dimension store type");
       }
     } else {
       switch (storeType) {
         case FIXED_LENGTH:
-          return new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, columnValueSize);
+          return new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, columnValueSize,
+              numberOfRows);
         case VARIABLE_SHORT_LENGTH:
           return new SafeVariableShortLengthDimensionDataChunkStore(isInvertedIndex, numberOfRows);
         case VARIABLE_INT_LENGTH:
           return new SafeVariableIntLengthDimensionDataChunkStore(isInvertedIndex, numberOfRows);
         case LOCAL_DICT:
           return new LocalDictDimensionDataChunkStore(
-              new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex,
-                  3), dictionary);
+              new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, 3, numberOfRows),
+              dictionary);
         default:
           throw new UnsupportedOperationException("Invalid dimension store type");
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
index 28aed5b..8972ddb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.chunk.store;
 
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Interface responsibility is to store dimension data in memory.
@@ -35,6 +36,12 @@ public interface DimensionDataChunkStore {
   void putArray(int[] invertedIndex, int[] invertedIndexReverse, byte[] data);
 
   /**
+   * Fill the vector with decoded data.
+   */
+  void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+      ColumnVectorInfo vectorInfo);
+
+  /**
    * Below method will be used to get the row
    * based on row id passed
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
index 0d06f61..e70424f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
@@ -21,6 +21,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Dimension chunk store for local dictionary encoded data.
@@ -49,6 +51,29 @@ public class LocalDictDimensionDataChunkStore implements DimensionDataChunkStore
     this.dimensionDataChunkStore.putArray(invertedIndex, invertedIndexReverse, data);
   }
 
+  @Override
+  public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+      ColumnVectorInfo vectorInfo) {
+    int columnValueSize = dimensionDataChunkStore.getColumnValueSize();
+    int rowsNum = data.length / columnValueSize;
+    CarbonColumnVector vector = vectorInfo.vector;
+    if (!dictionary.isDictionaryUsed()) {
+      vector.setDictionary(dictionary);
+      dictionary.setDictionaryUsed();
+    }
+    for (int i = 0; i < rowsNum; i++) {
+      int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
+      if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
+        vector.putNull(i);
+        vector.getDictionaryVector().putNull(i);
+      } else {
+        vector.putNotNull(i);
+        vector.getDictionaryVector().putInt(i, surrogate);
+      }
+
+    }
+  }
+
   @Override public byte[] getRow(int rowId) {
     return dictionary.getDictionaryValue(dimensionDataChunkStore.getSurrogate(rowId));
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
new file mode 100644
index 0000000..ddfa470
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+@InterfaceAudience.Internal
+@InterfaceStability.Stable
+public abstract class AbstractNonDictionaryVectorFiller {
+
+  protected int lengthSize;
+  protected int numberOfRows;
+
+  public AbstractNonDictionaryVectorFiller(int lengthSize, int numberOfRows) {
+    this.lengthSize = lengthSize;
+    this.numberOfRows = numberOfRows;
+  }
+
+  public abstract void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer);
+
+  public int getLengthFromBuffer(ByteBuffer buffer) {
+    return buffer.getShort();
+  }
+}
+
+class NonDictionaryVectorFillerFactory {
+
+  public static AbstractNonDictionaryVectorFiller getVectorFiller(DataType type, int lengthSize,
+      int numberOfRows) {
+    if (type == DataTypes.STRING) {
+      return new StringVectorFiller(lengthSize, numberOfRows);
+    } else if (type == DataTypes.VARCHAR) {
+      return new LongStringVectorFiller(lengthSize, numberOfRows);
+    } else if (type == DataTypes.TIMESTAMP) {
+      return new TimeStampVectorFiller(lengthSize, numberOfRows);
+    } else if (type == DataTypes.BOOLEAN) {
+      return new BooleanVectorFiller(lengthSize, numberOfRows);
+    } else if (type == DataTypes.SHORT) {
+      return new ShortVectorFiller(lengthSize, numberOfRows);
+    } else if (type == DataTypes.INT) {
+      return new IntVectorFiller(lengthSize, numberOfRows);
+    } else if (type == DataTypes.LONG) {
+      return new LongVectorFiller(lengthSize, numberOfRows);
+    } else {
+      throw new UnsupportedOperationException("Not supported datatype : " + type);
+    }
+
+  }
+
+}
+
+class StringVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+  public StringVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+    // startOffset tracks where the length prefix of the row being read begins
+    int startOffset = 0;
+    // Every row is stored as a length prefix of lengthSize bytes followed by its value bytes,
+    // so the first value starts right after the first prefix: the data offset begins at
+    // lengthSize instead of 0.
+    int currentOffset = lengthSize;
+    ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
+    for (int i = 0; i < numberOfRows - 1; i++) {
+      buffer.position(startOffset);
+      startOffset += getLengthFromBuffer(buffer) + lengthSize;
+      int length = startOffset - (currentOffset);
+      if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+          CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentOffset, length)) {
+        vector.putNull(i);
+      } else {
+        vector.putByteArray(i, currentOffset, length, data);
+      }
+      currentOffset = startOffset + lengthSize;
+    }
+    // Handle last row
+    int length = (data.length - currentOffset);
+    if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+        CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentOffset, length)) {
+      vector.putNull(numberOfRows - 1);
+    } else {
+      vector.putByteArray(numberOfRows - 1, currentOffset, length, data);
+    }
+  }
+}
+
+class LongStringVectorFiller extends StringVectorFiller {
+  public LongStringVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public int getLengthFromBuffer(ByteBuffer buffer) {
+    return buffer.getInt();
+  }
+}
+
+class BooleanVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+  public BooleanVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+    // startOffset tracks where the length prefix of the row being read begins
+    int startOffset = 0;
+    int currentOffset = lengthSize;
+    for (int i = 0; i < numberOfRows - 1; i++) {
+      buffer.position(startOffset);
+      startOffset += getLengthFromBuffer(buffer) + lengthSize;
+      int length = startOffset - (currentOffset);
+      if (length == 0) {
+        vector.putNull(i);
+      } else {
+        vector.putBoolean(i, ByteUtil.toBoolean(data[currentOffset]));
+      }
+      currentOffset = startOffset + lengthSize;
+    }
+    int length = (data.length - currentOffset);
+    if (length == 0) {
+      vector.putNull(numberOfRows - 1);
+    } else {
+      vector.putBoolean(numberOfRows - 1, ByteUtil.toBoolean(data[currentOffset]));
+    }
+  }
+}
+
+class ShortVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+  public ShortVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+    // startOffset tracks where the length prefix of the row being read begins
+    int startOffset = 0;
+    int currentOffset = lengthSize;
+    for (int i = 0; i < numberOfRows - 1; i++) {
+      buffer.position(startOffset);
+      startOffset += getLengthFromBuffer(buffer) + lengthSize;
+      int length = startOffset - (currentOffset);
+      if (length == 0) {
+        vector.putNull(i);
+      } else {
+        vector.putShort(i, ByteUtil.toXorShort(data, currentOffset, length));
+      }
+      currentOffset = startOffset + lengthSize;
+    }
+    int length = (data.length - currentOffset);
+    if (length == 0) {
+      vector.putNull(numberOfRows - 1);
+    } else {
+      vector.putShort(numberOfRows - 1, ByteUtil.toXorShort(data, currentOffset, length));
+    }
+  }
+}
+
+class IntVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+  public IntVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+    // startOffset tracks where the length prefix of the row being read begins
+    int startOffset = 0;
+    int currentOffset = lengthSize;
+    for (int i = 0; i < numberOfRows - 1; i++) {
+      buffer.position(startOffset);
+      startOffset += getLengthFromBuffer(buffer) + lengthSize;
+      int length = startOffset - (currentOffset);
+      if (length == 0) {
+        vector.putNull(i);
+      } else {
+        vector.putInt(i, ByteUtil.toXorInt(data, currentOffset, length));
+      }
+      currentOffset = startOffset + lengthSize;
+    }
+    int length = (data.length - currentOffset);
+    if (length == 0) {
+      vector.putNull(numberOfRows - 1);
+    } else {
+      vector.putInt(numberOfRows - 1, ByteUtil.toXorInt(data, currentOffset, length));
+    }
+  }
+}
+
+class LongVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+  public LongVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+    // startOffset tracks where the length prefix of the row being read begins
+    int startOffset = 0;
+    int currentOffset = lengthSize;
+    for (int i = 0; i < numberOfRows - 1; i++) {
+      buffer.position(startOffset);
+      startOffset += getLengthFromBuffer(buffer) + lengthSize;
+      int length = startOffset - (currentOffset);
+      if (length == 0) {
+        vector.putNull(i);
+      } else {
+        vector.putLong(i, DataTypeUtil
+            .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentOffset,
+                length));
+      }
+      currentOffset = startOffset + lengthSize;
+    }
+    int length = (data.length - currentOffset);
+    if (length == 0) {
+      vector.putNull(numberOfRows - 1);
+    } else {
+      vector.putLong(numberOfRows - 1, DataTypeUtil
+          .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentOffset,
+              length));
+    }
+  }
+}
+
+class TimeStampVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+  public TimeStampVectorFiller(int lengthSize, int numberOfRows) {
+    super(lengthSize, numberOfRows);
+  }
+
+  @Override
+  public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+    // startOffset tracks where the length prefix of the row being read begins
+    int startOffset = 0;
+    int currentOffset = lengthSize;
+    for (int i = 0; i < numberOfRows - 1; i++) {
+      buffer.position(startOffset);
+      startOffset += getLengthFromBuffer(buffer) + lengthSize;
+      int length = startOffset - (currentOffset);
+      if (length == 0) {
+        vector.putNull(i);
+      } else {
+        vector.putLong(i, ByteUtil.toXorLong(data, currentOffset, length) * 1000L);
+      }
+      currentOffset = startOffset + lengthSize;
+    }
+    int length = (data.length - currentOffset);
+    if (length == 0) {
+      vector.putNull(numberOfRows - 1);
+    } else {
+      vector.putLong(numberOfRows - 1, ByteUtil.toXorLong(data, currentOffset, length) * 1000L);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
index 41218d0..d30650d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
@@ -17,6 +17,12 @@
 
 package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
@@ -30,9 +36,52 @@ public class SafeFixedLengthDimensionDataChunkStore extends SafeAbsractDimension
    */
   private int columnValueSize;
 
-  public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize) {
+  private int numOfRows;
+
+  public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize,
+      int numOfRows) {
     super(isInvertedIndex);
     this.columnValueSize = columnValueSize;
+    this.numOfRows = numOfRows;
+  }
+
+  @Override
+  public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+      ColumnVectorInfo vectorInfo) {
+    CarbonColumnVector vector = vectorInfo.vector;
+    fillVector(data, vectorInfo, vector);
+  }
+
+  private void fillVector(byte[] data, ColumnVectorInfo vectorInfo, CarbonColumnVector vector) {
+    DataType dataType = vectorInfo.vector.getBlockDataType();
+    if (dataType == DataTypes.DATE) {
+      for (int i = 0; i < numOfRows; i++) {
+        int surrogateInternal =
+            CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
+        if (surrogateInternal == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
+          vector.putNull(i);
+        } else {
+          vector.putInt(i, surrogateInternal - DateDirectDictionaryGenerator.cutOffDate);
+        }
+      }
+    } else if (dataType == DataTypes.TIMESTAMP) {
+      for (int i = 0; i < numOfRows; i++) {
+        int surrogateInternal =
+            CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
+        if (surrogateInternal == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
+          vector.putNull(i);
+        } else {
+          Object valueFromSurrogate =
+              vectorInfo.directDictionaryGenerator.getValueFromSurrogate(surrogateInternal);
+          vector.putLong(i, (long)valueFromSurrogate);
+        }
+      }
+    } else {
+      for (int i = 0; i < numOfRows; i++) {
+        vector.putInt(i,
+            CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize));
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 8553506..0fb4854 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
@@ -91,6 +92,20 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
     }
   }
 
+  @Override
+  public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+      ColumnVectorInfo vectorInfo) {
+    this.invertedIndexReverse = invertedIndex;
+    int lengthSize = getLengthSize();
+    CarbonColumnVector vector = vectorInfo.vector;
+    DataType dt = vector.getType();
+    // wrap the page data in a byte buffer so the filler can read each row's length prefix
+    ByteBuffer buffer = ByteBuffer.wrap(data);
+    AbstractNonDictionaryVectorFiller vectorFiller =
+        NonDictionaryVectorFillerFactory.getVectorFiller(dt, lengthSize, numberOfRows);
+    vectorFiller.fillVector(data, vector, buffer);
+  }
+
   protected abstract int getLengthSize();
   protected abstract int getLengthFromBuffer(ByteBuffer buffer);
 
@@ -150,7 +165,7 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
       vector.putNull(vectorRow);
     } else {
       if (dt == DataTypes.STRING) {
-        vector.putBytes(vectorRow, currentDataOffset, length, data);
+        vector.putByteArray(vectorRow, currentDataOffset, length, data);
       } else if (dt == DataTypes.BOOLEAN) {
         vector.putBoolean(vectorRow, ByteUtil.toBoolean(data[currentDataOffset]));
       } else if (dt == DataTypes.SHORT) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
index 89bce2d..57e9de5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
 /**
@@ -115,6 +116,11 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension
     }
   }
 
+  @Override public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+      ColumnVectorInfo vectorInfo) { // always throws here; none of the arguments are read
+    throw new UnsupportedOperationException("This method not supposed to be called here");
+  }
+
   /**
    * Below method will be used to free the memory occupied by the column chunk
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
index 6f3f139..44b3c12 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
@@ -50,8 +50,9 @@ public abstract class BlockIndexerStorage<T> {
    *
    * @param rowIds
    */
-  protected Map<String, short[]> rleEncodeOnRowId(short[] rowIds, short[] rowIdPage,
-      short[] rowIdRlePage) {
+  protected Map<String, short[]> rleEncodeOnRowId(short[] rowIds) {
+    short[] rowIdPage;
+    short[] rowIdRlePage;
     List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
     int k = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
index b3e25d3..bcf5432 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
@@ -39,8 +39,7 @@ public class BlockIndexerStorageForNoDictionary extends BlockIndexerStorage<Obje
       Arrays.sort(dataWithRowId);
     }
     short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
-    Map<String, short[]> rowIdAndRleRowIdPages =
-        rleEncodeOnRowId(rowIds, getRowIdPage(), getRowIdRlePage());
+    Map<String, short[]> rowIdAndRleRowIdPages = rleEncodeOnRowId(rowIds);
     rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
     rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
index f1b9af2..b30396c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
@@ -43,8 +43,7 @@ public class BlockIndexerStorageForShort extends BlockIndexerStorage<byte[][]> {
       Arrays.sort(dataWithRowId);
     }
     short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
-    Map<String, short[]> rowIdAndRleRowIdPages =
-        rleEncodeOnRowId(rowIds, getRowIdPage(), getRowIdRlePage());
+    Map<String, short[]> rowIdAndRleRowIdPages = rleEncodeOnRowId(rowIds);
     rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
     rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
     if (rleOnData) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
index a7f38cd..48484ce 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
@@ -28,6 +28,9 @@ public final class UnBlockIndexer {
   public static int[] uncompressIndex(int[] indexData, int[] indexMap) {
     int actualSize = indexData.length;
     int mapLength = indexMap.length;
+    if (indexMap.length == 0) {
+      return indexData;
+    }
     for (int i = 0; i < mapLength; i++) {
       actualSize += indexData[indexMap[i] + 1] - indexData[indexMap[i]] - 1;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
index 6fef278..9f0abd9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
@@ -76,6 +76,7 @@ public class FileReaderImpl implements FileReader {
         channel.close();
       }
     }
+    fileNameAndStreamCache.clear();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index e8097da..e5312f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -37,14 +37,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
 
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE_ARRAY;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.FLOAT;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.INT;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.LONG;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT_INT;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.*;
 
 public abstract class ColumnPage {
 
@@ -90,7 +83,7 @@ public abstract class ColumnPage {
 
   private static ColumnPage createDecimalPage(ColumnPageEncoderMeta columnPageEncoderMeta,
       int pageSize) {
-    if (unsafe) {
+    if (isUnsafeEnabled(columnPageEncoderMeta)) {
       try {
         return new UnsafeDecimalColumnPage(columnPageEncoderMeta, pageSize);
       } catch (MemoryException e) {
@@ -103,7 +96,7 @@ public abstract class ColumnPage {
 
   private static ColumnPage createVarLengthPage(ColumnPageEncoderMeta columnPageEncoderMeta,
       int pageSize) {
-    if (unsafe) {
+    if (isUnsafeEnabled(columnPageEncoderMeta)) {
       try {
         return new UnsafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
       } catch (MemoryException e) {
@@ -116,7 +109,7 @@ public abstract class ColumnPage {
 
   private static ColumnPage createFixLengthPage(
       ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
-    if (unsafe) {
+    if (isUnsafeEnabled(columnPageEncoderMeta)) {
       try {
         return new UnsafeFixLengthColumnPage(columnPageEncoderMeta, pageSize);
       } catch (MemoryException e) {
@@ -129,7 +122,7 @@ public abstract class ColumnPage {
 
   private static ColumnPage createFixLengthByteArrayPage(
       ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize, int eachValueSize) {
-    if (unsafe) {
+    if (isUnsafeEnabled(columnPageEncoderMeta)) {
       try {
         return new UnsafeFixLengthColumnPage(columnPageEncoderMeta, pageSize, eachValueSize);
       } catch (MemoryException e) {
@@ -163,7 +156,7 @@ public abstract class ColumnPage {
             CarbonCommonConstants.LOCAL_DICTIONARY_DECODER_BASED_FALLBACK_DEFAULT));
     ColumnPage actualPage;
     ColumnPage encodedPage;
-    if (unsafe) {
+    if (isUnsafeEnabled(columnPageEncoderMeta)) {
       actualPage = new UnsafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
       encodedPage = new UnsafeFixLengthColumnPage(
           new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), DataTypes.BYTE_ARRAY,
@@ -190,7 +183,7 @@ public abstract class ColumnPage {
     DataType dataType = columnPageEncoderMeta.getStoreDataType();
     TableSpec.ColumnSpec columnSpec = columnPageEncoderMeta.getColumnSpec();
     String compressorName = columnPageEncoderMeta.getCompressorName();
-    if (unsafe) {
+    if (isUnsafeEnabled(columnPageEncoderMeta)) {
       if (dataType == DataTypes.BOOLEAN) {
         instance = new UnsafeFixLengthColumnPage(
             new ColumnPageEncoderMeta(columnSpec, BYTE, compressorName), pageSize);
@@ -219,21 +212,23 @@ public abstract class ColumnPage {
       }
     } else {
       if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
-        instance = newBytePage(columnSpec, new byte[pageSize], compressorName);
+        instance = newBytePage(columnPageEncoderMeta, new byte[pageSize]);
       } else if (dataType == DataTypes.SHORT) {
-        instance = newShortPage(columnSpec, new short[pageSize], compressorName);
+        instance = newShortPage(columnPageEncoderMeta, new short[pageSize]);
       } else if (dataType == DataTypes.SHORT_INT) {
-        instance = newShortIntPage(columnSpec, new byte[pageSize * 3], compressorName);
+        instance = newShortIntPage(columnPageEncoderMeta, new byte[pageSize * 3]);
       } else if (dataType == DataTypes.INT) {
-        instance = newIntPage(columnSpec, new int[pageSize], compressorName);
+        instance = newIntPage(columnPageEncoderMeta, new int[pageSize]);
       } else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
-        instance = newLongPage(columnSpec, new long[pageSize], compressorName);
+        instance = newLongPage(
+            new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), LONG,
+                columnPageEncoderMeta.getCompressorName()), new long[pageSize]);
       } else if (dataType == DataTypes.FLOAT) {
-        instance = newFloatPage(columnSpec, new float[pageSize], compressorName);
+        instance = newFloatPage(columnPageEncoderMeta, new float[pageSize]);
       } else if (dataType == DataTypes.DOUBLE) {
-        instance = newDoublePage(columnSpec, new double[pageSize], compressorName);
+        instance = newDoublePage(columnPageEncoderMeta, new double[pageSize]);
       } else if (DataTypes.isDecimal(dataType)) {
-        instance = newDecimalPage(columnSpec, new byte[pageSize][], compressorName);
+        instance = newDecimalPage(columnPageEncoderMeta, new byte[pageSize][]);
       } else if (dataType == DataTypes.STRING
           || dataType == DataTypes.BYTE_ARRAY
           || dataType == DataTypes.VARCHAR) {
@@ -253,75 +248,67 @@ public abstract class ColumnPage {
     return columnPage;
   }
 
-  private static ColumnPage newBytePage(TableSpec.ColumnSpec columnSpec, byte[] byteData,
-      String compressorName) {
+  private static ColumnPage newBytePage(ColumnPageEncoderMeta meta, byte[] byteData) {
+    ColumnPageEncoderMeta encoderMeta =
+        new ColumnPageEncoderMeta(meta.getColumnSpec(), BYTE, meta.getCompressorName());
+    encoderMeta.setFillCompleteVector(meta.isFillCompleteVector());
     ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, BYTE, compressorName), byteData.length);
+        encoderMeta, byteData.length);
     columnPage.setBytePage(byteData);
     return columnPage;
   }
 
-  private static ColumnPage newShortPage(TableSpec.ColumnSpec columnSpec, short[] shortData,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, SHORT, compressorName), shortData.length);
+  private static ColumnPage newShortPage(ColumnPageEncoderMeta meta, short[] shortData) {
+    ColumnPage columnPage = createPage(meta, shortData.length);
     columnPage.setShortPage(shortData);
     return columnPage;
   }
 
-  private static ColumnPage newShortIntPage(TableSpec.ColumnSpec columnSpec, byte[] shortIntData,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, SHORT_INT, compressorName), shortIntData.length / 3);
+  private static ColumnPage newShortIntPage(ColumnPageEncoderMeta meta, byte[] shortIntData) {
+    ColumnPage columnPage = createPage(meta, shortIntData.length / 3);
     columnPage.setShortIntPage(shortIntData);
     return columnPage;
   }
 
-  private static ColumnPage newIntPage(TableSpec.ColumnSpec columnSpec, int[] intData,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, INT, compressorName), intData.length);
+  private static ColumnPage newIntPage(ColumnPageEncoderMeta meta, int[] intData) {
+    ColumnPage columnPage = createPage(meta, intData.length);
     columnPage.setIntPage(intData);
     return columnPage;
   }
 
-  private static ColumnPage newLongPage(TableSpec.ColumnSpec columnSpec, long[] longData,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, LONG, compressorName), longData.length);
+  private static ColumnPage newLongPage(ColumnPageEncoderMeta meta, long[] longData) {
+    ColumnPage columnPage = createPage(meta, longData.length);
     columnPage.setLongPage(longData);
     return columnPage;
   }
 
-  private static ColumnPage newFloatPage(TableSpec.ColumnSpec columnSpec, float[] floatData,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, FLOAT, compressorName), floatData.length);
+  private static ColumnPage newFloatPage(ColumnPageEncoderMeta meta, float[] floatData) {
+    ColumnPage columnPage = createPage(meta, floatData.length);
     columnPage.setFloatPage(floatData);
     return columnPage;
   }
 
-  private static ColumnPage newDoublePage(TableSpec.ColumnSpec columnSpec, double[] doubleData,
-      String compressorName) {
-    ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, DOUBLE, compressorName), doubleData.length);
+  private static ColumnPage newDoublePage(ColumnPageEncoderMeta meta, double[] doubleData) {
+    ColumnPage columnPage = createPage(meta, doubleData.length);
     columnPage.setDoublePage(doubleData);
     return columnPage;
   }
 
-  private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray,
-      String compressorName) {
+  private static ColumnPage newDecimalPage(ColumnPageEncoderMeta meta, byte[][] byteArray) {
+    ColumnPageEncoderMeta encoderMeta =
+        new ColumnPageEncoderMeta(meta.getColumnSpec(), meta.getColumnSpec().getSchemaDataType(),
+            meta.getCompressorName());
+    encoderMeta.setFillCompleteVector(meta.isFillCompleteVector());
     ColumnPage columnPage = createPage(
-        new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+        encoderMeta,
         byteArray.length);
     columnPage.setByteArrayPage(byteArray);
     return columnPage;
   }
 
-  private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedByteArray, String compressorName) throws MemoryException {
-    return VarLengthColumnPageBase.newDecimalColumnPage(
-        columnSpec, lvEncodedByteArray, compressorName);
+  private static ColumnPage newDecimalPage(ColumnPageEncoderMeta meta,
+      byte[] lvEncodedByteArray) throws MemoryException {
+    return VarLengthColumnPageBase.newDecimalColumnPage(meta, lvEncodedByteArray);
   }
 
   private static ColumnPage newLVBytesPage(TableSpec.ColumnSpec columnSpec,
@@ -813,25 +800,25 @@ public abstract class ColumnPage {
     DataType storeDataType = meta.getStoreDataType();
     if (storeDataType == DataTypes.BOOLEAN || storeDataType == DataTypes.BYTE) {
       byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
-      return newBytePage(columnSpec, byteData, meta.getCompressorName());
+      return newBytePage(meta, byteData);
     } else if (storeDataType == DataTypes.SHORT) {
       short[] shortData = compressor.unCompressShort(compressedData, offset, length);
-      return newShortPage(columnSpec, shortData, meta.getCompressorName());
+      return newShortPage(meta, shortData);
     } else if (storeDataType == DataTypes.SHORT_INT) {
       byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
-      return newShortIntPage(columnSpec, shortIntData, meta.getCompressorName());
+      return newShortIntPage(meta, shortIntData);
     } else if (storeDataType == DataTypes.INT) {
       int[] intData = compressor.unCompressInt(compressedData, offset, length);
-      return newIntPage(columnSpec, intData, meta.getCompressorName());
+      return newIntPage(meta, intData);
     } else if (storeDataType == DataTypes.LONG) {
       long[] longData = compressor.unCompressLong(compressedData, offset, length);
-      return newLongPage(columnSpec, longData, meta.getCompressorName());
+      return newLongPage(meta, longData);
     } else if (storeDataType == DataTypes.FLOAT) {
       float[] floatData = compressor.unCompressFloat(compressedData, offset, length);
-      return newFloatPage(columnSpec, floatData, meta.getCompressorName());
+      return newFloatPage(meta, floatData);
     } else if (storeDataType == DataTypes.DOUBLE) {
       double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
-      return newDoublePage(columnSpec, doubleData, meta.getCompressorName());
+      return newDoublePage(meta, doubleData);
     } else if (!isLVEncoded && storeDataType == DataTypes.BYTE_ARRAY && (
         columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE
             || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
@@ -873,8 +860,7 @@ public abstract class ColumnPage {
   public static ColumnPage decompressDecimalPage(ColumnPageEncoderMeta meta, byte[] compressedData,
       int offset, int length) throws MemoryException {
     Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
-    TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
-    ColumnPage decimalPage = null;
+    ColumnPage decimalPage;
     DataType storeDataType = meta.getStoreDataType();
     if (storeDataType == DataTypes.BYTE) {
       byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
@@ -888,7 +874,7 @@ public abstract class ColumnPage {
       return decimalPage;
     } else if (storeDataType == DataTypes.SHORT_INT) {
       byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
-      decimalPage = createDecimalPage(meta, shortIntData.length);
+      decimalPage = createDecimalPage(meta, shortIntData.length / 3);
       decimalPage.setShortIntPage(shortIntData);
       return decimalPage;
     }  else if (storeDataType == DataTypes.INT) {
@@ -903,10 +889,20 @@ public abstract class ColumnPage {
       return decimalPage;
     } else {
       byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newDecimalPage(columnSpec, lvEncodedBytes, meta.getCompressorName());
+      return newDecimalPage(meta, lvEncodedBytes);
     }
   }
 
+  /**
+   * Whether the unsafe flow should be used for this meta. The original author's rationale: the
+   * complete vector fill flow does not keep data in memory for long, so unsafe is skipped there.
+   * @param meta encoder meta; only its isFillCompleteVector() flag is read here
+   * @return true only if unsafe is set and the meta does not request complete vector fill
+   */
+  protected static boolean isUnsafeEnabled(ColumnPageEncoderMeta meta) {
+    return unsafe && !meta.isFillCompleteVector();
+  }
+
   public BitSet getNullBits() {
     return nullBitSet;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
index 53ad956..82ccd22 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datastore.page;
 
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+
 // Transformation type that can be applied to ColumnPage
 public interface ColumnPageValueConverter {
   void encode(int rowId, byte value);
@@ -35,4 +37,5 @@ public interface ColumnPageValueConverter {
   double decodeDouble(long value);
   double decodeDouble(float value);
   double decodeDouble(double value);
+  void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
index d3e945d..1867354 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
@@ -194,6 +194,31 @@ public class SafeDecimalColumnPage extends DecimalColumnPage {
   }
 
   @Override
+  public byte[] getBytePage() {
+    return byteData; // the backing field itself, not a copy
+  }
+
+  @Override public short[] getShortPage() {
+    return shortData; // the backing field itself, not a copy
+  }
+
+  @Override public byte[] getShortIntPage() {
+    return shortIntData; // the backing field itself, not a copy
+  }
+
+  @Override public int[] getIntPage() {
+    return intData; // the backing field itself, not a copy
+  }
+
+  @Override public long[] getLongPage() {
+    return longData; // the backing field itself, not a copy
+  }
+
+  @Override public byte[][] getByteArrayPage() {
+    return byteArrayData; // the backing field itself, not a copy
+  }
+
+  @Override
   public void freeMemory() {
     byteArrayData = null;
     super.freeMemory();