You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/01/10 11:30:08 UTC

[1/2] carbondata git commit: [CARBONDATA-3220] Support presto to read stream segment data

Repository: carbondata
Updated Branches:
  refs/heads/master 8e6def9fa -> d78db8f6b


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index aebe549..31417bc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -41,13 +41,13 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.stream.CarbonStreamInputFormat
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
 import org.apache.carbondata.spark.util.CommonUtil
-import org.apache.carbondata.streaming.CarbonStreamInputFormat
 
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
index 50d6c46..e822634 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/CarbonStreamRecordReader.java
@@ -18,285 +18,45 @@
 package org.apache.carbondata.stream;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.reader.CarbonHeaderReader;
 import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
-import org.apache.carbondata.core.scan.filter.FilterUtil;
-import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
-import org.apache.carbondata.core.scan.filter.intf.RowImpl;
-import org.apache.carbondata.core.scan.filter.intf.RowIntf;
-import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.util.CarbonMetadataUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.format.BlockletHeader;
-import org.apache.carbondata.format.FileHeader;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 import org.apache.carbondata.hadoop.InputMetricsStats;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-import org.apache.carbondata.streaming.CarbonStreamInputFormat;
-import org.apache.carbondata.streaming.StreamBlockletReader;
+import org.apache.carbondata.hadoop.stream.StreamRecordReader;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.CarbonVectorProxy;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 
 /**
- * Stream record reader
+ * Stream record reader for Spark that emits either columnar batches (vector) or rows
  */
-public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
-  // vector reader
-  private boolean isVectorReader;
-
-  // metadata
-  private CarbonTable carbonTable;
-  private CarbonColumn[] storageColumns;
-  private boolean[] isRequired;
-  private DataType[] measureDataTypes;
-  private int dimensionCount;
-  private int measureCount;
-
-  // input
-  private FileSplit fileSplit;
-  private Configuration hadoopConf;
-  private StreamBlockletReader input;
-  private boolean isFirstRow = true;
-  private QueryModel model;
+public class CarbonStreamRecordReader extends StreamRecordReader {
 
-  // decode data
-  private BitSet allNonNull;
-  private boolean[] isNoDictColumn;
-  private DirectDictionaryGenerator[] directDictionaryGenerators;
-  private CacheProvider cacheProvider;
-  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
-  private GenericQueryType[] queryTypes;
-  private String compressorName;
-
-  // vectorized reader
-  private StructType outputSchema;
+  // true: emit columnar batches (vector reader); false: emit one row at a time
+  protected boolean isVectorReader;
   private CarbonVectorProxy vectorProxy;
-  private boolean isFinished = false;
-
-  // filter
-  private FilterExecuter filter;
-  private boolean[] isFilterRequired;
-  private Object[] filterValues;
-  private RowIntf filterRow;
-  private int[] filterMap;
-
-  // output
-  private CarbonColumn[] projection;
-  private boolean[] isProjectionRequired;
-  private int[] projectionMap;
-  private Object[] outputValues;
+  private StructType outputSchema;
   private InternalRow outputRow;
 
-  // empty project, null filter
-  private boolean skipScanData;
-
-  // return raw row for handoff
-  private boolean useRawRow = false;
-
   // InputMetricsStats
   private InputMetricsStats inputMetricsStats;
 
   public CarbonStreamRecordReader(boolean isVectorReader, InputMetricsStats inputMetricsStats,
       QueryModel mdl, boolean useRawRow) {
+    super(mdl, useRawRow);
     this.isVectorReader = isVectorReader;
     this.inputMetricsStats = inputMetricsStats;
-    this.model = mdl;
-    this.useRawRow = useRawRow;
-
   }
-  @Override public void initialize(InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    // input
-    if (split instanceof CarbonInputSplit) {
-      fileSplit = (CarbonInputSplit) split;
-    } else if (split instanceof CarbonMultiBlockSplit) {
-      fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
-    } else {
-      fileSplit = (FileSplit) split;
-    }
-
-    // metadata
-    hadoopConf = context.getConfiguration();
-    if (model == null) {
-      CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
-      model = format.createQueryModel(split, context);
-    }
-    carbonTable = model.getTable();
-    List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getTableName());
-    dimensionCount = dimensions.size();
-    List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getTableName());
-    measureCount = measures.size();
-    List<CarbonColumn> carbonColumnList =
-        carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
-    storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
-    isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
-    directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
-    for (int i = 0; i < storageColumns.length; i++) {
-      if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
-            .getDirectDictionaryGenerator(storageColumns[i].getDataType());
-      }
-    }
-    measureDataTypes = new DataType[measureCount];
-    for (int i = 0; i < measureCount; i++) {
-      measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
-    }
-
-    // decode data
-    allNonNull = new BitSet(storageColumns.length);
-    projection = model.getProjectionColumns();
-
-    isRequired = new boolean[storageColumns.length];
-    boolean[] isFiltlerDimensions = model.getIsFilterDimensions();
-    boolean[] isFiltlerMeasures = model.getIsFilterMeasures();
-    isFilterRequired = new boolean[storageColumns.length];
-    filterMap = new int[storageColumns.length];
-    for (int i = 0; i < storageColumns.length; i++) {
-      if (storageColumns[i].isDimension()) {
-        if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) {
-          isRequired[i] = true;
-          isFilterRequired[i] = true;
-          filterMap[i] = storageColumns[i].getOrdinal();
-        }
-      } else {
-        if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) {
-          isRequired[i] = true;
-          isFilterRequired[i] = true;
-          filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
-        }
-      }
-    }
-
-    isProjectionRequired = new boolean[storageColumns.length];
-    projectionMap = new int[storageColumns.length];
-    for (int j = 0; j < projection.length; j++) {
-      for (int i = 0; i < storageColumns.length; i++) {
-        if (storageColumns[i].getColName().equals(projection[j].getColName())) {
-          isRequired[i] = true;
-          isProjectionRequired[i] = true;
-          projectionMap[i] = j;
-          break;
-        }
-      }
-    }
-
-    // initialize filter
-    if (null != model.getFilterExpressionResolverTree()) {
-      initializeFilter();
-    } else if (projection.length == 0) {
-      skipScanData = true;
-    }
-
-  }
-
-  private void initializeFilter() {
-
-    List<ColumnSchema> wrapperColumnSchemaList = CarbonUtil
-        .getColumnSchemaList(carbonTable.getDimensionByTableName(carbonTable.getTableName()),
-            carbonTable.getMeasureByTableName(carbonTable.getTableName()));
-    int[] dimLensWithComplex = new int[wrapperColumnSchemaList.size()];
-    for (int i = 0; i < dimLensWithComplex.length; i++) {
-      dimLensWithComplex[i] = Integer.MAX_VALUE;
-    }
 
-    int[] dictionaryColumnCardinality =
-        CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchemaList);
-    SegmentProperties segmentProperties =
-        new SegmentProperties(wrapperColumnSchemaList, dictionaryColumnCardinality);
-    Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
-
-    FilterResolverIntf resolverIntf = model.getFilterExpressionResolverTree();
-    filter = FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties,
-        complexDimensionInfoMap);
-    // for row filter, we need update column index
-    FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
-        carbonTable.getDimensionOrdinalMax());
-
-  }
-
-  private byte[] getSyncMarker(String filePath) throws IOException {
-    CarbonHeaderReader headerReader = new CarbonHeaderReader(filePath);
-    FileHeader header = headerReader.readHeader();
-    // legacy store does not have this member
-    if (header.isSetCompressor_name()) {
-      compressorName = header.getCompressor_name();
-    } else {
-      compressorName = CompressorFactory.NativeSupportedCompressor.SNAPPY.getName();
-    }
-    return header.getSync_marker();
-  }
-
-  private void initializeAtFirstRow() throws IOException {
-    filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
-    filterRow = new RowImpl();
-    filterRow.setValues(filterValues);
-
-    outputValues = new Object[projection.length];
+  protected void initializeAtFirstRow() throws IOException {
+    super.initializeAtFirstRow();
     outputRow = new GenericInternalRow(outputValues);
-
-    Path file = fileSplit.getPath();
-
-    byte[] syncMarker = getSyncMarker(file.toString());
-
-    FileSystem fs = file.getFileSystem(hadoopConf);
-
-    int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
-        CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
-
-    FSDataInputStream fileIn = fs.open(file, bufferSize);
-    fileIn.seek(fileSplit.getStart());
-    input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
-        fileSplit.getStart() == 0, compressorName);
-
-    cacheProvider = CacheProvider.getInstance();
-    cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
-    queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
-
     outputSchema = new StructType((StructField[])
         DataTypeUtil.getDataTypeConverter().convertCarbonSchemaToSparkSchema(projection));
   }
@@ -317,6 +77,23 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     return nextRow();
   }
 
+  @Override public Object getCurrentValue() {
+    if (isVectorReader) {
+      int value = vectorProxy.numRows();
+      if (inputMetricsStats != null) {
+        inputMetricsStats.incrementRecordRead((long) value);
+      }
+
+      return vectorProxy.getColumnarBatch();
+    }
+
+    if (inputMetricsStats != null) {
+      inputMetricsStats.incrementRecordRead(1L);
+    }
+
+    return outputRow;
+  }
+
   /**
    * for vector reader, check next columnar batch
    */
@@ -343,384 +120,53 @@ public class CarbonStreamRecordReader extends RecordReader<Void, Object> {
     return hasNext;
   }
 
-  /**
-   * check next Row
-   */
-  private boolean nextRow() throws IOException {
-    // read row one by one
-    try {
-      boolean hasNext;
-      boolean scanMore = false;
-      do {
-        hasNext = input.hasNext();
-        if (hasNext) {
-          if (skipScanData) {
-            input.nextRow();
-            scanMore = false;
-          } else {
-            if (useRawRow) {
-              // read raw row for streaming handoff which does not require decode raw row
-              readRawRowFromStream();
-            } else {
-              readRowFromStream();
-            }
-            if (null != filter) {
-              scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
-            } else {
-              scanMore = false;
-            }
-          }
-        } else {
-          if (input.nextBlocklet()) {
-            BlockletHeader header = input.readBlockletHeader();
-            if (isScanRequired(header)) {
-              if (skipScanData) {
-                input.skipBlockletData(false);
-              } else {
-                input.readBlockletData(header);
-              }
-            } else {
-              input.skipBlockletData(true);
-            }
-            scanMore = true;
-          } else {
-            isFinished = true;
-            scanMore = false;
+  private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
+    // with no filter and an empty projection, take the row count from the blocklet header
+    if (skipScanData) {
+      int rowNums = header.getBlocklet_info().getNum_rows();
+      vectorProxy = new CarbonVectorProxy(MemoryMode.OFF_HEAP, outputSchema, rowNums, false);
+      vectorProxy.setNumRows(rowNums);
+      input.skipBlockletData(true);
+      return rowNums > 0;
+    }
+
+    input.readBlockletData(header);
+    vectorProxy =
+        new CarbonVectorProxy(MemoryMode.OFF_HEAP, outputSchema, input.getRowNums(), false);
+    int rowNum = 0;
+    if (null == filter) {
+      while (input.hasNext()) {
+        readRowFromStream();
+        putRowToColumnBatch(rowNum++);
+      }
+    } else {
+      try {
+        while (input.hasNext()) {
+          readRowFromStream();
+          if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
+            putRowToColumnBatch(rowNum++);
           }
         }
-      } while (scanMore);
-      return hasNext;
-    } catch (FilterUnsupportedException e) {
-      throw new IOException("Failed to filter row in detail reader", e);
-    }
-  }
-
-  @Override public Void getCurrentKey() throws IOException, InterruptedException {
-    return null;
-  }
-
-    @Override public Object getCurrentValue() throws IOException, InterruptedException {
-        if (isVectorReader) {
-            int value = vectorProxy.numRows();
-            if (inputMetricsStats != null) {
-                inputMetricsStats.incrementRecordRead((long) value);
-            }
-
-            return vectorProxy.getColumnarBatch();
-        }
-
-    if (inputMetricsStats != null) {
-      inputMetricsStats.incrementRecordRead(1L);
-    }
-
-    return outputRow;
-  }
-
-  private boolean isScanRequired(BlockletHeader header) {
-    if (filter != null && header.getBlocklet_index() != null) {
-      BlockletMinMaxIndex minMaxIndex = CarbonMetadataUtil.convertExternalMinMaxIndex(
-          header.getBlocklet_index().getMin_max_index());
-      if (minMaxIndex != null) {
-        BitSet bitSet = filter
-            .isScanRequired(minMaxIndex.getMaxValues(), minMaxIndex.getMinValues(),
-                minMaxIndex.getIsMinMaxSet());
-        if (bitSet.isEmpty()) {
-          return false;
-        } else {
-          return true;
-        }
+      } catch (FilterUnsupportedException e) {
+        throw new IOException("Failed to filter row in vector reader", e);
       }
     }
-    return true;
+    vectorProxy.setNumRows(rowNum);
+    return rowNum > 0;
   }
 
-    private boolean scanBlockletAndFillVector(BlockletHeader header) throws IOException {
-        // if filter is null and output projection is empty, use the row number of blocklet header
-        if (skipScanData) {
-            int rowNums = header.getBlocklet_info().getNum_rows();
-            vectorProxy = new CarbonVectorProxy(MemoryMode.OFF_HEAP, outputSchema, rowNums, false);
-            vectorProxy.setNumRows(rowNums);
-            input.skipBlockletData(true);
-            return rowNums > 0;
-        }
+  private void putRowToColumnBatch(int rowId) {
+    for (int i = 0; i < projection.length; i++) {
+      Object value = outputValues[i];
+      vectorProxy.getColumnVector(i).putRowToColumnBatch(rowId,value);
 
-        input.readBlockletData(header);
-        vectorProxy =
-          new CarbonVectorProxy(MemoryMode.OFF_HEAP, outputSchema, input.getRowNums(), false);
-        int rowNum = 0;
-        if (null == filter) {
-            while (input.hasNext()) {
-                readRowFromStream();
-                putRowToColumnBatch(rowNum++);
-            }
-        } else {
-            try {
-                while (input.hasNext()) {
-                    readRowFromStream();
-                    if (filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax())) {
-                        putRowToColumnBatch(rowNum++);
-                    }
-                }
-            } catch (FilterUnsupportedException e) {
-                throw new IOException("Failed to filter row in vector reader", e);
-            }
-        }
-        vectorProxy.setNumRows(rowNum);
-        return rowNum > 0;
-    }
-
-  private void readRowFromStream() {
-    input.nextRow();
-    short nullLen = input.readShort();
-    BitSet nullBitSet = allNonNull;
-    if (nullLen > 0) {
-      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
-    }
-    int colCount = 0;
-    // primitive type dimension
-    for (; colCount < isNoDictColumn.length; colCount++) {
-      if (nullBitSet.get(colCount)) {
-        if (isFilterRequired[colCount]) {
-          filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
-        }
-        if (isProjectionRequired[colCount]) {
-          outputValues[projectionMap[colCount]] = null;
-        }
-      } else {
-        if (isNoDictColumn[colCount]) {
-          int v = input.readShort();
-          if (isRequired[colCount]) {
-            byte[] b = input.readBytes(v);
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = b;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] =
-                  DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(b,
-                      storageColumns[colCount].getDataType());
-            }
-          } else {
-            input.skipBytes(v);
-          }
-        } else if (null != directDictionaryGenerators[colCount]) {
-          if (isRequired[colCount]) {
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = input.copy(4);
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] =
-                  directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
-            } else {
-              input.skipBytes(4);
-            }
-          } else {
-            input.skipBytes(4);
-          }
-        } else {
-          if (isRequired[colCount]) {
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = input.copy(4);
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = input.readInt();
-            } else {
-              input.skipBytes(4);
-            }
-          } else {
-            input.skipBytes(4);
-          }
-        }
-      }
-    }
-    // complex type dimension
-    for (; colCount < dimensionCount; colCount++) {
-      if (nullBitSet.get(colCount)) {
-        if (isFilterRequired[colCount]) {
-          filterValues[filterMap[colCount]] = null;
-        }
-        if (isProjectionRequired[colCount]) {
-          outputValues[projectionMap[colCount]] = null;
-        }
-      } else {
-        short v = input.readShort();
-        if (isRequired[colCount]) {
-          byte[] b = input.readBytes(v);
-          if (isFilterRequired[colCount]) {
-            filterValues[filterMap[colCount]] = b;
-          }
-          if (isProjectionRequired[colCount]) {
-            outputValues[projectionMap[colCount]] = queryTypes[colCount]
-                .getDataBasedOnDataType(ByteBuffer.wrap(b));
-          }
-        } else {
-          input.skipBytes(v);
-        }
-      }
-    }
-    // measure
-    DataType dataType;
-    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
-      if (nullBitSet.get(colCount)) {
-        if (isFilterRequired[colCount]) {
-          filterValues[filterMap[colCount]] = null;
-        }
-        if (isProjectionRequired[colCount]) {
-          outputValues[projectionMap[colCount]] = null;
-        }
-      } else {
-        dataType = measureDataTypes[msrCount];
-        if (dataType == DataTypes.BOOLEAN) {
-          if (isRequired[colCount]) {
-            boolean v = input.readBoolean();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(1);
-          }
-        } else if (dataType == DataTypes.SHORT) {
-          if (isRequired[colCount]) {
-            short v = input.readShort();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(2);
-          }
-        } else if (dataType == DataTypes.INT) {
-          if (isRequired[colCount]) {
-            int v = input.readInt();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(4);
-          }
-        } else if (dataType == DataTypes.LONG) {
-          if (isRequired[colCount]) {
-            long v = input.readLong();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(8);
-          }
-        } else if (dataType == DataTypes.DOUBLE) {
-          if (isRequired[colCount]) {
-            double v = input.readDouble();
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] = v;
-            }
-          } else {
-            input.skipBytes(8);
-          }
-        } else if (DataTypes.isDecimal(dataType)) {
-          int len = input.readShort();
-          if (isRequired[colCount]) {
-            BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
-            if (isFilterRequired[colCount]) {
-              filterValues[filterMap[colCount]] = v;
-            }
-            if (isProjectionRequired[colCount]) {
-              outputValues[projectionMap[colCount]] =
-                  DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(v);
-            }
-          } else {
-            input.skipBytes(len);
-          }
-        }
-      }
     }
   }
 
-  private void readRawRowFromStream() {
-    input.nextRow();
-    short nullLen = input.readShort();
-    BitSet nullBitSet = allNonNull;
-    if (nullLen > 0) {
-      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
-    }
-    int colCount = 0;
-    // primitive type dimension
-    for (; colCount < isNoDictColumn.length; colCount++) {
-      if (nullBitSet.get(colCount)) {
-        outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
-      } else {
-        if (isNoDictColumn[colCount]) {
-          int v = input.readShort();
-          outputValues[colCount] = input.readBytes(v);
-        } else {
-          outputValues[colCount] = input.readInt();
-        }
-      }
-    }
-    // complex type dimension
-    for (; colCount < dimensionCount; colCount++) {
-      if (nullBitSet.get(colCount)) {
-        outputValues[colCount] = null;
-      } else {
-        short v = input.readShort();
-        outputValues[colCount] = input.readBytes(v);
-      }
-    }
-    // measure
-    DataType dataType;
-    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
-      if (nullBitSet.get(colCount)) {
-        outputValues[colCount] = null;
-      } else {
-        dataType = measureDataTypes[msrCount];
-        if (dataType == DataTypes.BOOLEAN) {
-          outputValues[colCount] = input.readBoolean();
-        } else if (dataType == DataTypes.SHORT) {
-          outputValues[colCount] = input.readShort();
-        } else if (dataType == DataTypes.INT) {
-          outputValues[colCount] = input.readInt();
-        } else if (dataType == DataTypes.LONG) {
-          outputValues[colCount] = input.readLong();
-        } else if (dataType == DataTypes.DOUBLE) {
-          outputValues[colCount] = input.readDouble();
-        } else if (DataTypes.isDecimal(dataType)) {
-          int len = input.readShort();
-          outputValues[colCount] = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
-        }
-      }
+  @Override public void close() throws IOException {
+    super.close();
+    if (null != vectorProxy) {
+      vectorProxy.close();
     }
   }
-
-    private void putRowToColumnBatch(int rowId) {
-        for (int i = 0; i < projection.length; i++) {
-            Object value = outputValues[i];
-            vectorProxy.getColumnVector(i).putRowToColumnBatch(rowId,value);
-
-        }
-    }
-
-    @Override public float getProgress() throws IOException, InterruptedException {
-        return 0;
-    }
-
-    @Override public void close() throws IOException {
-        if (null != input) {
-            input.close();
-        }
-        if (null != vectorProxy) {
-            vectorProxy.close();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
deleted file mode 100644
index 835b115..0000000
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamInputFormat.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
-import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
-import org.apache.carbondata.core.scan.complextypes.StructQueryType;
-import org.apache.carbondata.core.scan.filter.GenericQueryType;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.hadoop.InputMetricsStats;
-import org.apache.carbondata.streaming.CarbonStreamUtils;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * Stream input format
- */
-public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
-
-  public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
-  public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
-  public static final String STREAM_RECORD_READER_INSTANCE =
-      "org.apache.carbondata.stream.CarbonStreamRecordReader";
-  // return raw row for handoff
-  private boolean useRawRow = false;
-
-  public void setUseRawRow(boolean useRawRow) {
-    this.useRawRow = useRawRow;
-  }
-
-  public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
-    this.inputMetricsStats = inputMetricsStats;
-  }
-
-  public void setIsVectorReader(boolean vectorReader) {
-    isVectorReader = vectorReader;
-  }
-
-  public void setModel(QueryModel model) {
-    this.model = model;
-  }
-
-  // InputMetricsStats
-  private InputMetricsStats inputMetricsStats;
-  // vector reader
-  private boolean isVectorReader;
-  private QueryModel model;
-
-  @Override
-  public RecordReader<Void, Object> createRecordReader(InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    try {
-      Constructor cons = CarbonStreamUtils
-          .getConstructorWithReflection(STREAM_RECORD_READER_INSTANCE, boolean.class,
-              InputMetricsStats.class, QueryModel.class, boolean.class);
-      return (RecordReader) CarbonStreamUtils
-          .getInstanceWithReflection(cons, isVectorReader, inputMetricsStats, model, useRawRow);
-
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-  public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable,
-      CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache)
-      throws IOException {
-    GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length];
-    for (int i = 0; i < carbonColumns.length; i++) {
-      if (carbonColumns[i].isComplex()) {
-        if (DataTypes.isArrayType(carbonColumns[i].getDataType())) {
-          queryTypes[i] = new ArrayQueryType(carbonColumns[i].getColName(),
-              carbonColumns[i].getColName(), i);
-        } else if (DataTypes.isStructType(carbonColumns[i].getDataType())) {
-          queryTypes[i] = new StructQueryType(carbonColumns[i].getColName(),
-              carbonColumns[i].getColName(), i);
-        } else {
-          throw new UnsupportedOperationException(
-              carbonColumns[i].getDataType().getName() + " is not supported");
-        }
-
-        fillChildren(carbontable, queryTypes[i], (CarbonDimension) carbonColumns[i], i, cache);
-      }
-    }
-
-    return queryTypes;
-  }
-
-  private static void fillChildren(CarbonTable carbontable, GenericQueryType parentQueryType,
-      CarbonDimension dimension, int parentBlockIndex,
-      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws IOException {
-    for (int i = 0; i < dimension.getNumberOfChild(); i++) {
-      CarbonDimension child = dimension.getListOfChildDimensions().get(i);
-      DataType dataType = child.getDataType();
-      GenericQueryType queryType = null;
-      if (DataTypes.isArrayType(dataType)) {
-        queryType =
-            new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
-
-      } else if (DataTypes.isStructType(dataType)) {
-        queryType =
-            new StructQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
-        parentQueryType.addChildren(queryType);
-      } else {
-        boolean isDirectDictionary =
-            CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
-        boolean isDictionary =
-            CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DICTIONARY);
-        Dictionary dictionary = null;
-        if (isDictionary) {
-          String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties()
-              .get(CarbonCommonConstants.DICTIONARY_PATH);
-          DictionaryColumnUniqueIdentifier dictionarIdentifier =
-              new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
-                  child.getColumnIdentifier(), child.getDataType(), dictionaryPath);
-          dictionary = cache.get(dictionarIdentifier);
-        }
-        queryType =
-            new PrimitiveQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex,
-                child.getDataType(), 4, dictionary,
-                isDirectDictionary);
-      }
-      parentQueryType.addChildren(queryType);
-      if (child.getNumberOfChild() > 0) {
-        fillChildren(carbontable, queryType, child, parentBlockIndex, cache);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java
deleted file mode 100644
index a7940f9..0000000
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamUtils.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.streaming;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-/**
- * Util class which does utility function for stream module
- */
-public class CarbonStreamUtils {
-
-  public static Constructor getConstructorWithReflection(String className,
-                                                           Class<?>... parameterTypes)
-            throws ClassNotFoundException, NoSuchMethodException {
-    Class loadedClass = Class.forName(className);
-    return loadedClass.getConstructor(parameterTypes);
-
-  }
-
-  public static Object getInstanceWithReflection(Constructor cons, Object... initargs) throws
-          IllegalAccessException,
-          InvocationTargetException, InstantiationException {
-    return cons.newInstance(initargs);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
deleted file mode 100644
index 0467fe4..0000000
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletReader.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.format.BlockletHeader;
-
-/**
- * stream blocklet reader
- */
-public class StreamBlockletReader {
-
-  private byte[] buffer;
-  private int offset;
-  private final byte[] syncMarker;
-  private final byte[] syncBuffer;
-  private final int syncLen;
-  private long pos = 0;
-  private final InputStream in;
-  private final long limitStart;
-  private final long limitEnd;
-  private boolean isAlreadySync = false;
-  private Compressor compressor;
-  private int rowNums = 0;
-  private int rowIndex = 0;
-  private boolean isHeaderPresent;
-
-  public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit,
-      boolean isHeaderPresent, String compressorName) {
-    this.syncMarker = syncMarker;
-    syncLen = syncMarker.length;
-    syncBuffer = new byte[syncLen];
-    this.in = in;
-    limitStart = limit;
-    limitEnd = limitStart + syncLen;
-    this.isHeaderPresent = isHeaderPresent;
-    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
-  }
-
-  private void ensureCapacity(int capacity) {
-    if (buffer == null || capacity > buffer.length) {
-      buffer = new byte[capacity];
-    }
-  }
-
-  /**
-   * find the first position of sync_marker in input stream
-   */
-  private boolean sync() throws IOException {
-    if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
-      return false;
-    }
-    boolean skipHeader = false;
-    for (int i = 0; i < limitStart; i++) {
-      int j = 0;
-      for (; j < syncLen; j++) {
-        if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break;
-      }
-      if (syncLen == j) {
-        if (isHeaderPresent) {
-          if (skipHeader) {
-            return true;
-          } else {
-            skipHeader = true;
-          }
-        } else {
-          return true;
-        }
-      }
-      int value = in.read();
-      if (-1 == value) {
-        return false;
-      }
-      syncBuffer[i % syncLen] = (byte) value;
-      pos++;
-    }
-    return false;
-  }
-
-  public BlockletHeader readBlockletHeader() throws IOException {
-    int len = readIntFromStream();
-    byte[] b = new byte[len];
-    if (!readBytesFromStream(b, 0, len)) {
-      throw new EOFException("Failed to read blocklet header");
-    }
-    BlockletHeader header = CarbonUtil.readBlockletHeader(b);
-    rowNums = header.getBlocklet_info().getNum_rows();
-    rowIndex = 0;
-    return header;
-  }
-
-  public void readBlockletData(BlockletHeader header) throws IOException {
-    ensureCapacity(header.getBlocklet_length());
-    offset = 0;
-    int len = readIntFromStream();
-    byte[] b = new byte[len];
-    if (!readBytesFromStream(b, 0, len)) {
-      throw new EOFException("Failed to read blocklet data");
-    }
-    compressor.rawUncompress(b, buffer);
-  }
-
-  public void skipBlockletData(boolean reset) throws IOException {
-    int len = readIntFromStream();
-    skip(len);
-    pos += len;
-    if (reset) {
-      this.rowNums = 0;
-      this.rowIndex = 0;
-    }
-  }
-
-  private void skip(int len) throws IOException {
-    long remaining = len;
-    do {
-      long skipLen = in.skip(remaining);
-      remaining -= skipLen;
-    } while (remaining > 0);
-  }
-
-  /**
-   * find the next blocklet
-   */
-  public boolean nextBlocklet() throws IOException {
-    if (pos >= limitStart) {
-      return false;
-    }
-    if (isAlreadySync) {
-      if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
-        return false;
-      }
-    } else {
-      isAlreadySync = true;
-      if (!sync()) {
-        return false;
-      }
-    }
-
-    return pos < limitEnd;
-  }
-
-  public boolean hasNext() throws IOException {
-    return rowIndex < rowNums;
-  }
-
-  public void nextRow() {
-    rowIndex++;
-  }
-
-  public int readIntFromStream() throws IOException {
-    int ch1 = in.read();
-    int ch2 = in.read();
-    int ch3 = in.read();
-    int ch4 = in.read();
-    if ((ch1 | ch2 | ch3 | ch4) < 0) throw new EOFException();
-    pos += 4;
-    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
-  }
-
-  /**
-   * Reads <code>len</code> bytes of data from the input stream into
-   * an array of bytes.
-   * @return <code>true</code> if reading data successfully, or
-   * <code>false</code> if there is no more data because the end of the stream has been reached.
-   */
-  public boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
-    int readLen = in.read(b, offset, len);
-    if (readLen < 0) {
-      return false;
-    }
-    pos += readLen;
-    if (readLen < len) {
-      return readBytesFromStream(b, offset + readLen, len - readLen);
-    } else {
-      return true;
-    }
-  }
-
-  public boolean readBoolean() {
-    return (buffer[offset++]) != 0;
-  }
-
-  public short readShort() {
-    short v =  (short) ((buffer[offset + 1] & 255) +
-        ((buffer[offset]) << 8));
-    offset += 2;
-    return v;
-  }
-
-  public byte[] copy(int len) {
-    byte[] b = new byte[len];
-    System.arraycopy(buffer, offset, b, 0, len);
-    return b;
-  }
-
-  public int readInt() {
-    int v = ((buffer[offset + 3] & 255) +
-        ((buffer[offset + 2] & 255) << 8) +
-        ((buffer[offset + 1] & 255) << 16) +
-        ((buffer[offset]) << 24));
-    offset += 4;
-    return v;
-  }
-
-  public long readLong() {
-    long v = ((long)(buffer[offset + 7] & 255)) +
-        ((long) (buffer[offset + 6] & 255) << 8) +
-        ((long) (buffer[offset + 5] & 255) << 16) +
-        ((long) (buffer[offset + 4] & 255) << 24) +
-        ((long) (buffer[offset + 3] & 255) << 32) +
-        ((long) (buffer[offset + 2] & 255) << 40) +
-        ((long) (buffer[offset + 1] & 255) << 48) +
-        ((long) (buffer[offset]) << 56);
-    offset += 8;
-    return v;
-  }
-
-  public double readDouble() {
-    return Double.longBitsToDouble(readLong());
-  }
-
-  public byte[] readBytes(int len) {
-    byte[] b = new byte[len];
-    System.arraycopy(buffer, offset, b, 0, len);
-    offset += len;
-    return b;
-  }
-
-  public void skipBytes(int len) {
-    offset += len;
-  }
-
-  public int getRowNums() {
-    return rowNums;
-  }
-
-  public void close() {
-    CarbonUtil.closeStreams(in);
-  }
-}


[2/2] carbondata git commit: [CARBONDATA-3220] Support presto to read stream segment data

Posted by ra...@apache.org.
[CARBONDATA-3220] Support presto to read stream segment data

Support Presto reading the streaming table

Refactor the old CarbonStreamRecordReader so that its code can be reused for Presto.
Change CarbondataPageSource to support reading streaming data via StreamRecordReader.

This closes #3001


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d78db8f6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d78db8f6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d78db8f6

Branch: refs/heads/master
Commit: d78db8f6bd714e6a5111812b0c0473418201b23c
Parents: 8e6def9
Author: QiangCai <qi...@qq.com>
Authored: Wed Jan 9 22:06:02 2019 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Jan 10 16:59:54 2019 +0530

----------------------------------------------------------------------
 .../carbondata/hadoop/CarbonInputSplit.java     |   4 +-
 .../hadoop/stream/CarbonStreamInputFormat.java  | 159 +++++
 .../hadoop/stream/CarbonStreamUtils.java        |  40 ++
 .../hadoop/stream/StreamBlockletReader.java     | 261 +++++++
 .../hadoop/stream/StreamRecordReader.java       | 614 +++++++++++++++++
 .../carbondata/presto/CarbonVectorBatch.java    |  10 +-
 .../carbondata/presto/CarbondataPageSource.java | 318 ++++++++-
 .../presto/CarbondataPageSourceProvider.java    | 142 +---
 .../presto/impl/CarbonLocalInputSplit.java      |  44 +-
 .../presto/impl/CarbonLocalMultiBlockSplit.java |   4 +
 .../presto/impl/CarbonTableReader.java          |   3 +-
 .../presto/readers/BooleanStreamReader.java     |  11 +
 .../readers/DecimalSliceStreamReader.java       |  12 +
 .../presto/readers/DoubleStreamReader.java      |  12 +
 .../presto/readers/IntegerStreamReader.java     |   7 +
 .../presto/readers/LongStreamReader.java        |  12 +
 .../presto/readers/ShortStreamReader.java       |  12 +
 .../presto/readers/SliceStreamReader.java       |  13 +
 .../presto/readers/TimestampStreamReader.java   |  12 +
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   2 +-
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |   2 +-
 .../stream/CarbonStreamRecordReader.java        | 684 ++-----------------
 .../streaming/CarbonStreamInputFormat.java      | 160 -----
 .../carbondata/streaming/CarbonStreamUtils.java |  40 --
 .../streaming/StreamBlockletReader.java         | 261 -------
 25 files changed, 1595 insertions(+), 1244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index de2451b..bcf703c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -195,7 +195,9 @@ public class CarbonInputSplit extends FileSplit
                 blockletInfos, split.getVersion(), split.getDeleteDeltaFiles());
         blockInfo.setDetailInfo(split.getDetailInfo());
         blockInfo.setDataMapWriterPath(split.dataMapWritePath);
-        blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
+        if (split.getDetailInfo() != null) {
+          blockInfo.setBlockOffset(split.getDetailInfo().getBlockFooterOffset());
+        }
         tableBlockInfoList.add(blockInfo);
       } catch (IOException e) {
         throw new RuntimeException("fail to get location of split: " + split, e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
new file mode 100644
index 0000000..e4819ee
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamInputFormat.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.stream;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
+import org.apache.carbondata.core.scan.complextypes.PrimitiveQueryType;
+import org.apache.carbondata.core.scan.complextypes.StructQueryType;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.InputMetricsStats;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * Stream input format
+ */
+public class CarbonStreamInputFormat extends FileInputFormat<Void, Object> {
+
+  public static final String READ_BUFFER_SIZE = "carbon.stream.read.buffer.size";
+  public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
+  public static final String STREAM_RECORD_READER_INSTANCE =
+      "org.apache.carbondata.stream.CarbonStreamRecordReader";
+  // return raw row for handoff
+  private boolean useRawRow = false;
+
+  public void setUseRawRow(boolean useRawRow) {
+    this.useRawRow = useRawRow;
+  }
+
+  public void setInputMetricsStats(InputMetricsStats inputMetricsStats) {
+    this.inputMetricsStats = inputMetricsStats;
+  }
+
+  public void setIsVectorReader(boolean vectorReader) {
+    isVectorReader = vectorReader;
+  }
+
+  public void setModel(QueryModel model) {
+    this.model = model;
+  }
+
+  // InputMetricsStats
+  private InputMetricsStats inputMetricsStats;
+  // vector reader
+  private boolean isVectorReader;
+  private QueryModel model;
+
+  @Override
+  public RecordReader<Void, Object> createRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException {
+    try {
+      Constructor cons = CarbonStreamUtils
+          .getConstructorWithReflection(STREAM_RECORD_READER_INSTANCE, boolean.class,
+              InputMetricsStats.class, QueryModel.class, boolean.class);
+      return (RecordReader) CarbonStreamUtils
+          .getInstanceWithReflection(cons, isVectorReader, inputMetricsStats, model, useRawRow);
+
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  public static GenericQueryType[] getComplexDimensions(CarbonTable carbontable,
+      CarbonColumn[] carbonColumns, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache)
+      throws IOException {
+    GenericQueryType[] queryTypes = new GenericQueryType[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      if (carbonColumns[i].isComplex()) {
+        if (DataTypes.isArrayType(carbonColumns[i].getDataType())) {
+          queryTypes[i] = new ArrayQueryType(carbonColumns[i].getColName(),
+              carbonColumns[i].getColName(), i);
+        } else if (DataTypes.isStructType(carbonColumns[i].getDataType())) {
+          queryTypes[i] = new StructQueryType(carbonColumns[i].getColName(),
+              carbonColumns[i].getColName(), i);
+        } else {
+          throw new UnsupportedOperationException(
+              carbonColumns[i].getDataType().getName() + " is not supported");
+        }
+
+        fillChildren(carbontable, queryTypes[i], (CarbonDimension) carbonColumns[i], i, cache);
+      }
+    }
+
+    return queryTypes;
+  }
+
+  private static void fillChildren(CarbonTable carbontable, GenericQueryType parentQueryType,
+      CarbonDimension dimension, int parentBlockIndex,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache) throws IOException {
+    for (int i = 0; i < dimension.getNumberOfChild(); i++) {
+      CarbonDimension child = dimension.getListOfChildDimensions().get(i);
+      DataType dataType = child.getDataType();
+      GenericQueryType queryType = null;
+      if (DataTypes.isArrayType(dataType)) {
+        queryType =
+            new ArrayQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
+
+      } else if (DataTypes.isStructType(dataType)) {
+        queryType =
+            new StructQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex);
+        parentQueryType.addChildren(queryType);
+      } else {
+        boolean isDirectDictionary =
+            CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DIRECT_DICTIONARY);
+        boolean isDictionary =
+            CarbonUtil.hasEncoding(child.getEncoder(), Encoding.DICTIONARY);
+        Dictionary dictionary = null;
+        if (isDictionary) {
+          String dictionaryPath = carbontable.getTableInfo().getFactTable().getTableProperties()
+              .get(CarbonCommonConstants.DICTIONARY_PATH);
+          DictionaryColumnUniqueIdentifier dictionarIdentifier =
+              new DictionaryColumnUniqueIdentifier(carbontable.getAbsoluteTableIdentifier(),
+                  child.getColumnIdentifier(), child.getDataType(), dictionaryPath);
+          dictionary = cache.get(dictionarIdentifier);
+        }
+        queryType =
+            new PrimitiveQueryType(child.getColName(), dimension.getColName(), ++parentBlockIndex,
+                child.getDataType(), 4, dictionary,
+                isDirectDictionary);
+      }
+      parentQueryType.addChildren(queryType);
+      if (child.getNumberOfChild() > 0) {
+        fillChildren(carbontable, queryType, child, parentBlockIndex, cache);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java
new file mode 100644
index 0000000..78669e7
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/CarbonStreamUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.stream;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Util class which does utility function for stream module
+ */
+public class CarbonStreamUtils {
+
+  public static Constructor getConstructorWithReflection(String className,
+                                                           Class<?>... parameterTypes)
+            throws ClassNotFoundException, NoSuchMethodException {
+    Class loadedClass = Class.forName(className);
+    return loadedClass.getConstructor(parameterTypes);
+
+  }
+
+  public static Object getInstanceWithReflection(Constructor cons, Object... initargs) throws
+          IllegalAccessException,
+          InvocationTargetException, InstantiationException {
+    return cons.newInstance(initargs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java
new file mode 100644
index 0000000..dbcf72d
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamBlockletReader.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.stream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.BlockletHeader;
+
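+/**
+ * Reads blocklets from a stream file: locates sync markers, reads blocklet headers and
+ * blocklet data, and decodes primitive values from the uncompressed blocklet buffer.
+ */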
+public class StreamBlockletReader {
+
+  private byte[] buffer;
+  private int offset;
+  private final byte[] syncMarker;
+  private final byte[] syncBuffer;
+  private final int syncLen;
+  private long pos = 0;
+  private final InputStream in;
+  private final long limitStart;
+  private final long limitEnd;
+  private boolean isAlreadySync = false;
+  private Compressor compressor;
+  private int rowNums = 0;
+  private int rowIndex = 0;
+  private boolean isHeaderPresent;
+
+  /**
+   * Creates a reader on top of an already opened stream.
+   *
+   * @param syncMarker byte sequence that precedes every blocklet
+   * @param in stream to read from; it is closed by {@link #close()}
+   * @param limit number of consumed bytes after which no further blocklet is started
+   * @param isHeaderPresent when true, the first sync marker found is skipped while syncing
+   * @param compressorName name of the compressor used to uncompress blocklet data
+   */
+  public StreamBlockletReader(byte[] syncMarker, InputStream in, long limit,
+      boolean isHeaderPresent, String compressorName) {
+    this.in = in;
+    this.isHeaderPresent = isHeaderPresent;
+    this.syncMarker = syncMarker;
+    this.syncLen = syncMarker.length;
+    this.syncBuffer = new byte[this.syncLen];
+    this.limitStart = limit;
+    this.limitEnd = limit + this.syncLen;
+    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
+  }
+
+  /**
+   * Makes sure the decode buffer can hold at least {@code capacity} bytes, allocating a
+   * fresh array when the current one is missing or too small.
+   */
+  private void ensureCapacity(int capacity) {
+    boolean largeEnough = buffer != null && buffer.length >= capacity;
+    if (!largeEnough) {
+      buffer = new byte[capacity];
+    }
+  }
+
+  /**
+   * Finds the first position of the sync marker in the input stream.
+   * <p>
+   * {@code syncBuffer} is treated as a ring buffer that always holds the last
+   * {@code syncLen} bytes read; at iteration {@code i} its logical start is
+   * {@code i % syncLen}. When {@code isHeaderPresent} is true the first match is skipped
+   * and only the second one is accepted.
+   *
+   * @return true when a marker was accepted (the stream is then positioned right after
+   *         it), false when the stream ended or {@code limitStart} positions were
+   *         examined without accepting a marker
+   */
+  private boolean sync() throws IOException {
+    // prime the ring buffer with the first syncLen bytes
+    if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
+      return false;
+    }
+    boolean skipHeader = false;
+    for (int i = 0; i < limitStart; i++) {
+      // compare the marker against the ring buffer starting at its logical head
+      int j = 0;
+      for (; j < syncLen; j++) {
+        if (syncMarker[j] != syncBuffer[(i + j) % syncLen]) break;
+      }
+      if (syncLen == j) {
+        if (isHeaderPresent) {
+          // NOTE(review): presumably the first marker belongs to the file header - confirm
+          if (skipHeader) {
+            return true;
+          } else {
+            skipHeader = true;
+          }
+        } else {
+          return true;
+        }
+      }
+      // slide the window by one byte: overwrite the oldest byte with the next one
+      int value = in.read();
+      if (-1 == value) {
+        return false;
+      }
+      syncBuffer[i % syncLen] = (byte) value;
+      pos++;
+    }
+    return false;
+  }
+
+  /**
+   * Reads the length-prefixed blocklet header from the stream and resets the row cursor
+   * to the start of the blocklet it describes.
+   *
+   * @return the decoded blocklet header
+   * @throws IOException if the stream ends before the whole header was read
+   */
+  public BlockletHeader readBlockletHeader() throws IOException {
+    int headerLength = readIntFromStream();
+    byte[] headerBytes = new byte[headerLength];
+    boolean complete = readBytesFromStream(headerBytes, 0, headerLength);
+    if (!complete) {
+      throw new EOFException("Failed to read blocklet header");
+    }
+    BlockletHeader blockletHeader = CarbonUtil.readBlockletHeader(headerBytes);
+    this.rowNums = blockletHeader.getBlocklet_info().getNum_rows();
+    this.rowIndex = 0;
+    return blockletHeader;
+  }
+
+  /**
+   * Reads the length-prefixed compressed blocklet data and uncompresses it into the
+   * decode buffer, rewinding the decode offset to 0.
+   *
+   * @param header header of the blocklet; its blocklet length sizes the decode buffer
+   * @throws IOException if the stream ends before the whole data section was read
+   */
+  public void readBlockletData(BlockletHeader header) throws IOException {
+    ensureCapacity(header.getBlocklet_length());
+    this.offset = 0;
+    int compressedLength = readIntFromStream();
+    byte[] compressed = new byte[compressedLength];
+    boolean complete = readBytesFromStream(compressed, 0, compressedLength);
+    if (!complete) {
+      throw new EOFException("Failed to read blocklet data");
+    }
+    compressor.rawUncompress(compressed, buffer);
+  }
+
+  /**
+   * Skips over the length-prefixed data section of the current blocklet without
+   * uncompressing it.
+   *
+   * @param reset when true, the row counters are cleared so that {@link #hasNext()}
+   *              reports no rows for the skipped blocklet
+   */
+  public void skipBlockletData(boolean reset) throws IOException {
+    int dataLength = readIntFromStream();
+    skip(dataLength);
+    pos += dataLength;
+    if (!reset) {
+      return;
+    }
+    rowNums = 0;
+    rowIndex = 0;
+  }
+
+  /**
+   * Skips exactly {@code len} bytes of the underlying stream.
+   * <p>
+   * {@link InputStream#skip(long)} is allowed to skip fewer bytes than requested and may
+   * return 0, including at end of stream. When that happens a single byte is read to
+   * tell "no progress right now" apart from end of stream, so a truncated file fails
+   * fast instead of looping forever.
+   *
+   * @param len number of bytes to skip; values less than or equal to 0 skip nothing
+   * @throws EOFException if the stream ends before {@code len} bytes were skipped
+   * @throws IOException if the underlying stream fails
+   */
+  private void skip(int len) throws IOException {
+    long remaining = len;
+    while (remaining > 0) {
+      long skipped = in.skip(remaining);
+      if (skipped > 0) {
+        remaining -= skipped;
+        continue;
+      }
+      // skip() made no progress: consume one byte to detect end of stream
+      if (in.read() < 0) {
+        throw new EOFException("Reached end of stream with " + remaining
+            + " of " + len + " bytes left to skip");
+      }
+      remaining--;
+    }
+  }
+
+  /**
+   * Moves to the next blocklet.
+   * <p>
+   * The first call scans for the sync marker; every later call just consumes
+   * {@code syncLen} bytes into the sync buffer without comparing them to the marker.
+   *
+   * @return true if a blocklet follows within the limit of this reader, false when the
+   *         limit was reached or the stream ended
+   */
+  public boolean nextBlocklet() throws IOException {
+    if (pos >= limitStart) {
+      return false;
+    }
+    boolean markerConsumed;
+    if (!isAlreadySync) {
+      isAlreadySync = true;
+      markerConsumed = sync();
+    } else {
+      markerConsumed = readBytesFromStream(syncBuffer, 0, syncLen);
+    }
+    return markerConsumed && pos < limitEnd;
+  }
+
+  /**
+   * @return true while the current blocklet still has rows that were not consumed
+   */
+  public boolean hasNext() throws IOException {
+    return rowNums > rowIndex;
+  }
+
+  /**
+   * Advances the row cursor of the current blocklet by one.
+   */
+  public void nextRow() {
+    rowIndex += 1;
+  }
+
+  /**
+   * Reads a big-endian 4-byte integer directly from the stream and advances the stream
+   * position by 4.
+   *
+   * @throws EOFException if the stream ends before four bytes were read
+   */
+  public int readIntFromStream() throws IOException {
+    int b0 = in.read();
+    int b1 = in.read();
+    int b2 = in.read();
+    int b3 = in.read();
+    // read() yields -1 at end of stream, which makes the OR of all four negative
+    boolean reachedEnd = (b0 | b1 | b2 | b3) < 0;
+    if (reachedEnd) {
+      throw new EOFException();
+    }
+    pos += 4;
+    return (b0 << 24) | (b1 << 16) | (b2 << 8) | b3;
+  }
+
+  /**
+   * Fills {@code b[offset .. offset + len)} from the input stream, issuing as many
+   * reads as needed, and advances the stream position by the bytes actually read.
+   *
+   * @return <code>true</code> if all <code>len</code> bytes were read, or
+   * <code>false</code> if the end of the stream was reached first.
+   */
+  public boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
+    int writeAt = offset;
+    int remaining = len;
+    do {
+      int count = in.read(b, writeAt, remaining);
+      if (count < 0) {
+        return false;
+      }
+      pos += count;
+      writeAt += count;
+      remaining -= count;
+    } while (remaining > 0);
+    return true;
+  }
+
+  public boolean readBoolean() {
+    return (buffer[offset++]) != 0;
+  }
+
+  /**
+   * Decodes a big-endian 2-byte value from the blocklet buffer and moves the decode
+   * offset past it.
+   */
+  public short readShort() {
+    int low = buffer[offset + 1] & 0xFF;
+    int high = buffer[offset] << 8;
+    offset += 2;
+    return (short) (high | low);
+  }
+
+  /**
+   * Returns a copy of the next {@code len} bytes of the blocklet buffer. Unlike
+   * {@link #readBytes(int)} the decode offset is left untouched, so the same bytes can
+   * be decoded again afterwards.
+   */
+  public byte[] copy(int len) {
+    byte[] snapshot = new byte[len];
+    System.arraycopy(buffer, offset, snapshot, 0, len);
+    return snapshot;
+  }
+
+  /**
+   * Decodes a big-endian 4-byte integer from the blocklet buffer and moves the decode
+   * offset past it.
+   */
+  public int readInt() {
+    int b3 = buffer[offset + 3] & 0xFF;
+    int b2 = buffer[offset + 2] & 0xFF;
+    int b1 = buffer[offset + 1] & 0xFF;
+    int b0 = buffer[offset];
+    offset += 4;
+    return (b0 << 24) | (b1 << 16) | (b2 << 8) | b3;
+  }
+
+  /**
+   * Decodes a big-endian 8-byte long from the blocklet buffer and moves the decode
+   * offset past it.
+   */
+  public long readLong() {
+    long value = 0L;
+    // fold the bytes in from least to most significant
+    for (int i = 7; i >= 0; i--) {
+      long unsignedByte = buffer[offset + i] & 0xFFL;
+      value |= unsignedByte << (8 * (7 - i));
+    }
+    offset += 8;
+    return value;
+  }
+
+  public double readDouble() {
+    return Double.longBitsToDouble(readLong());
+  }
+
+  /**
+   * Returns a copy of the next {@code len} bytes of the blocklet buffer and moves the
+   * decode offset past them.
+   */
+  public byte[] readBytes(int len) {
+    byte[] bytes = copy(len);
+    offset += len;
+    return bytes;
+  }
+
+  /**
+   * Moves the decode offset forward by {@code len} bytes without decoding them.
+   */
+  public void skipBytes(int len) {
+    this.offset = this.offset + len;
+  }
+
+  /**
+   * @return the row count taken from the most recently read blocklet header, or 0 after
+   *         a blocklet was skipped with reset
+   */
+  public int getRowNums() {
+    return this.rowNums;
+  }
+
+  /**
+   * Closes the underlying input stream via {@code CarbonUtil.closeStreams}.
+   */
+  public void close() {
+    CarbonUtil.closeStreams(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
new file mode 100644
index 0000000..75e36be
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.stream;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.intf.RowImpl;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.format.BlockletHeader;
+import org.apache.carbondata.format.FileHeader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
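+/**
+ * Record reader that decodes a stream file row by row, applying the optional filter and
+ * projection of the query model, or returning raw rows for handoff.
+ */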
+public class StreamRecordReader extends RecordReader<Void, Object> {
+
+  // metadata
+  protected CarbonTable carbonTable;
+  private CarbonColumn[] storageColumns;
+  private boolean[] isRequired;
+  private DataType[] measureDataTypes;
+  private int dimensionCount;
+  private int measureCount;
+
+  // input
+  private FileSplit fileSplit;
+  private Configuration hadoopConf;
+  protected StreamBlockletReader input;
+  protected boolean isFirstRow = true;
+  protected QueryModel model;
+
+  // decode data
+  private BitSet allNonNull;
+  private boolean[] isNoDictColumn;
+  private DirectDictionaryGenerator[] directDictionaryGenerators;
+  private CacheProvider cacheProvider;
+  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+  private GenericQueryType[] queryTypes;
+  private String compressorName;
+
+  // vectorized reader
+  protected boolean isFinished = false;
+
+  // filter
+  protected FilterExecuter filter;
+  private boolean[] isFilterRequired;
+  private Object[] filterValues;
+  protected RowIntf filterRow;
+  private int[] filterMap;
+
+  // output
+  protected CarbonColumn[] projection;
+  private boolean[] isProjectionRequired;
+  private int[] projectionMap;
+  protected Object[] outputValues;
+
+  // empty project, null filter
+  protected boolean skipScanData;
+
+  // return raw row for handoff
+  private boolean useRawRow = false;
+
+  /**
+   * @param mdl query model to use; when null, one is created in
+   *            {@link #initialize(InputSplit, TaskAttemptContext)}
+   * @param useRawRow true to return undecoded rows (used by streaming handoff)
+   */
+  public StreamRecordReader(QueryModel mdl, boolean useRawRow) {
+    this.useRawRow = useRawRow;
+    this.model = mdl;
+  }
+
+  /**
+   * Resolves the file split to read and prepares all per-column lookup tables
+   * (dictionary flags, measure data types, filter and projection mappings) from the
+   * table of the query model. The stream itself is not opened here; that happens in
+   * {@link #initializeAtFirstRow()}.
+   *
+   * @param split split to read; for a {@link CarbonMultiBlockSplit} only its first
+   *              split is used
+   * @param context task context supplying the Hadoop configuration
+   */
+  @Override public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException {
+    // input
+    if (split instanceof CarbonInputSplit) {
+      fileSplit = (CarbonInputSplit) split;
+    } else if (split instanceof CarbonMultiBlockSplit) {
+      fileSplit = ((CarbonMultiBlockSplit) split).getAllSplits().get(0);
+    } else {
+      fileSplit = (FileSplit) split;
+    }
+
+    // metadata
+    hadoopConf = context.getConfiguration();
+    // build a query model only when the caller did not hand one to the constructor
+    if (model == null) {
+      CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+      model = format.createQueryModel(split, context);
+    }
+    carbonTable = model.getTable();
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
+    dimensionCount = dimensions.size();
+    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(carbonTable.getTableName());
+    measureCount = measures.size();
+    List<CarbonColumn> carbonColumnList =
+        carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName());
+    storageColumns = carbonColumnList.toArray(new CarbonColumn[carbonColumnList.size()]);
+    isNoDictColumn = CarbonDataProcessorUtil.getNoDictionaryMapping(storageColumns);
+    // one generator per direct-dictionary column; other entries stay null
+    directDictionaryGenerators = new DirectDictionaryGenerator[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      if (storageColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        directDictionaryGenerators[i] = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(storageColumns[i].getDataType());
+      }
+    }
+    // measures are taken from the storage columns that follow the dimensions
+    measureDataTypes = new DataType[measureCount];
+    for (int i = 0; i < measureCount; i++) {
+      measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
+    }
+
+    // decode data
+    allNonNull = new BitSet(storageColumns.length);
+    projection = model.getProjectionColumns();
+
+    // filterMap[i]: index in the filter row for storage column i; measures are placed
+    // after the dimension ordinals
+    isRequired = new boolean[storageColumns.length];
+    boolean[] isFiltlerDimensions = model.getIsFilterDimensions();
+    boolean[] isFiltlerMeasures = model.getIsFilterMeasures();
+    isFilterRequired = new boolean[storageColumns.length];
+    filterMap = new int[storageColumns.length];
+    for (int i = 0; i < storageColumns.length; i++) {
+      if (storageColumns[i].isDimension()) {
+        if (isFiltlerDimensions[storageColumns[i].getOrdinal()]) {
+          isRequired[i] = true;
+          isFilterRequired[i] = true;
+          filterMap[i] = storageColumns[i].getOrdinal();
+        }
+      } else {
+        if (isFiltlerMeasures[storageColumns[i].getOrdinal()]) {
+          isRequired[i] = true;
+          isFilterRequired[i] = true;
+          filterMap[i] = carbonTable.getDimensionOrdinalMax() + storageColumns[i].getOrdinal();
+        }
+      }
+    }
+
+    // projectionMap[i]: position in the output row for storage column i, matched by name
+    isProjectionRequired = new boolean[storageColumns.length];
+    projectionMap = new int[storageColumns.length];
+    for (int j = 0; j < projection.length; j++) {
+      for (int i = 0; i < storageColumns.length; i++) {
+        if (storageColumns[i].getColName().equals(projection[j].getColName())) {
+          isRequired[i] = true;
+          isProjectionRequired[i] = true;
+          projectionMap[i] = j;
+          break;
+        }
+      }
+    }
+
+    // initialize filter
+    if (null != model.getFilterExpressionResolverTree()) {
+      initializeFilter();
+    } else if (projection.length == 0) {
+      // no filter and nothing projected: rows only need to be counted, not decoded
+      skipScanData = true;
+    }
+
+  }
+
+  /**
+   * Builds the row-level filter executer from the filter resolver tree of the query
+   * model, using segment properties derived from the table's dimensions and measures
+   * with every cardinality input set to {@code Integer.MAX_VALUE}.
+   */
+  private void initializeFilter() {
+    List<ColumnSchema> columnSchemas = CarbonUtil.getColumnSchemaList(
+        carbonTable.getDimensionByTableName(carbonTable.getTableName()),
+        carbonTable.getMeasureByTableName(carbonTable.getTableName()));
+
+    int[] maxCardinalities = new int[columnSchemas.size()];
+    int idx = 0;
+    while (idx < maxCardinalities.length) {
+      maxCardinalities[idx] = Integer.MAX_VALUE;
+      idx++;
+    }
+    int[] formattedCardinality =
+        CarbonUtil.getFormattedCardinality(maxCardinalities, columnSchemas);
+    SegmentProperties properties = new SegmentProperties(columnSchemas, formattedCardinality);
+
+    Map<Integer, GenericQueryType> complexDimensions = new HashMap<>();
+    FilterResolverIntf resolverTree = model.getFilterExpressionResolverTree();
+    filter = FilterUtil.getFilterExecuterTree(resolverTree, properties, complexDimensions);
+    // the row filter addresses columns by their index in the filter row
+    FilterUtil.updateIndexOfColumnExpression(resolverTree.getFilterExpression(),
+        carbonTable.getDimensionOrdinalMax());
+  }
+
+  /**
+   * Reads the file header of {@code filePath}, remembers the compressor name it declares
+   * (SNAPPY when the header does not carry one) and returns the header's sync marker.
+   */
+  private byte[] getSyncMarker(String filePath) throws IOException {
+    FileHeader fileHeader = new CarbonHeaderReader(filePath).readHeader();
+    // legacy stores do not record the compressor name in the header
+    compressorName = fileHeader.isSetCompressor_name()
+        ? fileHeader.getCompressor_name()
+        : CompressorFactory.NativeSupportedCompressor.SNAPPY.getName();
+    return fileHeader.getSync_marker();
+  }
+
+  /**
+   * Lazily opens the stream file when the first row is requested: allocates the filter
+   * and output rows, reads the sync marker from the file header, opens the file at the
+   * start of the split and creates the blocklet reader and complex-type query types.
+   * <p>
+   * If positioning the stream or creating the blocklet reader fails, the freshly opened
+   * stream is closed before the exception propagates, because {@link #close()} can only
+   * release a stream that is already owned by {@code input}.
+   *
+   * @throws IOException if the header or the data file cannot be read
+   */
+  protected void initializeAtFirstRow() throws IOException {
+    filterValues = new Object[carbonTable.getDimensionOrdinalMax() + measureCount];
+    filterRow = new RowImpl();
+    filterRow.setValues(filterValues);
+
+    outputValues = new Object[projection.length];
+
+    Path file = fileSplit.getPath();
+
+    byte[] syncMarker = getSyncMarker(file.toString());
+
+    FileSystem fs = file.getFileSystem(hadoopConf);
+
+    int bufferSize = Integer.parseInt(hadoopConf.get(CarbonStreamInputFormat.READ_BUFFER_SIZE,
+        CarbonStreamInputFormat.READ_BUFFER_SIZE_DEFAULT));
+
+    FSDataInputStream fileIn = fs.open(file, bufferSize);
+    try {
+      fileIn.seek(fileSplit.getStart());
+      // a split starting at offset 0 still contains the file header
+      input = new StreamBlockletReader(syncMarker, fileIn, fileSplit.getLength(),
+          fileSplit.getStart() == 0, compressorName);
+    } catch (IOException | RuntimeException e) {
+      // the reader never took ownership of the stream, so release it here
+      CarbonUtil.closeStreams(fileIn);
+      throw e;
+    }
+
+    cacheProvider = CacheProvider.getInstance();
+    cache = cacheProvider.createCache(CacheType.FORWARD_DICTIONARY);
+    queryTypes = CarbonStreamInputFormat.getComplexDimensions(carbonTable, storageColumns, cache);
+  }
+
+  /**
+   * Advances to the next row that passes the filter, moving on to the following
+   * blocklets whenever the current one has no rows left.
+   * <p>
+   * Sets {@code isFinished} once no further blocklet is available.
+   *
+   * @return true if a row is available, false when the input is exhausted
+   * @throws IOException on read failure, or wrapping a
+   *         {@link FilterUnsupportedException} raised while applying the filter
+   */
+  protected boolean nextRow() throws IOException {
+    // read row one by one
+    try {
+      boolean hasNext;
+      // true while the loop must keep going: row rejected by filter or new blocklet loaded
+      boolean scanMore = false;
+      do {
+        hasNext = input.hasNext();
+        if (hasNext) {
+          if (skipScanData) {
+            // nothing to decode: just consume the row
+            input.nextRow();
+            scanMore = false;
+          } else {
+            if (useRawRow) {
+              // read raw row for streaming handoff which does not require decode raw row
+              readRawRowFromStream();
+            } else {
+              readRowFromStream();
+            }
+            if (null != filter) {
+              scanMore = !filter.applyFilter(filterRow, carbonTable.getDimensionOrdinalMax());
+            } else {
+              scanMore = false;
+            }
+          }
+        } else {
+          if (input.nextBlocklet()) {
+            BlockletHeader header = input.readBlockletHeader();
+            if (isScanRequired(header)) {
+              if (skipScanData) {
+                // keep the row counters so the rows are still counted
+                input.skipBlockletData(false);
+              } else {
+                input.readBlockletData(header);
+              }
+            } else {
+              // min/max pruning ruled the blocklet out: drop its rows entirely
+              input.skipBlockletData(true);
+            }
+            scanMore = true;
+          } else {
+            isFinished = true;
+            scanMore = false;
+          }
+        }
+      } while (scanMore);
+      return hasNext;
+    } catch (FilterUnsupportedException e) {
+      throw new IOException("Failed to filter row in detail reader", e);
+    }
+  }
+
+  /**
+   * Moves to the next row, opening the stream on the first call.
+   *
+   * @return true if a row is available through {@link #getCurrentValue()}
+   */
+  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (isFirstRow) {
+      isFirstRow = false;
+      initializeAtFirstRow();
+    }
+    return !isFinished && nextRow();
+  }
+
+  /**
+   * This reader produces values only; the key is always null.
+   */
+  @Override public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  /**
+   * Returns the output row array. The same array instance is returned on every call and
+   * is overwritten when the next row is read.
+   */
+  @Override public Object getCurrentValue() throws IOException, InterruptedException {
+    return outputValues;
+  }
+
+  /**
+   * Decides from the blocklet's min/max index whether the filter can match any row in
+   * it. Without a filter, without a blocklet index, or when the index cannot be
+   * converted, the blocklet is always scanned.
+   *
+   * @param header header of the blocklet under consideration
+   * @return false only when the filter reports an empty bit set for the min/max values
+   */
+  protected boolean isScanRequired(BlockletHeader header) {
+    if (filter == null || header.getBlocklet_index() == null) {
+      return true;
+    }
+    BlockletMinMaxIndex minMax = CarbonMetadataUtil
+        .convertExternalMinMaxIndex(header.getBlocklet_index().getMin_max_index());
+    if (minMax == null) {
+      return true;
+    }
+    BitSet matches = filter.isScanRequired(minMax.getMaxValues(), minMax.getMinValues(),
+        minMax.getIsMinMaxSet());
+    return !matches.isEmpty();
+  }
+
+  /**
+   * Decodes the next row of the current blocklet into {@code filterValues} and
+   * {@code outputValues}.
+   * <p>
+   * A row starts with a 2-byte length followed by that many bytes of null bit set (the
+   * bit set is omitted when the length is 0). Column values follow in storage order:
+   * primitive dimensions, complex dimensions, then measures. Values of null columns are
+   * not stored. Columns needed neither by the filter nor by the projection are skipped
+   * instead of decoded.
+   */
+  protected void readRowFromStream() {
+    input.nextRow();
+    short nullLen = input.readShort();
+    BitSet nullBitSet = allNonNull;
+    if (nullLen > 0) {
+      nullBitSet = BitSet.valueOf(input.readBytes(nullLen));
+    }
+    int colCount = 0;
+    // primitive type dimension
+    for (; colCount < isNoDictColumn.length; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        if (isNoDictColumn[colCount]) {
+          // no-dictionary value: 2-byte length followed by the raw bytes
+          int v = input.readShort();
+          if (isRequired[colCount]) {
+            byte[] b = input.readBytes(v);
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = b;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = DataTypeUtil
+                  .getDataBasedOnDataTypeForNoDictionaryColumn(b,
+                      storageColumns[colCount].getDataType());
+            }
+          } else {
+            input.skipBytes(v);
+          }
+        } else if (null != directDictionaryGenerators[colCount]) {
+          // direct dictionary value: 4-byte surrogate
+          if (isRequired[colCount]) {
+            if (isFilterRequired[colCount]) {
+              // copy() does not advance the offset, so the bytes are consumed below
+              filterValues[filterMap[colCount]] = input.copy(4);
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  directDictionaryGenerators[colCount].getValueFromSurrogate(input.readInt());
+            } else {
+              input.skipBytes(4);
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        } else {
+          // remaining dimensions: 4-byte int, projected as read
+          if (isRequired[colCount]) {
+            if (isFilterRequired[colCount]) {
+              // copy() does not advance the offset, so the bytes are consumed below
+              filterValues[filterMap[colCount]] = input.copy(4);
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = input.readInt();
+            } else {
+              input.skipBytes(4);
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        }
+      }
+    }
+    // complex type dimension
+    for (; colCount < dimensionCount; colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = null;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        // complex value: 2-byte length followed by the serialized bytes
+        short v = input.readShort();
+        if (isRequired[colCount]) {
+          byte[] b = input.readBytes(v);
+          if (isFilterRequired[colCount]) {
+            filterValues[filterMap[colCount]] = b;
+          }
+          if (isProjectionRequired[colCount]) {
+            outputValues[projectionMap[colCount]] =
+                queryTypes[colCount].getDataBasedOnDataType(ByteBuffer.wrap(b));
+          }
+        } else {
+          input.skipBytes(v);
+        }
+      }
+    }
+    // measure
+    DataType dataType;
+    for (int msrCount = 0; msrCount < measureCount; msrCount++, colCount++) {
+      if (nullBitSet.get(colCount)) {
+        if (isFilterRequired[colCount]) {
+          filterValues[filterMap[colCount]] = null;
+        }
+        if (isProjectionRequired[colCount]) {
+          outputValues[projectionMap[colCount]] = null;
+        }
+      } else {
+        // fixed-width measures are skipped by their width when not required
+        dataType = measureDataTypes[msrCount];
+        if (dataType == DataTypes.BOOLEAN) {
+          if (isRequired[colCount]) {
+            boolean v = input.readBoolean();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(1);
+          }
+        } else if (dataType == DataTypes.SHORT) {
+          if (isRequired[colCount]) {
+            short v = input.readShort();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(2);
+          }
+        } else if (dataType == DataTypes.INT) {
+          if (isRequired[colCount]) {
+            int v = input.readInt();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(4);
+          }
+        } else if (dataType == DataTypes.LONG) {
+          if (isRequired[colCount]) {
+            long v = input.readLong();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(8);
+          }
+        } else if (dataType == DataTypes.DOUBLE) {
+          if (isRequired[colCount]) {
+            double v = input.readDouble();
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] = v;
+            }
+          } else {
+            input.skipBytes(8);
+          }
+        } else if (DataTypes.isDecimal(dataType)) {
+          // decimal value: 2-byte length followed by the encoded bytes
+          int len = input.readShort();
+          if (isRequired[colCount]) {
+            BigDecimal v = DataTypeUtil.byteToBigDecimal(input.readBytes(len));
+            if (isFilterRequired[colCount]) {
+              filterValues[filterMap[colCount]] = v;
+            }
+            if (isProjectionRequired[colCount]) {
+              outputValues[projectionMap[colCount]] =
+                  DataTypeUtil.getDataTypeConverter().convertFromBigDecimalToDecimal(v);
+            }
+          } else {
+            input.skipBytes(len);
+          }
+        }
+        // NOTE(review): a measure of any other data type is neither read nor skipped
+        // here - confirm that no such type can occur in a stream file
+      }
+    }
+  }
+
+  /**
+   * Reads the next row of the current blocklet without decoding it, storing one value
+   * per storage column into {@code outputValues} at the column's storage index.
+   * Used for streaming handoff.
+   */
+  private void readRawRowFromStream() {
+    input.nextRow();
+    short nullBitsLength = input.readShort();
+    BitSet nulls =
+        nullBitsLength > 0 ? BitSet.valueOf(input.readBytes(nullBitsLength)) : allNonNull;
+
+    int col = 0;
+    // primitive type dimension
+    while (col < isNoDictColumn.length) {
+      if (nulls.get(col)) {
+        outputValues[col] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      } else if (isNoDictColumn[col]) {
+        int length = input.readShort();
+        outputValues[col] = input.readBytes(length);
+      } else {
+        outputValues[col] = input.readInt();
+      }
+      col++;
+    }
+
+    // complex type dimension
+    while (col < dimensionCount) {
+      if (nulls.get(col)) {
+        outputValues[col] = null;
+      } else {
+        short length = input.readShort();
+        outputValues[col] = input.readBytes(length);
+      }
+      col++;
+    }
+
+    // measure
+    for (int msr = 0; msr < measureCount; msr++, col++) {
+      if (nulls.get(col)) {
+        outputValues[col] = null;
+        continue;
+      }
+      DataType type = measureDataTypes[msr];
+      if (type == DataTypes.BOOLEAN) {
+        outputValues[col] = input.readBoolean();
+      } else if (type == DataTypes.SHORT) {
+        outputValues[col] = input.readShort();
+      } else if (type == DataTypes.INT) {
+        outputValues[col] = input.readInt();
+      } else if (type == DataTypes.LONG) {
+        outputValues[col] = input.readLong();
+      } else if (type == DataTypes.DOUBLE) {
+        outputValues[col] = input.readDouble();
+      } else if (DataTypes.isDecimal(type)) {
+        int length = input.readShort();
+        outputValues[col] = DataTypeUtil.byteToBigDecimal(input.readBytes(length));
+      }
+    }
+  }
+
+  /**
+   * Progress is not tracked by this reader; always returns 0.
+   */
+  @Override public float getProgress() {
+    return 0;
+  }
+
+  /**
+   * Closes the blocklet reader, if the stream was ever opened.
+   */
+  @Override public void close() throws IOException {
+    if (input == null) {
+      return;
+    }
+    input.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
index 140e46b..2f0c9eb 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
@@ -75,7 +75,7 @@ public class CarbonVectorBatch {
     }
   }
 
-  private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType,
+  public static CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType,
       StructField field, Dictionary dictionary) {
     if (dataType == DataTypes.BOOLEAN) {
       return new BooleanStreamReader(batchSize, field.getDataType(), dictionary);
@@ -92,8 +92,12 @@ public class CarbonVectorBatch {
     } else if (dataType == DataTypes.STRING) {
       return new SliceStreamReader(batchSize, field.getDataType(), dictionary);
     } else if (DataTypes.isDecimal(dataType)) {
-      return new DecimalSliceStreamReader(batchSize, field.getDataType(), (DecimalType) dataType,
-          dictionary);
+      if (dataType instanceof DecimalType) {
+        return new DecimalSliceStreamReader(batchSize, field.getDataType(), (DecimalType) dataType,
+            dictionary);
+      } else {
+        return null;
+      }
     } else {
       return new ObjectStreamReader(batchSize, field.getDataType());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index 93de394..f289718 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -18,14 +18,46 @@
 package org.apache.carbondata.presto;
 
 import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.List;
 import java.util.Objects;
 
+import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.stream.StreamRecordReader;
+import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
 import org.apache.carbondata.presto.readers.PrestoVectorBlockBuilder;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 
 import com.facebook.presto.hadoop.$internal.com.google.common.base.Throwables;
+import com.facebook.presto.hive.HiveColumnHandle;
+import com.facebook.presto.hive.HiveSplit;
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorPageSource;
 import com.facebook.presto.spi.Page;
@@ -33,6 +65,12 @@ import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.LazyBlock;
 import com.facebook.presto.spi.block.LazyBlockLoader;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.log4j.Logger;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -44,18 +82,103 @@ class CarbondataPageSource implements ConnectorPageSource {
 
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(CarbondataPageSource.class.getName());
+
+  private HiveSplit split;
+  private CarbonTable carbonTable;
+  private String queryId;
+  private Configuration hadoopConf;
+  private FileFormat fileFormat;
   private List<ColumnHandle> columnHandles;
+  private int columnCount = 0;
   private boolean closed;
-  private PrestoCarbonVectorizedRecordReader vectorReader;
   private long sizeOfData = 0;
   private int batchId;
   private long nanoStart;
   private long nanoEnd;
+  private CarbonDictionaryDecodeReadSupport readSupport;
+
+  // state used for columnar splits (every file format other than ROW_V1)
+  private PrestoCarbonVectorizedRecordReader vectorReader;
+  private boolean isDirectVectorFill; // enables direct vector fill and disables prefetch
+
+  // state used for row format (ROW_V1) splits
+  private StreamRecordReader rowReader;
+  private StructField[] fields; // one entry per projected column, indexed by ordinal
+  private int batchSize = 100; // maximum number of rows put into one page
+  private Dictionary[] dictionaries; // taken from readSupport in initializeForRow()
+  private DataType[] dataTypes; // taken from readSupport in initializeForRow()
+  private boolean isFrstPage = true; // true until the first getNextPageForRow() call
 
-  CarbondataPageSource(PrestoCarbonVectorizedRecordReader vectorizedRecordReader,
-      List<ColumnHandle> columnHandles) {
+  CarbondataPageSource(CarbonTable carbonTable, String queryId, HiveSplit split,
+      List<ColumnHandle> columnHandles, Configuration hadoopConf, boolean isDirectVectorFill) {
+    this.carbonTable = carbonTable;
+    this.queryId = queryId;
+    this.split = split;
     this.columnHandles = columnHandles;
-    vectorReader = vectorizedRecordReader;
+    this.hadoopConf = hadoopConf;
+    this.isDirectVectorFill = isDirectVectorFill;
+    initialize(); // selects and prepares the row or columnar reader for this split
+  }
+
+  private void initialize() {
+    // The serialized Carbon split travels inside the Hive split's schema properties.
+    String serializedSplit = split.getSchema().getProperty("carbonSplit");
+    fileFormat = CarbonLocalMultiBlockSplit.convertSplit(serializedSplit).getFileFormat();
+    if (FileFormat.ROW_V1.ordinal() != fileFormat.ordinal()) {
+      initializeForColumnar();
+    } else {
+      initializeForRow();
+    }
+  }
+
+  private void initializeForColumnar() {
+    readSupport = new CarbonDictionaryDecodeReadSupport(); // initialized by the call below
+    vectorReader = createReaderForColumnar(split, columnHandles, readSupport, hadoopConf);
+  }
+
+  private void initializeForRow() {
+    // Prepares the stream (row format) reader and the StructField of every projected
+    // column; the reader itself is only opened on the first page request.
+    QueryModel queryModel = createQueryModel(split, columnHandles, hadoopConf);
+    rowReader = new StreamRecordReader(queryModel, false);
+    List<ProjectionDimension> projectedDimensions = queryModel.getProjectionDimensions();
+    List<ProjectionMeasure> projectedMeasures = queryModel.getProjectionMeasures();
+    fields = new StructField[projectedDimensions.size() + projectedMeasures.size()];
+
+    // Dimensions: direct-dictionary columns take the generator's return type; columns that
+    // are not dictionary encoded, or are complex, keep their own type; the rest become INT.
+    for (ProjectionDimension dimension : projectedDimensions) {
+      DataType fieldType;
+      if (dimension.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(dimension.getDimension().getDataType());
+        fieldType = generator.getReturnType();
+      } else if (!dimension.getDimension().hasEncoding(Encoding.DICTIONARY)
+          || dimension.getDimension().isComplex()) {
+        fieldType = dimension.getDimension().getDataType();
+      } else {
+        fieldType = DataTypes.INT;
+      }
+      fields[dimension.getOrdinal()] = new StructField(dimension.getColumnName(), fieldType);
+    }
+
+    // Measures: BOOLEAN, SHORT, INT, LONG and decimals keep their own type; every
+    // other measure type is exposed as DOUBLE.
+    for (ProjectionMeasure measure : projectedMeasures) {
+      DataType measureType = measure.getMeasure().getDataType();
+      boolean keepsOwnType = measureType == DataTypes.BOOLEAN
+          || measureType == DataTypes.SHORT || measureType == DataTypes.INT
+          || measureType == DataTypes.LONG || DataTypes.isDecimal(measureType);
+      DataType fieldType = keepsOwnType ? measureType : DataTypes.DOUBLE;
+      fields[measure.getOrdinal()] = new StructField(measure.getColumnName(), fieldType);
+    }
+
+    // Dictionaries and data types come from the dictionary-decoding read support.
+    this.columnCount = columnHandles.size();
+    readSupport = new CarbonDictionaryDecodeReadSupport();
+    readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
+    this.dictionaries = readSupport.getDictionaries();
+    this.dataTypes = readSupport.getDataTypes();
   }
 
   @Override public long getCompletedBytes() {
@@ -71,6 +194,14 @@ class CarbondataPageSource implements ConnectorPageSource {
   }
 
   @Override public Page getNextPage() {
+    // Stream (ROW_V1) splits are read row by row; all other formats use the vector reader.
+    if (FileFormat.ROW_V1.ordinal() != fileFormat.ordinal()) {
+      return getNextPageForColumnar();
+    }
+    return getNextPageForRow();
+  }
+
+  private Page getNextPageForColumnar() {
     if (nanoStart == 0) {
       nanoStart = System.nanoTime();
     }
@@ -111,6 +242,68 @@ class CarbondataPageSource implements ConnectorPageSource {
     }
   }
 
+  private Page getNextPageForRow() {
+    if (isFrstPage) { // open the row reader lazily, on the first page request only
+      isFrstPage = false;
+      initialReaderForRow();
+    }
+    if (nanoStart == 0) {
+      nanoStart = System.nanoTime();
+    }
+    int count = 0;
+    try {
+      Block[] blocks = new Block[columnCount];
+      CarbonColumnVectorImpl[] columns = new CarbonColumnVectorImpl[columnCount];
+      for (int i = 0; i < columnCount; ++i) {
+        columns[i] = CarbonVectorBatch
+            .createDirectStreamReader(batchSize, dataTypes[i], fields[i], dictionaries[i]);
+      }
+      while (rowReader.nextKeyValue()) { // copy at most batchSize rows into the vectors
+        Object[] values = (Object[]) rowReader.getCurrentValue();
+        for (int index = 0; index < columnCount; index++) {
+          columns[index].putObject(count, values[index]);
+        }
+        if (++count == batchSize) {
+          break;
+        }
+      }
+      if (count == 0) { // nothing was read: close this source and return no page
+        close();
+        return null;
+      }
+      for (int index = 0; index < columnCount; index++) {
+        blocks[index] = ((PrestoVectorBlockBuilder) columns[index]).buildBlock();
+        sizeOfData += blocks[index].getSizeInBytes();
+      }
+      return new Page(count, blocks);
+    } catch (PrestoException e) {
+      closeWithSuppression(e);
+      throw e;
+    } catch (InterruptedException e) {
+      closeWithSuppression(e);
+      Thread.currentThread().interrupt(); // restore the flag once the source is closed
+      throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
+    } catch (RuntimeException | IOException e) {
+      closeWithSuppression(e);
+      throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
+    }
+  }
+
+  private void initialReaderForRow() {
+    // The job tracker id is simply the current time at minute precision.
+    String trackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date());
+    TaskAttemptContextImpl taskContext = new TaskAttemptContextImpl(
+        FileFactory.getConfiguration(), new TaskAttemptID(trackerId, 0, TaskType.MAP, 0, 0));
+    // Re-create the Carbon split from its serialized form in the Hive split schema.
+    String serializedSplit = split.getSchema().getProperty("carbonSplit");
+    CarbonMultiBlockSplit streamSplit = CarbonLocalMultiBlockSplit.convertSplit(serializedSplit);
+    try {
+      rowReader.initialize(streamSplit, taskContext);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override public long getSystemMemoryUsage() {
     return sizeOfData;
   }
@@ -122,7 +315,12 @@ class CarbondataPageSource implements ConnectorPageSource {
     }
     closed = true;
     try {
-      vectorReader.close();
+      if (vectorReader != null) {
+        vectorReader.close();
+      }
+      if (rowReader != null) {
+        rowReader.close();
+      }
       nanoEnd = System.nanoTime();
     } catch (Exception e) {
       throw Throwables.propagate(e);
@@ -144,6 +342,116 @@ class CarbondataPageSource implements ConnectorPageSource {
   }
 
   /**
+   * Creates the vectorized reader for a columnar split, executing the query eagerly.
+   */
+  private PrestoCarbonVectorizedRecordReader createReaderForColumnar(HiveSplit carbonSplit,
+      List<? extends ColumnHandle> columns, CarbonDictionaryDecodeReadSupport readSupport,
+      Configuration conf) {
+    QueryModel queryModel = createQueryModel(carbonSplit, columns, conf);
+    if (isDirectVectorFill) {
+      queryModel.setDirectVectorFill(true);
+      queryModel.setPreFetchData(false);
+    }
+    QueryExecutor queryExecutor = // NOTE(review): uses a new Configuration, not conf; confirm
+        QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration());
+    try {
+      CarbonIterator iterator = queryExecutor.execute(queryModel);
+      readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
+      PrestoCarbonVectorizedRecordReader reader =
+          new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
+              (AbstractDetailQueryResultIterator) iterator, readSupport);
+      reader.setTaskId(Long.parseLong(carbonSplit.getSchema().getProperty("index")));
+      return reader;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create reader ", e);
+    }
+  }
+
+  /**
+   * @param carbondataSplit Hive split holding the serialized Carbon split and the predicate
+   * @param columns column handles to project
+   * @param conf Hadoop configuration; table and query settings are written into it
+   * @return query model with vector reading enabled and the split's block infos attached
+   */
+  private QueryModel createQueryModel(HiveSplit carbondataSplit,
+      List<? extends ColumnHandle> columns, Configuration conf) {
+    try {
+      CarbonProjection carbonProjection = getCarbonProjection(columns);
+      conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+      String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+      CarbonTableInputFormat
+          .setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable());
+      CarbonTableInputFormat.setTableInfo(conf, carbonTable.getTableInfo());
+      conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+      conf.set("query.id", queryId);
+      JobConf jobConf = new JobConf(conf);
+      CarbonTableInputFormat carbonTableInputFormat = createInputFormat(jobConf, carbonTable,
+          PrestoFilterUtil.parseFilterExpression(carbondataSplit.getEffectivePredicate()),
+          carbonProjection);
+      TaskAttemptContextImpl hadoopAttemptContext =
+          new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
+      CarbonMultiBlockSplit carbonInputSplit = CarbonLocalMultiBlockSplit
+          .convertSplit(carbondataSplit.getSchema().getProperty("carbonSplit"));
+      QueryModel queryModel =
+          carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext);
+      queryModel.setQueryId(queryId);
+      queryModel.setVectorReader(true);
+      queryModel.setStatisticsRecorder(
+          CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId()));
+      // Attach the block infos built from every split inside the multi-block split.
+      List<TableBlockInfo> tableBlockInfoList =
+          CarbonInputSplit.createBlocks(carbonInputSplit.getAllSplits());
+      queryModel.setTableBlockInfos(tableBlockInfoList);
+      return queryModel;
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to get the Query Model ", e);
+    }
+  }
+
+  /**
+   * Writes the table location, filter and projection into conf and returns a new format.
+   * @param conf configuration that receives the table path, names, filter and projection
+   * @param carbonTable table whose identifier supplies the path, database and table name
+   * @param filterExpression filter to set as the predicate on conf
+   * @param projection column projection to set on conf
+   * @return a new CarbonTableInputFormat; all settings are carried by conf, not the format
+   */
+  private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
+      CarbonTable carbonTable, Expression filterExpression, CarbonProjection projection) {
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+    CarbonTableInputFormat<Object> format = new CarbonTableInputFormat<>();
+    try {
+      CarbonTableInputFormat
+          .setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath()));
+      CarbonTableInputFormat
+          .setDatabaseName(conf, identifier.getCarbonTableIdentifier().getDatabaseName());
+      CarbonTableInputFormat
+          .setTableName(conf, identifier.getCarbonTableIdentifier().getTableName());
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to create the CarbonTableInputFormat", e);
+    }
+    CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
+    CarbonTableInputFormat.setColumnProjection(conf, projection);
+
+    return format;
+  }
+
+  /**
+   * @param columns column handles to project; each is checked to be a HiveColumnHandle
+   * @return a CarbonProjection to which the name of every given column has been added
+   */
+  private CarbonProjection getCarbonProjection(List<? extends ColumnHandle> columns) {
+    CarbonProjection carbonProjection = new CarbonProjection();
+    // Type-check each handle and add its name; NOTE(review): 'handles' is never read
+    ImmutableList.Builder<HiveColumnHandle> handles = ImmutableList.builder();
+    for (ColumnHandle handle : columns) {
+      handles.add(Types.checkType(handle, HiveColumnHandle.class, "handle"));
+      carbonProjection.addColumn(((HiveColumnHandle) handle).getName());
+    }
+    return carbonProjection;
+  }
+
+  /**
    * Lazy Block Implementation for the Carbondata
    */
   private final class CarbondataBlockLoader implements LazyBlockLoader<LazyBlock> {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index c81e0c3..be088e1 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -17,27 +17,12 @@
 
 package org.apache.carbondata.presto;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
 
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.executor.QueryExecutor;
-import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
 import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 
@@ -45,7 +30,6 @@ import static org.apache.carbondata.presto.Types.checkType;
 
 import com.facebook.presto.hive.HdfsEnvironment;
 import com.facebook.presto.hive.HiveClientConfig;
-import com.facebook.presto.hive.HiveColumnHandle;
 import com.facebook.presto.hive.HivePageSourceFactory;
 import com.facebook.presto.hive.HivePageSourceProvider;
 import com.facebook.presto.hive.HiveRecordCursorProvider;
@@ -57,14 +41,9 @@ import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 import com.facebook.presto.spi.type.TypeManager;
-import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskAttemptContextImpl;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -103,122 +82,11 @@ public class CarbondataPageSourceProvider extends HivePageSourceProvider {
         new HdfsEnvironment.HdfsContext(session, carbonSplit.getDatabase(), carbonSplit.getTable()),
         new Path(carbonSplit.getSchema().getProperty("tablePath")));
     configuration = carbonTableReader.updateS3Properties(configuration);
-    CarbonDictionaryDecodeReadSupport readSupport = new CarbonDictionaryDecodeReadSupport();
-    PrestoCarbonVectorizedRecordReader carbonRecordReader =
-        createReader(carbonSplit, columns, readSupport, configuration);
-    return new CarbondataPageSource(carbonRecordReader, columns);
-  }
-
-  /**
-   * Create vector reader using the split.
-   */
-  private PrestoCarbonVectorizedRecordReader createReader(HiveSplit carbonSplit,
-      List<? extends ColumnHandle> columns, CarbonDictionaryDecodeReadSupport readSupport,
-      Configuration conf) {
-    QueryModel queryModel = createQueryModel(carbonSplit, columns, conf);
-    if (carbonTableReader.config.getPushRowFilter() == null ||
-        carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false")) {
-      queryModel.setDirectVectorFill(true);
-      queryModel.setPreFetchData(false);
-    }
-    QueryExecutor queryExecutor =
-        QueryExecutorFactory.getQueryExecutor(queryModel, new Configuration());
-    try {
-      CarbonIterator iterator = queryExecutor.execute(queryModel);
-      readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
-      PrestoCarbonVectorizedRecordReader reader =
-          new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
-              (AbstractDetailQueryResultIterator) iterator, readSupport);
-      reader.setTaskId(Long.parseLong(carbonSplit.getSchema().getProperty("index")));
-      return reader;
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to create reader ", e);
-    }
-  }
-
-  /**
-   * @param carbondataSplit
-   * @param columns
-   * @return
-   */
-  private QueryModel createQueryModel(HiveSplit carbondataSplit,
-      List<? extends ColumnHandle> columns, Configuration conf) {
-
-    try {
-      CarbonProjection carbonProjection = getCarbonProjection(columns);
-      CarbonTable carbonTable = getCarbonTable(carbondataSplit, conf);
-      conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
-      String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
-      CarbonTableInputFormat
-          .setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable());
-      CarbonTableInputFormat.setTableInfo(conf, carbonTable.getTableInfo());
-      conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
-      conf.set("query.id", queryId);
-      JobConf jobConf = new JobConf(conf);
-      CarbonTableInputFormat carbonTableInputFormat = createInputFormat(jobConf, carbonTable,
-          PrestoFilterUtil.parseFilterExpression(carbondataSplit.getEffectivePredicate()),
-          carbonProjection);
-      TaskAttemptContextImpl hadoopAttemptContext =
-          new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
-      CarbonMultiBlockSplit carbonInputSplit = CarbonLocalMultiBlockSplit
-          .convertSplit(carbondataSplit.getSchema().getProperty("carbonSplit"));
-      QueryModel queryModel =
-          carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext);
-      queryModel.setQueryId(queryId);
-      queryModel.setVectorReader(true);
-      queryModel.setStatisticsRecorder(
-          CarbonTimeStatisticsFactory.createExecutorRecorder(queryModel.getQueryId()));
-
-      List<TableBlockInfo> tableBlockInfoList =
-          CarbonInputSplit.createBlocks(carbonInputSplit.getAllSplits());
-      queryModel.setTableBlockInfos(tableBlockInfoList);
-      return queryModel;
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to get the Query Model ", e);
-    }
-  }
-
-  /**
-   * @param conf
-   * @param carbonTable
-   * @param filterExpression
-   * @param projection
-   * @return
-   */
-  private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
-      CarbonTable carbonTable, Expression filterExpression, CarbonProjection projection) {
-
-    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
-    CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
-    try {
-      CarbonTableInputFormat
-          .setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath()));
-      CarbonTableInputFormat
-          .setDatabaseName(conf, identifier.getCarbonTableIdentifier().getDatabaseName());
-      CarbonTableInputFormat
-          .setTableName(conf, identifier.getCarbonTableIdentifier().getTableName());
-    } catch (Exception e) {
-      throw new RuntimeException("Unable to create the CarbonTableInputFormat", e);
-    }
-    CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
-    CarbonTableInputFormat.setColumnProjection(conf, projection);
-
-    return format;
-  }
-
-  /**
-   * @param columns
-   * @return
-   */
-  private CarbonProjection getCarbonProjection(List<? extends ColumnHandle> columns) {
-    CarbonProjection carbonProjection = new CarbonProjection();
-    // Convert all columns handles
-    ImmutableList.Builder<HiveColumnHandle> handles = ImmutableList.builder();
-    for (ColumnHandle handle : columns) {
-      handles.add(checkType(handle, HiveColumnHandle.class, "handle"));
-      carbonProjection.addColumn(((HiveColumnHandle) handle).getName());
-    }
-    return carbonProjection;
+    CarbonTable carbonTable = getCarbonTable(carbonSplit, configuration);
+    boolean isDirectVectorFill = carbonTableReader.config.getPushRowFilter() == null ||
+        carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false");
+    return new CarbondataPageSource(
+        carbonTable, queryId, carbonSplit, columns, configuration, isDirectVectorFill);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
index 718cb33..f4f50a5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -44,9 +45,9 @@ public class CarbonLocalInputSplit {
   private short version;
   private String[] deleteDeltaFiles;
   private String blockletId;
-
-
   private String detailInfo;
+  private int fileFormatOrdinal;
+  private FileFormat fileFormat;
 
   /**
    * Number of BlockLets in a block
@@ -93,6 +94,14 @@ public class CarbonLocalInputSplit {
     return blockletId;
   }
 
+  @JsonProperty public int getFileFormatOrdinal() {
+    return fileFormatOrdinal; // the ordinal is the form of the file format exposed to JSON
+  }
+
+  public FileFormat getFileFormat() {
+    return fileFormat; // resolved from fileFormatOrdinal in the constructor
+  }
+
   public void setDetailInfo(BlockletDetailInfo blockletDetailInfo) {
     Gson gson = new Gson();
     detailInfo = gson.toJson(blockletDetailInfo);
@@ -107,7 +116,8 @@ public class CarbonLocalInputSplit {
       @JsonProperty("version") short version,
       @JsonProperty("deleteDeltaFiles") String[] deleteDeltaFiles,
       @JsonProperty("blockletId") String blockletId,
-      @JsonProperty("detailInfo") String detailInfo
+      @JsonProperty("detailInfo") String detailInfo,
+      @JsonProperty("fileFormatOrdinal") int fileFormatOrdinal
   ) {
     this.path = path;
     this.start = start;
@@ -120,7 +130,8 @@ public class CarbonLocalInputSplit {
     this.deleteDeltaFiles = deleteDeltaFiles;
     this.blockletId = blockletId;
     this.detailInfo = detailInfo;
-
+    this.fileFormatOrdinal = fileFormatOrdinal;
+    this.fileFormat = FileFormat.getByOrdinal(fileFormatOrdinal);
   }
 
   public static CarbonInputSplit convertSplit(CarbonLocalInputSplit carbonLocalInputSplit) {
@@ -132,18 +143,21 @@ public class CarbonLocalInputSplit {
         carbonLocalInputSplit.getNumberOfBlocklets(),
         ColumnarFormatVersion.valueOf(carbonLocalInputSplit.getVersion()),
         carbonLocalInputSplit.getDeleteDeltaFiles());
-    Gson gson = new Gson();
-    BlockletDetailInfo blockletDetailInfo =
-        gson.fromJson(carbonLocalInputSplit.detailInfo, BlockletDetailInfo.class);
-    if (null == blockletDetailInfo) {
-      throw new RuntimeException("Could not read blocklet details");
-    }
-    try {
-      blockletDetailInfo.readColumnSchema(blockletDetailInfo.getColumnSchemaBinary());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+    inputSplit.setFormat(carbonLocalInputSplit.getFileFormat());
+    if (FileFormat.COLUMNAR_V3.ordinal() == inputSplit.getFileFormat().ordinal()) {
+      Gson gson = new Gson();
+      BlockletDetailInfo blockletDetailInfo =
+          gson.fromJson(carbonLocalInputSplit.detailInfo, BlockletDetailInfo.class);
+      if (null == blockletDetailInfo) {
+        throw new RuntimeException("Could not read blocklet details");
+      }
+      try {
+        blockletDetailInfo.readColumnSchema(blockletDetailInfo.getColumnSchemaBinary());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      inputSplit.setDetailInfo(blockletDetailInfo);
     }
-    inputSplit.setDetailInfo(blockletDetailInfo);
     return inputSplit;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
index fd232ed..6702c5f 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
@@ -70,6 +70,9 @@ public class CarbonLocalMultiBlockSplit {
       @JsonProperty("locations") String[] locations) {
     this.splitList = splitList;
     this.locations = locations;
+    if (!splitList.isEmpty()) {
+      this.fileFormat = splitList.get(0).getFileFormat();
+    }
   }
 
   public String getJsonString() {
@@ -87,6 +90,7 @@ public class CarbonLocalMultiBlockSplit {
 
     CarbonMultiBlockSplit carbonMultiBlockSplit =
         new CarbonMultiBlockSplit(carbonInputSplitList, carbonLocalMultiBlockSplit.getLocations());
+    carbonMultiBlockSplit.setFileFormat(carbonLocalMultiBlockSplit.getFileFormat());
 
     return carbonMultiBlockSplit;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 5ede272..1121a37 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -277,7 +277,8 @@ public class CarbonTableReader {
               carbonInputSplit.getLength(), Arrays.asList(carbonInputSplit.getLocations()),
               carbonInputSplit.getNumberOfBlocklets(), carbonInputSplit.getVersion().number(),
               carbonInputSplit.getDeleteDeltaFiles(), carbonInputSplit.getBlockletId(),
-              gson.toJson(carbonInputSplit.getDetailInfo())));
+              gson.toJson(carbonInputSplit.getDetailInfo()),
+              carbonInputSplit.getFileFormat().ordinal()));
         }
 
         // Use block distribution

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
index 0eee58a..37eb111 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
@@ -92,4 +92,15 @@ public class BooleanStreamReader extends CarbonColumnVectorImpl
     builder = type.createBlockBuilder(null, batchSize);
   }
 
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionary == null) {
+        putBoolean(rowId, (boolean) value);
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
index ddc855a..da8d913 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
@@ -167,4 +167,16 @@ public class DecimalSliceStreamReader extends CarbonColumnVectorImpl
         "Read decimal precision larger than column precision");
     return decimal;
   }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionary == null) {
+        decimalBlockWriter((BigDecimal) value);
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
index ed9a202..8c3a73f 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
@@ -89,4 +89,16 @@ public class DoubleStreamReader extends CarbonColumnVectorImpl implements Presto
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionary == null) {
+        putDouble(rowId, (double) value);
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
index 52ddbb2..3b7e0bf 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -88,4 +88,11 @@ public class IntegerStreamReader extends CarbonColumnVectorImpl
     builder = type.createBlockBuilder(null, batchSize);
   }
 
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      putInt(rowId, (int) value);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
index 81fdf88..abaf0a0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -86,4 +86,16 @@ public class LongStreamReader extends CarbonColumnVectorImpl implements PrestoVe
       builder.appendNull();
     }
   }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionary == null) {
+        putLong(rowId, (long) value);
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
index 7411513..32498e0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
@@ -86,4 +86,16 @@ public class ShortStreamReader extends CarbonColumnVectorImpl implements PrestoV
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionary == null) {
+        putShort(rowId, (short) value);
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index 1e4688f..3b3c78c 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 import com.facebook.presto.spi.block.Block;
@@ -156,4 +157,16 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
           ((String) data).getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))));
     }
   }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionaryBlock == null) {
+        putByteArray(rowId, ByteUtil.toBytes((String) value));
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
index 1052a74..2b7f0c0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
@@ -87,4 +87,16 @@ public class TimestampStreamReader extends CarbonColumnVectorImpl
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }
+
+  @Override public void putObject(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+    } else {
+      if (dictionary == null) {
+        putLong(rowId, (Long) value);
+      } else {
+        putInt(rowId, (int) value);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d78db8f6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index a32a8de..0ab6a3a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -61,11 +61,11 @@ import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+import org.apache.carbondata.hadoop.stream.CarbonStreamInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.util.Util
-import org.apache.carbondata.streaming.CarbonStreamInputFormat
 
 /**
  * This RDD is used to perform query on CarbonData file. Before sending tasks to scan