You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/12/20 03:10:58 UTC

[3/4] incubator-carbondata git commit: add initial check in for vector reader

add initial check in for vector reader

Added vector reader in carbon

Fixed check style

Added batch reader support in spark layer

rebased

Fixed issues

Added testcases

fixed testcase

fixed comments

fixed style

fixed testcase

Fixed review comments

Fixed testcase and comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/376d69ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/376d69ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/376d69ff

Branch: refs/heads/master
Commit: 376d69ff72c88d541022d3cea98ef1d6a7542ee6
Parents: 5e50d04
Author: ravipesala <ra...@gmail.com>
Authored: Tue Dec 6 23:24:05 2016 +0530
Committer: jackylk <ja...@huawei.com>
Committed: Tue Dec 20 11:01:27 2016 +0800

----------------------------------------------------------------------
 .../chunk/DimensionColumnDataChunk.java         |   13 +
 .../impl/ColumnGroupDimensionDataChunk.java     |   68 +-
 .../impl/FixedLengthDimensionDataChunk.java     |   93 +-
 .../impl/VariableLengthDimensionDataChunk.java  |   48 +
 .../core/constants/CarbonCommonConstants.java   |    9 +
 .../DirectDictionaryGenerator.java              |    8 +
 .../DateDirectDictionaryGenerator.java          |    4 +
 .../TimeStampDirectDictionaryGenerator.java     |    5 +
 .../scan/collector/ScannedResultCollector.java  |    7 +
 .../impl/AbstractScannedResultCollector.java    |    5 +
 .../DictionaryBasedVectorResultCollector.java   |  142 +++
 .../scan/executor/QueryExecutorFactory.java     |   10 +-
 .../executor/impl/AbstractQueryExecutor.java    |    1 +
 .../impl/VectorDetailQueryExecutor.java         |   41 +
 .../scan/executor/infos/BlockExecutionInfo.java |   13 +
 .../carbondata/scan/model/QueryModel.java       |   10 +
 .../processor/AbstractDataBlockIterator.java    |    7 +-
 .../processor/impl/DataBlockIteratorImpl.java   |   10 +
 .../scan/result/AbstractScannedResult.java      |   85 +-
 .../result/impl/FilterQueryScannedResult.java   |   35 +
 .../AbstractDetailQueryResultIterator.java      |    5 +
 .../result/iterator/VectorChunkRowIterator.java |   93 ++
 .../VectorDetailQueryResultIterator.java        |   52 +
 .../scan/result/vector/CarbonColumnVector.java  |   47 +
 .../scan/result/vector/CarbonColumnarBatch.java |   62 ++
 .../scan/result/vector/ColumnVectorInfo.java    |   41 +
 .../vector/MeasureDataVectorProcessor.java      |  268 +++++
 .../vector/impl/CarbonColumnVectorImpl.java     |  154 +++
 .../carbondata/hadoop/CarbonInputFormat.java    |   14 +-
 .../carbondata/hadoop/CarbonRecordReader.java   |    2 +-
 .../readsupport/impl/RawDataReadSupport.java    |    9 +-
 .../spark/merger/CarbonCompactionExecutor.java  |    2 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   71 +-
 .../scala/org/apache/spark/sql/CarbonScan.scala |    6 +-
 .../ColumnGroupDataTypesTestCase.scala          |    4 +-
 .../readsupport/SparkRowReadSupportImpl.java    |   22 +-
 .../vectorreader/ColumnarVectorWrapper.java     |   80 ++
 .../VectorizedCarbonRecordReader.java           |  256 +++++
 .../sql/CarbonDatasourceHadoopRelation.scala    |    5 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   20 +-
 .../execution/CarbonLateDecodeStrategy.scala    |   89 +-
 .../spark2/src/test/resources/dataDiff.csv      | 1001 ++++++++++++++++++
 .../carbondata/CarbonDataSourceSuite.scala      |   27 +-
 .../spark/carbondata/util/QueryTest.scala       |   66 ++
 .../vectorreader/VectorReaderTestCase.scala     |   79 ++
 45 files changed, 2972 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionColumnDataChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionColumnDataChunk.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionColumnDataChunk.java
index ddc76c0..efa67e6 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionColumnDataChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/DimensionColumnDataChunk.java
@@ -19,6 +19,7 @@
 package org.apache.carbondata.core.carbon.datastore.chunk;
 
 import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Interface for dimension column chunk.
@@ -46,6 +47,18 @@ public interface DimensionColumnDataChunk<T> {
       KeyStructureInfo restructuringInfo);
 
   /**
+   * Fill the data to vector
+   */
+  int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, int column,
+      KeyStructureInfo restructuringInfo);
+
+  /**
+   * Fill the data to vector
+   */
+  int fillConvertedChunkData(int[] rowMapping, ColumnVectorInfo[] vectorInfo, int column,
+      KeyStructureInfo restructuringInfo);
+
+  /**
    * Below method to get  the data based in row id
    *
    * @param row id

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
index 77fb163..ab467f2 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/ColumnGroupDimensionDataChunk.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.core.carbon.datastore.chunk.impl;
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.scan.result.vector.ColumnVectorInfo;
 
 /**
  * This class is holder of the dimension column chunk data of the fixed length
@@ -58,7 +59,8 @@ public class ColumnGroupDimensionDataChunk implements DimensionColumnDataChunk<b
    * @param restructuringInfo define the structure of the key
    * @return how many bytes was copied
    */
-  @Override public int fillChunkData(byte[] data, int offset, int rowId,
+  @Override
+  public int fillChunkData(byte[] data, int offset, int rowId,
       KeyStructureInfo restructuringInfo) {
     byte[] maskedKey =
         getMaskedKey(dataChunk, rowId * chunkAttributes.getColumnValueSize(), restructuringInfo);
@@ -69,7 +71,8 @@ public class ColumnGroupDimensionDataChunk implements DimensionColumnDataChunk<b
   /**
    * Converts to column dictionary integer value
    */
-  @Override public int fillConvertedChunkData(int rowId, int columnIndex, int[] row,
+  @Override
+  public int fillConvertedChunkData(int rowId, int columnIndex, int[] row,
       KeyStructureInfo info) {
     int start = rowId * chunkAttributes.getColumnValueSize();
     long[] keyArray = info.getKeyGenerator().getKeyArray(dataChunk, start);
@@ -80,6 +83,58 @@ public class ColumnGroupDimensionDataChunk implements DimensionColumnDataChunk<b
     return columnIndex;
   }
 
+  @Override
+  public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, int column,
+      KeyStructureInfo info) {
+    ColumnVectorInfo columnVectorInfo = vectorInfo[column];
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = offset + columnVectorInfo.size;
+    int columnValueSize = chunkAttributes.getColumnValueSize();
+    int[] ordinal = info.getMdkeyQueryDimensionOrdinal();
+    for (int k = offset; k < len; k++) {
+      int start = k * columnValueSize;
+      long[] keyArray = info.getKeyGenerator().getKeyArray(dataChunk, start);
+      int index = 0;
+      for (int i = column; i < column + ordinal.length; i++) {
+        if (vectorInfo[i].directDictionaryGenerator == null) {
+          vectorInfo[i].vector.putInt(vectorOffset, (int) keyArray[ordinal[index++]]);
+        } else {
+          vectorInfo[i].vector.putLong(vectorOffset, (long) vectorInfo[i].directDictionaryGenerator
+              .getValueFromSurrogate((int) keyArray[ordinal[index++]]));
+        }
+      }
+      vectorOffset++;
+    }
+    return column + ordinal.length;
+  }
+
+  @Override
+  public int fillConvertedChunkData(int[] rowMapping, ColumnVectorInfo[] vectorInfo, int column,
+      KeyStructureInfo info) {
+    ColumnVectorInfo columnVectorInfo = vectorInfo[column];
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = offset + columnVectorInfo.size;
+    int columnValueSize = chunkAttributes.getColumnValueSize();
+    int[] ordinal = info.getMdkeyQueryDimensionOrdinal();
+    for (int k = offset; k < len; k++) {
+      int start = rowMapping[k] * columnValueSize;
+      long[] keyArray = info.getKeyGenerator().getKeyArray(dataChunk, start);
+      int index = 0;
+      for (int i = column; i < column + ordinal.length; i++) {
+        if (vectorInfo[i].directDictionaryGenerator == null) {
+          vectorInfo[i].vector.putInt(vectorOffset, (int) keyArray[ordinal[index++]]);
+        } else {
+          vectorInfo[i].vector.putLong(vectorOffset, (long) vectorInfo[i].directDictionaryGenerator
+              .getValueFromSurrogate((int) keyArray[ordinal[index++]]));
+        }
+      }
+      vectorOffset++;
+    }
+    return column + ordinal.length;
+  }
+
   /**
    * Below method masks key
    *
@@ -101,7 +156,8 @@ public class ColumnGroupDimensionDataChunk implements DimensionColumnDataChunk<b
    * @param rowId row id of the data
    * @return chunk
    */
-  @Override public byte[] getChunkData(int rowId) {
+  @Override
+  public byte[] getChunkData(int rowId) {
     byte[] data = new byte[chunkAttributes.getColumnValueSize()];
     System.arraycopy(dataChunk, rowId * data.length, data, 0, data.length);
     return data;
@@ -112,7 +168,8 @@ public class ColumnGroupDimensionDataChunk implements DimensionColumnDataChunk<b
    *
    * @return chunk attributes
    */
-  @Override public DimensionChunkAttributes getAttributes() {
+  @Override
+  public DimensionChunkAttributes getAttributes() {
     return chunkAttributes;
   }
 
@@ -122,7 +179,8 @@ public class ColumnGroupDimensionDataChunk implements DimensionColumnDataChunk<b
    *
    * @return complete chunk
    */
-  @Override public byte[] getCompleteDataChunk() {
+  @Override
+  public byte[] getCompleteDataChunk() {
     return dataChunk;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/FixedLengthDimensionDataChunk.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
index 2867d76..8e06d16 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/FixedLengthDimensionDataChunk.java
@@ -21,6 +21,8 @@ package org.apache.carbondata.core.carbon.datastore.chunk.impl;
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
 import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.scan.result.vector.ColumnVectorInfo;
 
 /**
  * This class is holder of the dimension column chunk data of the fixed length
@@ -77,13 +79,98 @@ public class FixedLengthDimensionDataChunk implements DimensionColumnDataChunk<b
       rowId = chunkAttributes.getInvertedIndexesReverse()[rowId];
     }
     int start = rowId * chunkAttributes.getColumnValueSize();
+    int dict = getInt(chunkAttributes.getColumnValueSize(), start);
+    row[columnIndex] = dict;
+    return columnIndex + 1;
+  }
+
+  @Override public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, int column,
+      KeyStructureInfo restructuringInfo) {
+    ColumnVectorInfo columnVectorInfo = vectorInfo[column];
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = columnVectorInfo.size + offset;
+    int[] indexesReverse = chunkAttributes.getInvertedIndexesReverse();
+    int columnValueSize = chunkAttributes.getColumnValueSize();
+    CarbonColumnVector vector = columnVectorInfo.vector;
+    for (int j = offset; j < len; j++) {
+      int start =
+          indexesReverse == null ? j * columnValueSize : indexesReverse[j] * columnValueSize;
+      int dict = getInt(columnValueSize, start);
+      if (columnVectorInfo.directDictionaryGenerator == null) {
+        vector.putInt(vectorOffset++, dict);
+      } else {
+        Object valueFromSurrogate =
+            columnVectorInfo.directDictionaryGenerator.getValueFromSurrogate(dict);
+        if (valueFromSurrogate == null) {
+          vector.putNull(vectorOffset++);
+        } else {
+          switch (columnVectorInfo.directDictionaryGenerator.getReturnType()) {
+            case INT:
+              vector.putInt(vectorOffset++, (int) valueFromSurrogate);
+              break;
+            case LONG:
+              vector.putLong(vectorOffset++, (long) valueFromSurrogate);
+              break;
+          }
+        }
+      }
+    }
+    return column + 1;
+  }
+
+  @Override
+  public int fillConvertedChunkData(int[] rowMapping, ColumnVectorInfo[] vectorInfo, int column,
+      KeyStructureInfo restructuringInfo) {
+    ColumnVectorInfo columnVectorInfo = vectorInfo[column];
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = columnVectorInfo.size + offset;
+    int[] indexesReverse = chunkAttributes.getInvertedIndexesReverse();
+    int columnValueSize = chunkAttributes.getColumnValueSize();
+    CarbonColumnVector vector = columnVectorInfo.vector;
+    for (int j = offset; j < len; j++) {
+      // calculate the start index to take the dictionary data. Here dictionary data size is fixed
+      // so calculate using fixed size of it.
+      int start = indexesReverse == null ?
+          rowMapping[j] * columnValueSize :
+          indexesReverse[rowMapping[j]] * columnValueSize;
+      int dict = getInt(columnValueSize, start);
+      if (columnVectorInfo.directDictionaryGenerator == null) {
+        vector.putInt(vectorOffset++, dict);
+      } else {
+        Object valueFromSurrogate =
+            columnVectorInfo.directDictionaryGenerator.getValueFromSurrogate(dict);
+        if (valueFromSurrogate == null) {
+          vector.putNull(vectorOffset++);
+        } else {
+          switch (columnVectorInfo.directDictionaryGenerator.getReturnType()) {
+            case INT:
+              vector.putInt(vectorOffset++, (int) valueFromSurrogate);
+              break;
+            case LONG:
+              vector.putLong(vectorOffset++, (long) valueFromSurrogate);
+              break;
+          }
+        }
+      }
+    }
+    return column + 1;
+  }
+
+  /**
+   * Create the integer from fixed column size and start index
+   * @param columnValueSize
+   * @param start
+   * @return
+   */
+  private int getInt(int columnValueSize, int start) {
     int dict = 0;
-    for (int i = start; i < start + chunkAttributes.getColumnValueSize(); i++) {
+    for (int i = start; i < start + columnValueSize; i++) {
       dict <<= 8;
       dict ^= dataChunk[i] & 0xFF;
     }
-    row[columnIndex] = dict;
-    return columnIndex + 1;
+    return dict;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/VariableLengthDimensionDataChunk.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
index 6d2d400..590e481 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/impl/VariableLengthDimensionDataChunk.java
@@ -18,11 +18,15 @@
  */
 package org.apache.carbondata.core.carbon.datastore.chunk.impl;
 
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
+import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.scan.result.vector.ColumnVectorInfo;
 
 /**
  * This class is holder of the dimension column chunk data of the variable
@@ -93,6 +97,50 @@ public class VariableLengthDimensionDataChunk implements DimensionColumnDataChun
     return dataChunk.get(index);
   }
 
+  @Override public int fillConvertedChunkData(ColumnVectorInfo[] vectorInfo, int column,
+      KeyStructureInfo restructuringInfo) {
+    int[] indexesReverse = chunkAttributes.getInvertedIndexesReverse();
+    ColumnVectorInfo columnVectorInfo = vectorInfo[column];
+    CarbonColumnVector vector = columnVectorInfo.vector;
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = offset + columnVectorInfo.size;
+    for (int i = offset; i < len; i++) {
+      byte[] value = dataChunk.get(indexesReverse == null ? i : indexesReverse[i]);
+      // Considering only String case now as we support only
+      // string in no dictionary case at present.
+      if (value == null || Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
+        vector.putNull(vectorOffset++);
+      } else {
+        vector.putBytes(vectorOffset++, value);
+      }
+    }
+    return column + 1;
+  }
+
+  @Override
+  public int fillConvertedChunkData(int[] rowMapping, ColumnVectorInfo[] vectorInfo, int column,
+      KeyStructureInfo restructuringInfo) {
+    int[] indexesReverse = chunkAttributes.getInvertedIndexesReverse();
+    ColumnVectorInfo columnVectorInfo = vectorInfo[column];
+    CarbonColumnVector vector = columnVectorInfo.vector;
+    int offset = columnVectorInfo.offset;
+    int vectorOffset = columnVectorInfo.vectorOffset;
+    int len = offset + columnVectorInfo.size;
+    for (int i = offset; i < len; i++) {
+      byte[] value =
+          dataChunk.get(indexesReverse == null ? rowMapping[i] : indexesReverse[rowMapping[i]]);
+      // Considering only String case now as we support only
+      // string in no dictionary case at present.
+      if (value == null || Arrays.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, value)) {
+        vector.putNull(vectorOffset++);
+      } else {
+        vector.putBytes(vectorOffset++, value);
+      }
+    }
+    return column + 1;
+  }
+
   /**
    * Below method will be used get the chunk attributes
    *

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 06937a9..c32956c 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -165,6 +165,11 @@ public final class CarbonCommonConstants {
    * MEMBER_DEFAULT_VAL
    */
   public static final String MEMBER_DEFAULT_VAL = "@NU#LL$!";
+
+  /**
+   * MEMBER_DEFAULT_VAL_ARRAY
+   */
+  public static final byte[] MEMBER_DEFAULT_VAL_ARRAY = MEMBER_DEFAULT_VAL.getBytes();
   /**
    * FILE STATUS IN-PROGRESS
    */
@@ -968,6 +973,10 @@ public final class CarbonCommonConstants {
 
   public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT = "1024";
 
+  public static final String ENABLE_VECTOR_READER = "carbon.enable.vector.reader";
+
+  public static final String ENABLE_VECTOR_READER_DEFAULT = "false";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
index 9b8a230..b773fde 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/DirectDictionaryGenerator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.carbondata.core.keygenerator.directdictionary;
 
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+
 /**
  * The interface provides the method to generate dictionary key
  * and getting the actual value from the dictionaryKey for direct dictionary column.
@@ -55,4 +57,10 @@ public interface DirectDictionaryGenerator {
 
   void initialize();
 
+  /**
+   * Return value datatype for this generator
+   * @return
+   */
+  DataType getReturnType();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
index f3a1225..f6e8e90 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/DateDirectDictionaryGenerator.java
@@ -24,6 +24,7 @@ import java.util.Date;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -159,4 +160,7 @@ public class DateDirectDictionaryGenerator implements DirectDictionaryGenerator
     }
   }
 
+  @Override public DataType getReturnType() {
+    return DataType.INT;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
index 54d2965..ec1a710 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
@@ -24,6 +24,7 @@ import java.util.Date;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -226,4 +227,8 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
     }
   }
 
+  @Override public DataType getReturnType() {
+    return DataType.LONG;
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java
index dce6ae5..e5324a8 100644
--- a/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/ScannedResultCollector.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.scan.collector;
 import java.util.List;
 
 import org.apache.carbondata.scan.result.AbstractScannedResult;
+import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
 
 /**
  * Interface which will be used to aggregate the scan result
@@ -35,4 +36,10 @@ public interface ScannedResultCollector {
    */
   List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize);
 
+  /**
+   * Collects data in columnar format.
+   * @param scannedResult
+   * @param columnarBatch
+   */
+  void collectVectorBatch(AbstractScannedResult scannedResult, CarbonColumnarBatch columnarBatch);
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
index ad52b5e..7304b3c 100644
--- a/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/impl/AbstractScannedResultCollector.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.scan.executor.util.QueryUtil;
 import org.apache.carbondata.scan.result.AbstractScannedResult;
+import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
 
 /**
@@ -151,4 +152,8 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
     }
   }
 
+  @Override public void collectVectorBatch(AbstractScannedResult scannedResult,
+      CarbonColumnarBatch columnarBatch) {
+    throw new UnsupportedOperationException("Works only for batch collectors");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedVectorResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedVectorResultCollector.java
new file mode 100644
index 0000000..500aa2c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/collector/impl/DictionaryBasedVectorResultCollector.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.collector.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.model.QueryDimension;
+import org.apache.carbondata.scan.model.QueryMeasure;
+import org.apache.carbondata.scan.result.AbstractScannedResult;
+import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.scan.result.vector.MeasureDataVectorProcessor;
+
+/**
+ * It is not a collector it is just a scanned result holder.
+ */
+public class DictionaryBasedVectorResultCollector extends AbstractScannedResultCollector {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DictionaryBasedVectorResultCollector.class.getName());
+
+  private ColumnVectorInfo[] dictionaryInfo;
+
+  private ColumnVectorInfo[] noDictionaryInfo;
+
+  private ColumnVectorInfo[] complexInfo;
+
+  private ColumnVectorInfo[] measureInfo;
+
+  private ColumnVectorInfo[] allColumnInfo;
+
+  public DictionaryBasedVectorResultCollector(BlockExecutionInfo blockExecutionInfos) {
+    super(blockExecutionInfos);
+    QueryDimension[] queryDimensions = tableBlockExecutionInfos.getQueryDimensions();
+    QueryMeasure[] queryMeasures = tableBlockExecutionInfos.getQueryMeasures();
+    measureInfo = new ColumnVectorInfo[queryMeasures.length];
+    allColumnInfo = new ColumnVectorInfo[queryDimensions.length + queryMeasures.length];
+    List<ColumnVectorInfo> dictInfoList = new ArrayList<>();
+    List<ColumnVectorInfo> noDictInfoList = new ArrayList<>();
+    List<ColumnVectorInfo> complexList = new ArrayList<>();
+    for (int i = 0; i < queryDimensions.length; i++) {
+      if (!queryDimensions[i].getDimension().hasEncoding(Encoding.DICTIONARY)) {
+        ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
+        noDictInfoList.add(columnVectorInfo);
+        columnVectorInfo.dimension = queryDimensions[i];
+        columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal();
+        allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
+      } else if (queryDimensions[i].getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
+        dictInfoList.add(columnVectorInfo);
+        columnVectorInfo.dimension = queryDimensions[i];
+        columnVectorInfo.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(queryDimensions[i].getDimension().getDataType());
+        columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal();
+        allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
+      } else if (queryDimensions[i].getDimension().isComplex()) {
+        ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
+        complexList.add(columnVectorInfo);
+        columnVectorInfo.dimension = queryDimensions[i];
+        columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal();
+        columnVectorInfo.genericQueryType =
+            tableBlockExecutionInfos.getComlexDimensionInfoMap().get(columnVectorInfo.ordinal);
+        allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
+      } else {
+        ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
+        dictInfoList.add(columnVectorInfo);
+        columnVectorInfo.dimension = queryDimensions[i];
+        columnVectorInfo.ordinal = queryDimensions[i].getDimension().getOrdinal();
+        allColumnInfo[queryDimensions[i].getQueryOrder()] = columnVectorInfo;
+      }
+    }
+    for (int i = 0; i < queryMeasures.length; i++) {
+      ColumnVectorInfo columnVectorInfo = new ColumnVectorInfo();
+      columnVectorInfo.measureVectorFiller = MeasureDataVectorProcessor.MeasureVectorFillerFactory
+          .getMeasureVectorFiller(queryMeasures[i].getMeasure().getDataType());
+      columnVectorInfo.ordinal = queryMeasures[i].getMeasure().getOrdinal();
+      columnVectorInfo.measure = queryMeasures[i];
+      measureInfo[i] = columnVectorInfo;
+      allColumnInfo[queryMeasures[i].getQueryOrder()] = columnVectorInfo;
+    }
+    dictionaryInfo = dictInfoList.toArray(new ColumnVectorInfo[dictInfoList.size()]);
+    noDictionaryInfo = noDictInfoList.toArray(new ColumnVectorInfo[noDictInfoList.size()]);
+    complexInfo = complexList.toArray(new ColumnVectorInfo[complexList.size()]);
+    Arrays.sort(dictionaryInfo);
+    Arrays.sort(noDictionaryInfo);
+    Arrays.sort(measureInfo);
+    Arrays.sort(complexInfo);
+  }
+
+  @Override public List<Object[]> collectData(AbstractScannedResult scannedResult, int batchSize) {
+    throw new UnsupportedOperationException("collectData is not supported here");
+  }
+
+  @Override public void collectVectorBatch(AbstractScannedResult scannedResult,
+      CarbonColumnarBatch columnarBatch) {
+    int rowCounter = scannedResult.getRowCounter();
+    int availableRows = scannedResult.numberOfOutputRows() - rowCounter;
+    int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getActualSize();
+    requiredRows = Math.min(requiredRows, availableRows);
+    if (requiredRows < 1) {
+      return;
+    }
+    for (int i = 0; i < allColumnInfo.length; i++) {
+      allColumnInfo[i].size = requiredRows;
+      allColumnInfo[i].offset = rowCounter;
+      allColumnInfo[i].vectorOffset = columnarBatch.getRowCounter();
+      allColumnInfo[i].vector = columnarBatch.columnVectors[i];
+    }
+
+    scannedResult.fillColumnarDictionaryBatch(dictionaryInfo);
+    scannedResult.fillColumnarNoDictionaryBatch(noDictionaryInfo);
+    scannedResult.fillColumnarMeasureBatch(measureInfo, measuresOrdinal);
+    scannedResult.fillColumnarComplexBatch(complexInfo);
+    scannedResult.setRowCounter(rowCounter + requiredRows);
+    columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows);
+    columnarBatch.setRowCounter(columnarBatch.getRowCounter() + requiredRows);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
index 7ed6e60..947b992 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/QueryExecutorFactory.java
@@ -19,6 +19,8 @@
 package org.apache.carbondata.scan.executor;
 
 import org.apache.carbondata.scan.executor.impl.DetailQueryExecutor;
+import org.apache.carbondata.scan.executor.impl.VectorDetailQueryExecutor;
+import org.apache.carbondata.scan.model.QueryModel;
 
 /**
  * Factory class to get the query executor from RDD
@@ -26,7 +28,11 @@ import org.apache.carbondata.scan.executor.impl.DetailQueryExecutor;
  */
 public class QueryExecutorFactory {
 
-  public static QueryExecutor getQueryExecutor() {
-    return new DetailQueryExecutor();
+  public static QueryExecutor getQueryExecutor(QueryModel queryModel) {
+    if (queryModel.isVectorReader()) {
+      return new VectorDetailQueryExecutor();
+    } else {
+      return new DetailQueryExecutor();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
index 8289c8b..df04dae 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/AbstractQueryExecutor.java
@@ -381,6 +381,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     blockExecutionInfo.setEachColumnValueSize(segmentProperties.getEachDimColumnValueSize());
     blockExecutionInfo.setComplexColumnParentBlockIndexes(
         getComplexDimensionParentBlockIndexes(updatedQueryDimension));
+    blockExecutionInfo.setVectorBatchCollector(queryModel.isVectorReader());
     try {
       // to set column group and its key structure info which will be used
       // to

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java
new file mode 100644
index 0000000..1cbf9c9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/VectorDetailQueryExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.executor.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.model.QueryModel;
+import org.apache.carbondata.scan.result.iterator.VectorDetailQueryResultIterator;
+
+/**
+ * Below class will be used to execute the detail query and returns columnar vectors.
+ */
+public class VectorDetailQueryExecutor extends AbstractQueryExecutor {
+
+  @Override public CarbonIterator<Object[]> execute(QueryModel queryModel)
+      throws QueryExecutionException {
+    List<BlockExecutionInfo> blockExecutionInfoList = getBlockExecutionInfos(queryModel);
+    return new VectorDetailQueryResultIterator(blockExecutionInfoList, queryModel,
+        queryProperties.executorService);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
index 2e80984..12ed20f 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
@@ -242,6 +242,11 @@ public class BlockExecutionInfo {
   private QueryMeasure[] queryMeasures;
 
   /**
+   * whether it needs to read data in vector/columnar format.
+   */
+  private boolean vectorBatchCollector;
+
+  /**
    * @param blockIndex the tableBlock to set
    */
   public void setDataBlock(AbstractIndex blockIndex) {
@@ -724,4 +729,12 @@ public class BlockExecutionInfo {
   public void setStartBlockletIndex(int startBlockletIndex) {
     this.startBlockletIndex = startBlockletIndex;
   }
+
+  public boolean isVectorBatchCollector() {
+    return vectorBatchCollector;
+  }
+
+  public void setVectorBatchCollector(boolean vectorBatchCollector) {
+    this.vectorBatchCollector = vectorBatchCollector;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
index b21a082..61332f5 100644
--- a/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
+++ b/core/src/main/java/org/apache/carbondata/scan/model/QueryModel.java
@@ -132,6 +132,8 @@ public class QueryModel implements Serializable {
 
   private QueryStatisticsRecorder statisticsRecorder;
 
+  private boolean vectorReader;
+
   /**
    * Invalid table blocks, which need to be removed from
    * memory, invalid blocks can be segment which are deleted
@@ -520,4 +522,12 @@ public class QueryModel implements Serializable {
   public void setInvalidSegmentIds(List<String> invalidSegmentIds) {
     this.invalidSegmentIds = invalidSegmentIds;
   }
+
+  public boolean isVectorReader() {
+    return vectorReader;
+  }
+
+  public void setVectorReader(boolean vectorReader) {
+    this.vectorReader = vectorReader;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/processor/AbstractDataBlockIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/processor/AbstractDataBlockIterator.java b/core/src/main/java/org/apache/carbondata/scan/processor/AbstractDataBlockIterator.java
index f2a82c9..ff3b31a 100644
--- a/core/src/main/java/org/apache/carbondata/scan/processor/AbstractDataBlockIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/processor/AbstractDataBlockIterator.java
@@ -28,10 +28,12 @@ import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsModel;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
 import org.apache.carbondata.scan.collector.ScannedResultCollector;
 import org.apache.carbondata.scan.collector.impl.DictionaryBasedResultCollector;
+import org.apache.carbondata.scan.collector.impl.DictionaryBasedVectorResultCollector;
 import org.apache.carbondata.scan.collector.impl.RawBasedResultCollector;
 import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.scan.result.AbstractScannedResult;
+import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
 import org.apache.carbondata.scan.scanner.BlockletScanner;
 import org.apache.carbondata.scan.scanner.impl.FilterScanner;
 import org.apache.carbondata.scan.scanner.impl.NonFilterScanner;
@@ -96,6 +98,9 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
     if (blockExecutionInfo.isRawRecordDetailQuery()) {
       this.scannerResultAggregator =
           new RawBasedResultCollector(blockExecutionInfo);
+    } else if (blockExecutionInfo.isVectorBatchCollector()) {
+      this.scannerResultAggregator =
+          new DictionaryBasedVectorResultCollector(blockExecutionInfo);
     } else {
       this.scannerResultAggregator =
           new DictionaryBasedResultCollector(blockExecutionInfo);
@@ -140,5 +145,5 @@ public abstract class AbstractDataBlockIterator extends CarbonIterator<List<Obje
     return null;
   }
 
-
+  public abstract void processNextBatch(CarbonColumnarBatch columnarBatch);
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/processor/impl/DataBlockIteratorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/processor/impl/DataBlockIteratorImpl.java b/core/src/main/java/org/apache/carbondata/scan/processor/impl/DataBlockIteratorImpl.java
index f553e38..2a8bdc4 100644
--- a/core/src/main/java/org/apache/carbondata/scan/processor/impl/DataBlockIteratorImpl.java
+++ b/core/src/main/java/org/apache/carbondata/scan/processor/impl/DataBlockIteratorImpl.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsModel;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
 import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.scan.processor.AbstractDataBlockIterator;
+import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
 
 /**
  * Below class will be used to process the block for detail query
@@ -60,4 +61,13 @@ public class DataBlockIteratorImpl extends AbstractDataBlockIterator {
     return collectedResult;
   }
 
+  public void processNextBatch(CarbonColumnarBatch columnarBatch) {
+    if (updateScanner()) {
+      this.scannerResultAggregator.collectVectorBatch(scannedResult, columnarBatch);
+      while (columnarBatch.getActualSize() < columnarBatch.getBatchSize() && updateScanner()) {
+        this.scannerResultAggregator.collectVectorBatch(scannedResult, columnarBatch);
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/result/AbstractScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/scan/result/AbstractScannedResult.java
index 1e7b7d8..cd0ce9c 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/AbstractScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/AbstractScannedResult.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -32,6 +33,8 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Scanned result class which will store and provide the result on request
@@ -59,24 +62,24 @@ public abstract class AbstractScannedResult {
   /**
    * to keep track of number of rows process
    */
-  private int rowCounter;
+  protected int rowCounter;
   /**
    * dimension column data chunk
    */
-  private DimensionColumnDataChunk[] dataChunks;
+  protected DimensionColumnDataChunk[] dataChunks;
   /**
    * measure column data chunk
    */
-  private MeasureColumnDataChunk[] measureDataChunks;
+  protected MeasureColumnDataChunk[] measureDataChunks;
   /**
    * dictionary column block index in file
    */
-  private int[] dictionaryColumnBlockIndexes;
+  protected int[] dictionaryColumnBlockIndexes;
 
   /**
    * no dictionary column block index in file
    */
-  private int[] noDictionaryColumnBlockIndexes;
+  protected int[] noDictionaryColumnBlockIndexes;
 
   /**
    * column group to is key structure info
@@ -86,7 +89,7 @@ public abstract class AbstractScannedResult {
    * then from complete column group key it will be used to mask the key and
    * get the particular column key
    */
-  private Map<Integer, KeyStructureInfo> columnGroupKeyStructureInfo;
+  protected Map<Integer, KeyStructureInfo> columnGroupKeyStructureInfo;
 
   /**
    *
@@ -178,6 +181,65 @@ public abstract class AbstractScannedResult {
   }
 
   /**
+   * Fill the column data of dictionary to vector
+   */
+  public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
+    int column = 0;
+    for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
+      column = dataChunks[dictionaryColumnBlockIndexes[i]]
+          .fillConvertedChunkData(vectorInfo, column,
+              columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
+    }
+  }
+
+  /**
+   * Fill the column data to vector
+   */
+  public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
+    int column = 0;
+    for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
+      column = dataChunks[noDictionaryColumnBlockIndexes[i]]
+          .fillConvertedChunkData(vectorInfo, column,
+              columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i]));
+    }
+  }
+
+  /**
+   * Fill the measure column data to vector
+   */
+  public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) {
+    for (int i = 0; i < measuresOrdinal.length; i++) {
+      vectorInfo[i].measureVectorFiller
+          .fillMeasureVector(measureDataChunks[measuresOrdinal[i]], vectorInfo[i]);
+    }
+  }
+
+  public void fillColumnarComplexBatch(ColumnVectorInfo[] vectorInfos) {
+    for (int i = 0; i < vectorInfos.length; i++) {
+      int offset = vectorInfos[i].offset;
+      int len = offset + vectorInfos[i].size;
+      int vectorOffset = vectorInfos[i].vectorOffset;
+      CarbonColumnVector vector = vectorInfos[i].vector;
+      for (int j = offset; j < len; j++) {
+        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+        DataOutputStream dataOutput = new DataOutputStream(byteStream);
+        try {
+          vectorInfos[i].genericQueryType.parseBlocksAndReturnComplexColumnByteArray(dataChunks,
+              rowMapping == null ? j : rowMapping[j], dataOutput);
+          Object data = vectorInfos[i].genericQueryType
+              .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
+          vector.putObject(vectorOffset++, data);
+        } catch (IOException e) {
+          LOGGER.error(e);
+        } finally {
+          CarbonUtil.closeStreams(dataOutput);
+          CarbonUtil.closeStreams(byteStream);
+        }
+      }
+    }
+  }
+
+  /**
    * Just increment the counter incase of query only on measures.
    */
   public void incrementCounter() {
@@ -186,6 +248,13 @@ public abstract class AbstractScannedResult {
   }
 
   /**
+   * increment the counter.
+   */
+  public void setRowCounter(int rowCounter) {
+    this.rowCounter = rowCounter;
+  }
+
+  /**
    * Below method will be used to get the dimension data based on dimension
    * ordinal and index
    *
@@ -349,6 +418,10 @@ public abstract class AbstractScannedResult {
         .getReadableBigDecimalValueByIndex(rowIndex);
   }
 
+  public int getRowCounter() {
+    return rowCounter;
+  }
+
   /**
    * will return the current valid row id
    *

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/result/impl/FilterQueryScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/impl/FilterQueryScannedResult.java b/core/src/main/java/org/apache/carbondata/scan/result/impl/FilterQueryScannedResult.java
index f1e6594..e192f95 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/impl/FilterQueryScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/impl/FilterQueryScannedResult.java
@@ -22,6 +22,7 @@ import java.math.BigDecimal;
 
 import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.scan.result.AbstractScannedResult;
+import org.apache.carbondata.scan.result.vector.ColumnVectorInfo;
 
 /**
  * Result provider class in case of filter query
@@ -144,4 +145,38 @@ public class FilterQueryScannedResult extends AbstractScannedResult {
     return getBigDecimalMeasureValue(ordinal, rowMapping[currentRow]);
   }
 
+  /**
+   * Fill the column data to vector
+   */
+  public void fillColumnarDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
+    int column = 0;
+    for (int i = 0; i < this.dictionaryColumnBlockIndexes.length; i++) {
+      column = dataChunks[dictionaryColumnBlockIndexes[i]]
+          .fillConvertedChunkData(rowMapping, vectorInfo, column,
+              columnGroupKeyStructureInfo.get(dictionaryColumnBlockIndexes[i]));
+    }
+  }
+
+  /**
+   * Fill the column data to vector
+   */
+  public void fillColumnarNoDictionaryBatch(ColumnVectorInfo[] vectorInfo) {
+    int column = 0;
+    for (int i = 0; i < this.noDictionaryColumnBlockIndexes.length; i++) {
+      column = dataChunks[noDictionaryColumnBlockIndexes[i]]
+          .fillConvertedChunkData(rowMapping, vectorInfo, column,
+              columnGroupKeyStructureInfo.get(noDictionaryColumnBlockIndexes[i]));
+    }
+  }
+
+  /**
+   * Fill the measure column data to vector
+   */
+  public void fillColumnarMeasureBatch(ColumnVectorInfo[] vectorInfo, int[] measuresOrdinal) {
+    for (int i = 0; i < measuresOrdinal.length; i++) {
+      vectorInfo[i].measureVectorFiller
+          .fillMeasureVectorForFilter(rowMapping, measureDataChunks[measuresOrdinal[i]],
+              vectorInfo[i]);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
index 07ccab4..c6e9ff7 100644
--- a/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/AbstractDetailQueryResultIterator.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.scan.model.QueryModel;
 import org.apache.carbondata.scan.processor.AbstractDataBlockIterator;
 import org.apache.carbondata.scan.processor.impl.DataBlockIteratorImpl;
+import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
 
 /**
  * In case of detail query we cannot keep all the records in memory so for
@@ -177,4 +178,8 @@ public abstract class AbstractDetailQueryResultIterator extends CarbonIterator {
         .put(QueryStatisticsConstants.VALID_SCAN_BLOCKLET_NUM, queryStatisticValidScanBlocklet);
   }
 
+  public void processNextBatch(CarbonColumnarBatch columnarBatch) {
+    throw new UnsupportedOperationException("Please use VectorDetailQueryResultIterator");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorChunkRowIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorChunkRowIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorChunkRowIterator.java
new file mode 100644
index 0000000..a2ea05b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorChunkRowIterator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.scan.result.iterator;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
+
+/**
+ * Iterator over row result
+ */
+public class VectorChunkRowIterator extends CarbonIterator<Object[]> {
+
+  /**
+   * iterator over chunk result
+   */
+  private AbstractDetailQueryResultIterator iterator;
+
+  /**
+   * currect chunk
+   */
+  private CarbonColumnarBatch columnarBatch;
+
+  private int batchSize;
+
+  private int currentIndex;
+
+  public VectorChunkRowIterator(AbstractDetailQueryResultIterator iterator,
+      CarbonColumnarBatch columnarBatch) {
+    this.iterator = iterator;
+    this.columnarBatch = columnarBatch;
+    if (iterator.hasNext()) {
+      iterator.processNextBatch(columnarBatch);
+      batchSize = columnarBatch.getActualSize();
+      currentIndex = 0;
+    }
+  }
+
+  /**
+   * Returns {@code true} if the iteration has more elements. (In other words,
+   * returns {@code true} if {@link #next} would return an element rather than
+   * throwing an exception.)
+   *
+   * @return {@code true} if the iteration has more elements
+   */
+  @Override public boolean hasNext() {
+    if (currentIndex < batchSize) {
+      return true;
+    } else {
+      while (iterator.hasNext()) {
+        columnarBatch.reset();
+        iterator.processNextBatch(columnarBatch);
+        batchSize = columnarBatch.getActualSize();
+        currentIndex = 0;
+        if (currentIndex < batchSize) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Returns the next element in the iteration.
+   *
+   * @return the next element in the iteration
+   */
+  @Override public Object[] next() {
+    Object[] row = new Object[columnarBatch.columnVectors.length];
+    for (int i = 0; i < row.length; i++) {
+      row[i] = columnarBatch.columnVectors[i].getData(currentIndex);
+    }
+    currentIndex++;
+    return row;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java b/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java
new file mode 100644
index 0000000..8d0e80c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/result/iterator/VectorDetailQueryResultIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.result.iterator;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.carbondata.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.scan.model.QueryModel;
+import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
+
+/**
+ * It reads the data vector batch format
+ */
+public class VectorDetailQueryResultIterator extends AbstractDetailQueryResultIterator {
+
+  private final Object lock = new Object();
+
+  public VectorDetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel queryModel,
+      ExecutorService execService) {
+    super(infos, queryModel, execService);
+  }
+
+  @Override public Object next() {
+    throw new UnsupportedOperationException("call processNextBatch instaed");
+  }
+
+  public void processNextBatch(CarbonColumnarBatch columnarBatch) {
+    synchronized (lock) {
+      updateDataBlockIterator();
+      if (dataBlockIterator != null) {
+        dataBlockIterator.processNextBatch(columnarBatch);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/result/vector/CarbonColumnVector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/vector/CarbonColumnVector.java b/core/src/main/java/org/apache/carbondata/scan/result/vector/CarbonColumnVector.java
new file mode 100644
index 0000000..d89f52f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/result/vector/CarbonColumnVector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.scan.result.vector;
+
+import org.apache.spark.sql.types.Decimal;
+
+public interface CarbonColumnVector {
+
+  public void putShort(int rowId, short value);
+
+  public void putInt(int rowId, int value);
+
+  public void putLong(int rowId, long value);
+
+  public void putDecimal(int rowId, Decimal value, int precision);
+
+  public void putDouble(int rowId, double value);
+
+  public void putBytes(int rowId, byte[] value);
+
+  public void putBytes(int rowId, int offset, int length, byte[] value);
+
+  public void putNull(int rowId);
+
+  public boolean isNull(int rowId);
+
+  public void putObject(int rowId, Object obj);
+
+  public Object getData(int rowId);
+
+  public void reset();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/result/vector/CarbonColumnarBatch.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/vector/CarbonColumnarBatch.java b/core/src/main/java/org/apache/carbondata/scan/result/vector/CarbonColumnarBatch.java
new file mode 100644
index 0000000..afc53ca
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/result/vector/CarbonColumnarBatch.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.scan.result.vector;
+
+public class CarbonColumnarBatch {
+
+  public CarbonColumnVector[] columnVectors;
+
+  private int batchSize;
+
+  private int actualSize;
+
+  private int rowCounter;
+
+  public CarbonColumnarBatch(CarbonColumnVector[] columnVectors, int batchSize) {
+    this.columnVectors = columnVectors;
+    this.batchSize = batchSize;
+  }
+
+  public int getBatchSize() {
+    return batchSize;
+  }
+
+  public int getActualSize() {
+    return actualSize;
+  }
+
+  public void setActualSize(int actualSize) {
+    this.actualSize = actualSize;
+  }
+
+  public void reset() {
+    actualSize = 0;
+    rowCounter = 0;
+    for (int i = 0; i < columnVectors.length; i++) {
+      columnVectors[i].reset();
+    }
+  }
+
+  public int getRowCounter() {
+    return rowCounter;
+  }
+
+  public void setRowCounter(int rowCounter) {
+    this.rowCounter = rowCounter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/result/vector/ColumnVectorInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/vector/ColumnVectorInfo.java b/core/src/main/java/org/apache/carbondata/scan/result/vector/ColumnVectorInfo.java
new file mode 100644
index 0000000..4b0d7b3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/result/vector/ColumnVectorInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.result.vector;
+
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.scan.filter.GenericQueryType;
+import org.apache.carbondata.scan.model.QueryDimension;
+import org.apache.carbondata.scan.model.QueryMeasure;
+
+public class ColumnVectorInfo implements Comparable<ColumnVectorInfo> {
+  public int offset;
+  public int size;
+  public CarbonColumnVector vector;
+  public int vectorOffset;
+  public QueryDimension dimension;
+  public QueryMeasure measure;
+  public int ordinal;
+  public DirectDictionaryGenerator directDictionaryGenerator;
+  public MeasureDataVectorProcessor.MeasureVectorFiller measureVectorFiller;
+  public GenericQueryType genericQueryType;
+
+  @Override public int compareTo(ColumnVectorInfo o) {
+    return ordinal - o.ordinal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/result/vector/MeasureDataVectorProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/vector/MeasureDataVectorProcessor.java b/core/src/main/java/org/apache/carbondata/scan/result/vector/MeasureDataVectorProcessor.java
new file mode 100644
index 0000000..9e455e2
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/result/vector/MeasureDataVectorProcessor.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.result.vector;
+
+import java.math.BigDecimal;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+
+import org.apache.spark.sql.types.Decimal;
+
+public class MeasureDataVectorProcessor {
+
+  public static interface MeasureVectorFiller {
+
+    void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info);
+
+    void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
+        ColumnVectorInfo info);
+  }
+
+  public static class IntegralMeasureVectorFiller implements MeasureVectorFiller {
+
+    @Override
+    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int i = offset; i < len; i++) {
+        if (nullBitSet.get(i)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putInt(vectorOffset,
+              (int)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i));
+        }
+        vectorOffset++;
+      }
+    }
+
+    @Override
+    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
+        ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
+        if (nullBitSet.get(currentRow)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putInt(vectorOffset,
+              (int)dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow));
+        }
+        vectorOffset++;
+      }
+    }
+  }
+
+  public static class ShortMeasureVectorFiller implements MeasureVectorFiller {
+
+    @Override
+    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int i = offset; i < len; i++) {
+        if (nullBitSet.get(i)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putShort(vectorOffset,
+              (short) dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i));
+        }
+        vectorOffset++;
+      }
+    }
+
+    @Override
+    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
+        ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
+        if (nullBitSet.get(currentRow)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putShort(vectorOffset,
+              (short) dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow));
+        }
+        vectorOffset++;
+      }
+    }
+  }
+
+  public static class LongMeasureVectorFiller implements MeasureVectorFiller {
+
+    @Override
+    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int i = offset; i < len; i++) {
+        if (nullBitSet.get(i)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putLong(vectorOffset,
+              dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(i));
+        }
+        vectorOffset++;
+      }
+    }
+
+    @Override
+    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
+        ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
+        if (nullBitSet.get(currentRow)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putLong(vectorOffset,
+              dataChunk.getMeasureDataHolder().getReadableLongValueByIndex(currentRow));
+        }
+        vectorOffset++;
+      }
+    }
+  }
+
+  public static class DecimalMeasureVectorFiller implements MeasureVectorFiller {
+
+    @Override
+    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      int precision = info.measure.getMeasure().getPrecision();
+      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int i = offset; i < len; i++) {
+        if (nullBitSet.get(i)) {
+          vector.putNull(vectorOffset);
+        } else {
+          BigDecimal decimal =
+              dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(i);
+          Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(decimal);
+          vector.putDecimal(vectorOffset, toDecimal, precision);
+        }
+        vectorOffset++;
+      }
+    }
+
+    @Override
+    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
+        ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      int precision = info.measure.getMeasure().getPrecision();
+      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
+        if (nullBitSet.get(currentRow)) {
+          vector.putNull(vectorOffset);
+        } else {
+          BigDecimal decimal =
+              dataChunk.getMeasureDataHolder().getReadableBigDecimalValueByIndex(currentRow);
+          Decimal toDecimal = org.apache.spark.sql.types.Decimal.apply(decimal);
+          vector.putDecimal(vectorOffset, toDecimal, precision);
+        }
+        vectorOffset++;
+      }
+    }
+  }
+
+  public static class DefaultMeasureVectorFiller implements MeasureVectorFiller {
+
+    @Override
+    public void fillMeasureVector(MeasureColumnDataChunk dataChunk, ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int i = offset; i < len; i++) {
+        if (nullBitSet.get(i)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putDouble(vectorOffset,
+              dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(i));
+        }
+        vectorOffset++;
+      }
+    }
+
+    @Override
+    public void fillMeasureVectorForFilter(int[] rowMapping, MeasureColumnDataChunk dataChunk,
+        ColumnVectorInfo info) {
+      int offset = info.offset;
+      int len = offset + info.size;
+      int vectorOffset = info.vectorOffset;
+      CarbonColumnVector vector = info.vector;
+      BitSet nullBitSet = dataChunk.getNullValueIndexHolder().getBitSet();
+      for (int i = offset; i < len; i++) {
+        int currentRow = rowMapping[i];
+        if (nullBitSet.get(currentRow)) {
+          vector.putNull(vectorOffset);
+        } else {
+          vector.putDouble(vectorOffset,
+              dataChunk.getMeasureDataHolder().getReadableDoubleValueByIndex(currentRow));
+        }
+        vectorOffset++;
+      }
+    }
+  }
+
+  public static class MeasureVectorFillerFactory {
+
+    public static MeasureVectorFiller getMeasureVectorFiller(DataType dataType) {
+      switch (dataType) {
+        case SHORT:
+          return new ShortMeasureVectorFiller();
+        case INT:
+          return new IntegralMeasureVectorFiller();
+        case LONG:
+          return new LongMeasureVectorFiller();
+        case DECIMAL:
+          return new DecimalMeasureVectorFiller();
+        default:
+          return new DefaultMeasureVectorFiller();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/core/src/main/java/org/apache/carbondata/scan/result/vector/impl/CarbonColumnVectorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/result/vector/impl/CarbonColumnVectorImpl.java b/core/src/main/java/org/apache/carbondata/scan/result/vector/impl/CarbonColumnVectorImpl.java
new file mode 100644
index 0000000..369e1b7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/scan/result/vector/impl/CarbonColumnVectorImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.scan.result.vector.impl;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
+
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class CarbonColumnVectorImpl implements CarbonColumnVector {
+
+  private Object[] data;
+
+  private int[] ints;
+
+  private long[] longs;
+
+  private Decimal[] decimals;
+
+  private byte[][] bytes;
+
+  private double[] doubles;
+
+  private BitSet nullBytes;
+
+  private DataType dataType;
+
+  public CarbonColumnVectorImpl(int batchSize, DataType dataType) {
+    nullBytes = new BitSet(batchSize);
+    this.dataType = dataType;
+    switch (dataType) {
+      case INT:
+        ints = new int[batchSize];
+        break;
+      case LONG:
+        longs = new long[batchSize];
+        break;
+      case DOUBLE:
+        doubles = new double[batchSize];
+        break;
+      case STRING:
+        bytes = new byte[batchSize][];
+        break;
+      case DECIMAL:
+        decimals = new Decimal[batchSize];
+        break;
+      default:
+        data = new Object[batchSize];
+    }
+  }
+
+  @Override public void putShort(int rowId, short value) {
+
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    ints[rowId] = value;
+  }
+
+  @Override public void putLong(int rowId, long value) {
+    longs[rowId] = value;
+  }
+
+  @Override public void putDecimal(int rowId, Decimal value, int precision) {
+    decimals[rowId] = value;
+  }
+
+  @Override public void putDouble(int rowId, double value) {
+    doubles[rowId] = value;
+  }
+
+  @Override public void putBytes(int rowId, byte[] value) {
+    bytes[rowId] = value;
+  }
+
+  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+
+  }
+
+  @Override public void putNull(int rowId) {
+    nullBytes.set(rowId);
+  }
+
+  @Override public boolean isNull(int rowId) {
+    return nullBytes.get(rowId);
+  }
+
+  @Override public void putObject(int rowId, Object obj) {
+    data[rowId] = obj;
+  }
+
+  @Override public Object getData(int rowId) {
+    if (nullBytes.get(rowId)) {
+      return null;
+    }
+    switch (dataType) {
+      case INT:
+        return ints[rowId];
+      case LONG:
+        return longs[rowId];
+      case DOUBLE:
+        return doubles[rowId];
+      case STRING:
+        return UTF8String.fromBytes(bytes[rowId]);
+      case DECIMAL:
+        return decimals[rowId];
+      default:
+        return data[rowId];
+    }
+  }
+
+  @Override public void reset() {
+    nullBytes.clear();
+    switch (dataType) {
+      case INT:
+        Arrays.fill(ints, 0);
+        break;
+      case LONG:
+        Arrays.fill(longs, 0);
+        break;
+      case DOUBLE:
+        Arrays.fill(doubles, 0);
+        break;
+      case STRING:
+        Arrays.fill(bytes, null);
+        break;
+      case DECIMAL:
+        Arrays.fill(decimals, null);
+        break;
+      default:
+        Arrays.fill(data, null);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 66f0e3b..aa995fa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -433,6 +433,14 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
+    QueryModel queryModel = getQueryModel(inputSplit, taskAttemptContext);
+    CarbonReadSupport readSupport = getReadSupportClass(configuration);
+    return new CarbonRecordReader<T>(queryModel, readSupport);
+  }
+
+  public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException {
+    Configuration configuration = taskAttemptContext.getConfiguration();
     CarbonTable carbonTable = getCarbonTable(configuration);
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(configuration);
 
@@ -455,12 +463,10 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         queryModel.setInvalidSegmentIds(invalidSegments);
       }
     }
-
-    CarbonReadSupport readSupport = getReadSupportClass(configuration);
-    return new CarbonRecordReader<T>(queryModel, readSupport);
+    return queryModel;
   }
 
-  private CarbonReadSupport getReadSupportClass(Configuration configuration) {
+  public CarbonReadSupport getReadSupportClass(Configuration configuration) {
     String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
     //By default it uses dictionary decoder read class
     CarbonReadSupport readSupport = null;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 91e6e46..363163b 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -54,7 +54,7 @@ public class CarbonRecordReader<T> extends RecordReader<Void, T> {
   public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) {
     this.queryModel = queryModel;
     this.readSupport = readSupport;
-    this.queryExecutor = QueryExecutorFactory.getQueryExecutor();
+    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/376d69ff/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
index 59b45ad..3a68efb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
@@ -22,7 +22,10 @@ import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
-public class RawDataReadSupport implements CarbonReadSupport<Object[]> {
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow;
+
+public class RawDataReadSupport implements CarbonReadSupport<InternalRow> {
 
   @Override
   public void initialize(CarbonColumn[] carbonColumns,
@@ -36,8 +39,8 @@ public class RawDataReadSupport implements CarbonReadSupport<Object[]> {
    * @return
    */
   @Override
-  public Object[] readRow(Object[] data) {
-    return data;
+  public InternalRow readRow(Object[] data) {
+    return new GenericMutableRow(data);
   }
 
   /**