Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/09/07 13:07:48 UTC

[2/2] carbondata git commit: [CARBONDATA-1433] Added Vectorized Reader for Presto Integration

[CARBONDATA-1433] Added Vectorized Reader for Presto Integration

This PR optimizes the performance of the Presto integration: 1) Added a vectorized reader for reading data. 2) Used DictionaryBlock for loading dictionary values. 3) Removed unused code.

This closes #1307
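
At a high level, the new read path hands Presto whole column batches instead of
single rows. A minimal consuming loop, sketched against the names introduced in
this patch (the loop itself is illustrative, not code from the diff):

  // Hedged sketch: one batch per nextKeyValue() call; 'vectorReader' is a
  // CarbonVectorizedRecordReader as added in this patch.
  void drain(CarbonVectorizedRecordReader vectorReader)
      throws IOException, InterruptedException {
    while (vectorReader.nextKeyValue()) {
      ColumnarBatch batch = (ColumnarBatch) vectorReader.getCurrentValue();
      for (int col = 0; col < batch.numCols(); col++) {
        // each Spark ColumnVector becomes one Presto Block via a StreamReader
      }
    }
    vectorReader.close();
  }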


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/531dcd23
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/531dcd23
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/531dcd23

Branch: refs/heads/master
Commit: 531dcd23457add78ad397a00129ba7efb01a0228
Parents: 0c519c4
Author: Bhavya <bh...@knoldus.com>
Authored: Tue Aug 29 17:02:18 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Thu Sep 7 21:07:29 2017 +0800

----------------------------------------------------------------------
 integration/presto/pom.xml                      |  43 +++
 .../carbondata/presto/CarbonTypeUtil.java       |  34 +++
 .../presto/CarbonVectorizedRecordReader.java    | 264 +++++++++++++++++++
 .../carbondata/presto/CarbondataPageSource.java | 256 ++++++++++--------
 .../presto/CarbondataRecordCursor.java          |  30 ++-
 .../carbondata/presto/CarbondataRecordSet.java  |  40 ++-
 .../presto/CarbondataRecordSetProvider.java     |  11 +-
 .../presto/CarbondataSplitManager.java          | 181 +------------
 .../presto/ColumnarVectorWrapper.java           | 209 +++++++++++++++
 .../presto/readers/AbstractStreamReader.java    |  66 +++++
 .../readers/DecimalSliceStreamReader.java       | 183 +++++++++++++
 .../presto/readers/DoubleStreamReader.java      |  71 +++++
 .../presto/readers/IntegerStreamReader.java     |  67 +++++
 .../presto/readers/LongStreamReader.java        |  62 +++++
 .../presto/readers/ObjectStreamReader.java      |  73 +++++
 .../presto/readers/SliceStreamReader.java       | 107 ++++++++
 .../carbondata/presto/readers/StreamReader.java |  42 +++
 .../presto/readers/StreamReaders.java           |  67 +++++
 .../CarbonDictionaryDecodeReadSupport.scala     | 144 ++++++++++
 .../presto/CarbonDictionaryDecodeSupport.scala  |  66 -----
 20 files changed, 1625 insertions(+), 391 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 3cddc1e..562718f 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -46,8 +46,15 @@
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-core</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-sql_2.10</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
+
     <dependency>
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-common</artifactId>
@@ -58,6 +65,12 @@
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-processing</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-sql_2.10</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
@@ -139,6 +152,36 @@
       <artifactId>hadoop-apache2</artifactId>
       <version>2.7.3-1</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.11</artifactId>
+      <version>2.1.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-catalyst_2.10 -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_2.11</artifactId>
+      <version>2.1.0</version>
+    </dependency>
+    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_2.11</artifactId>
+      <version>2.1.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java
new file mode 100644
index 0000000..6cb2915
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonTypeUtil.java
@@ -0,0 +1,34 @@
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+import org.apache.spark.sql.types.DataTypes;
+
+public class CarbonTypeUtil {
+
+  public static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
+      DataType carbonDataType) {
+    switch (carbonDataType) {
+      case STRING:
+        return DataTypes.StringType;
+      case SHORT:
+        return DataTypes.ShortType;
+      case INT:
+        return DataTypes.IntegerType;
+      case LONG:
+        return DataTypes.LongType;
+      case DOUBLE:
+        return DataTypes.DoubleType;
+      case BOOLEAN:
+        return DataTypes.BooleanType;
+      case DECIMAL:
+        return DataTypes.createDecimalType();
+      case TIMESTAMP:
+        return DataTypes.TimestampType;
+      case DATE:
+        return DataTypes.DateType;
+      default: return null;
+    }
+  }
+
+}
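
The mapper above returns null for Carbon types it does not handle, so callers
are expected to guard the result. A small, hedged usage sketch:

  // Sketch only: mapping a Carbon type before building a Spark StructField.
  org.apache.spark.sql.types.DataType sparkType =
      CarbonTypeUtil.convertCarbonToSparkDataType(DataType.STRING);
  // sparkType is DataTypes.StringType; unmapped types yield null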

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
new file mode 100644
index 0000000..f474433
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorizedRecordReader.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.AbstractRecordReader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
+ * carbondata column APIs and fills the data directly into columns.
+ */
+class CarbonVectorizedRecordReader extends AbstractRecordReader<Object> {
+
+  private int batchIdx = 0;
+
+  private int numBatched = 0;
+
+  private ColumnarBatch columnarBatch;
+
+  private CarbonColumnarBatch carbonColumnarBatch;
+
+  /**
+   * If true, this class returns batches instead of rows.
+   */
+  private boolean returnColumnarBatch;
+
+  /**
+   * The default config on whether columnarBatch should be offheap.
+   */
+  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.OFF_HEAP;
+
+  private QueryModel queryModel;
+
+  private AbstractDetailQueryResultIterator iterator;
+
+  private QueryExecutor queryExecutor;
+
+  public CarbonVectorizedRecordReader(QueryExecutor queryExecutor, QueryModel queryModel, AbstractDetailQueryResultIterator iterator) {
+    this.queryModel = queryModel;
+    this.iterator = iterator;
+    this.queryExecutor = queryExecutor;
+    enableReturningBatches();
+  }
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException, UnsupportedOperationException {
+    // The input split can contain a single HDFS block or multiple blocks, so first get all the
+    // blocks and then set them in the query model.
+    List<CarbonInputSplit> splitList;
+    if (inputSplit instanceof CarbonInputSplit) {
+      splitList = new ArrayList<>(1);
+      splitList.add((CarbonInputSplit) inputSplit);
+    } else if (inputSplit instanceof CarbonMultiBlockSplit) {
+      // contains multiple blocks; this is an optimization for concurrent queries.
+      CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit;
+      splitList = multiBlockSplit.getAllSplits();
+    } else {
+      throw new RuntimeException("unsupported input split type: " + inputSplit);
+    }
+    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+    queryModel.setTableBlockInfos(tableBlockInfoList);
+    queryModel.setVectorReader(true);
+    try {
+      queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+      iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
+    } catch (QueryExecutionException e) {
+      throw new InterruptedException(e.getMessage());
+    }
+  }
+
+  @Override public void close() throws IOException {
+    logStatistics(rowCount, queryModel.getStatisticsRecorder());
+    if (columnarBatch != null) {
+      columnarBatch.close();
+      columnarBatch = null;
+    }
+    // clear dictionary cache
+    Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+    if (null != columnToDictionaryMapping) {
+      for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
+        CarbonUtil.clearDictionaryCache(entry.getValue());
+      }
+    }
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+    resultBatch();
+
+    if (returnColumnarBatch) return nextBatch();
+
+    if (batchIdx >= numBatched) {
+      if (!nextBatch()) return false;
+    }
+    ++batchIdx;
+    return true;
+  }
+
+  @Override public Object getCurrentValue() throws IOException, InterruptedException {
+    if (returnColumnarBatch) {
+      rowCount += columnarBatch.numValidRows();
+      return columnarBatch;
+    }
+    rowCount += 1;
+    return columnarBatch.getRow(batchIdx - 1);
+  }
+
+  @Override public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override public float getProgress() throws IOException, InterruptedException {
+    // TODO: Implement it based on the total number of rows it is going to retrieve.
+    return 0;
+  }
+
+  /**
+   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+   * This object is reused. Calling this enables the vectorized reader. This should be called
+   * before any calls to nextKeyValue/nextBatch.
+   */
+
+  private void initBatch(MemoryMode memMode) {
+    List<QueryDimension> queryDimension = queryModel.getQueryDimension();
+    List<QueryMeasure> queryMeasures = queryModel.getQueryMeasures();
+    StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
+    for (int i = 0; i < queryDimension.size(); i++) {
+      QueryDimension dim = queryDimension.get(i);
+      if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(dim.getDimension().getDataType());
+        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+            CarbonTypeUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null);
+      } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
+        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+            CarbonTypeUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
+            null);
+      } else if (dim.getDimension().isComplex()) {
+        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+            CarbonTypeUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
+            null);
+      } else {
+        fields[dim.getQueryOrder()] = new StructField(dim.getColumnName(),
+            CarbonTypeUtil.convertCarbonToSparkDataType(DataType.INT), true, null);
+      }
+    }
+
+    for (int i = 0; i < queryMeasures.size(); i++) {
+      QueryMeasure msr = queryMeasures.get(i);
+      switch (msr.getMeasure().getDataType()) {
+        case SHORT:
+        case INT:
+        case LONG:
+          fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+              CarbonTypeUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
+              null);
+          break;
+        case DECIMAL:
+          fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+              new DecimalType(msr.getMeasure().getPrecision(),
+                  msr.getMeasure().getScale()), true, null);
+          break;
+        default:
+          fields[msr.getQueryOrder()] = new StructField(msr.getColumnName(),
+              CarbonTypeUtil.convertCarbonToSparkDataType(DataType.DOUBLE), true, null);
+      }
+    }
+
+    columnarBatch = ColumnarBatch.allocate(new StructType(fields), memMode);
+    CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
+    boolean[] filteredRows = new boolean[columnarBatch.capacity()];
+    for (int i = 0; i < fields.length; i++) {
+      vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i), filteredRows);
+    }
+    carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
+  }
+
+  private void initBatch() {
+    initBatch(DEFAULT_MEMORY_MODE);
+  }
+
+  private ColumnarBatch resultBatch() {
+    if (columnarBatch == null) initBatch();
+    return columnarBatch;
+  }
+
+  /*
+   * Can be called before any rows are returned to enable returning columnar batches directly.
+   */
+  private void enableReturningBatches() {
+    returnColumnarBatch = true;
+  }
+
+  /**
+   * Advances to the next batch of rows. Returns false if there are no more.
+   */
+  private boolean nextBatch() {
+    columnarBatch.reset();
+    carbonColumnarBatch.reset();
+    if (iterator.hasNext()) {
+      iterator.processNextBatch(carbonColumnarBatch);
+      int actualSize = carbonColumnarBatch.getActualSize();
+      columnarBatch.setNumRows(actualSize);
+      numBatched = actualSize;
+      batchIdx = 0;
+      return true;
+    }
+    return false;
+  }
+
+
+}
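
Note that the reader reuses a single ColumnarBatch: nextBatch() resets both the
Spark batch and the Carbon batch before the iterator refills them. A hedged
sketch of safe consumption (process() is a hypothetical callback):

  try {
    while (reader.nextKeyValue()) {
      ColumnarBatch batch = (ColumnarBatch) reader.getCurrentValue();
      process(batch);  // must finish before the next call, since the batch is reused
    }
  } finally {
    reader.close();    // logs statistics, clears the dictionary cache
  }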

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index f7f6d1e..f13fb09 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -18,192 +18,228 @@
 package org.apache.carbondata.presto;
 
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.presto.readers.StreamReader;
+import org.apache.carbondata.presto.readers.StreamReaders;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+
+import com.facebook.presto.hadoop.$internal.com.google.common.base.Throwables;
 import com.facebook.presto.spi.ConnectorPageSource;
 import com.facebook.presto.spi.Page;
 import com.facebook.presto.spi.PageBuilder;
+import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.RecordCursor;
 import com.facebook.presto.spi.RecordSet;
 import com.facebook.presto.spi.block.Block;
-import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.LazyBlock;
 import com.facebook.presto.spi.block.LazyBlockLoader;
-import com.facebook.presto.spi.type.DecimalType;
 import com.facebook.presto.spi.type.Type;
-import io.airlift.slice.Slice;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
 
-import static com.facebook.presto.spi.type.Decimals.encodeUnscaledValue;
-import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
-import static java.math.RoundingMode.HALF_UP;
 import static java.util.Collections.unmodifiableList;
 import static java.util.Objects.requireNonNull;
 
 /**
  * Carbondata Page Source class for custom Carbondata RecordSet Iteration.
  */
-public class CarbondataPageSource implements ConnectorPageSource {
+class CarbondataPageSource implements ConnectorPageSource {
 
-  private static final int ROWS_PER_REQUEST = 4096;
+  private static final LogService logger =
+      LogServiceFactory.getLogService(CarbondataPageSource.class.getName());
   private final RecordCursor cursor;
   private final List<Type> types;
   private final PageBuilder pageBuilder;
   private boolean closed;
-  private final char[] buffer = new char[100];
-  private Block[] blocks;
+  private CarbonVectorizedRecordReader vectorReader;
+  private CarbonDictionaryDecodeReadSupport<Object[]> readSupport;
+  private long sizeOfData = 0;
+
+  private final StreamReader[] readers;
+  private int batchId;
+
+  private long nanoStart;
+  private long nanoEnd;
 
-  public CarbondataPageSource(RecordSet recordSet) {
+  CarbondataPageSource(RecordSet recordSet) {
     this(requireNonNull(recordSet, "recordSet is null").getColumnTypes(), recordSet.cursor());
   }
 
-  public CarbondataPageSource(List<Type> types, RecordCursor cursor) {
+  private CarbondataPageSource(List<Type> types, RecordCursor cursor) {
     this.cursor = requireNonNull(cursor, "cursor is null");
     this.types = unmodifiableList(new ArrayList<>(requireNonNull(types, "types is null")));
     this.pageBuilder = new PageBuilder(this.types);
-    this.blocks = new Block[types.size()];
-  }
-
-  public RecordCursor getCursor() {
-    return cursor;
+    this.readSupport = ((CarbondataRecordCursor) cursor).getReadSupport();
+    this.vectorReader = ((CarbondataRecordCursor) cursor).getVectorizedRecordReader();
+    this.readers = createStreamReaders();
   }
 
   @Override public long getTotalBytes() {
-    return cursor.getTotalBytes();
+    return sizeOfData;
   }
 
   @Override public long getCompletedBytes() {
-    return cursor.getCompletedBytes();
+    return sizeOfData;
   }
 
   @Override public long getReadTimeNanos() {
-    return cursor.getReadTimeNanos();
+    return nanoStart > 0L ? (nanoEnd == 0 ? System.nanoTime() : nanoEnd) - nanoStart : 0L;
   }
 
   @Override public boolean isFinished() {
     return closed && pageBuilder.isEmpty();
   }
 
-  @Override public Page getNextPage() {
-    BlockBuilder output;
-    Page page;
-    int size = types.size();
-    if (!closed) {
-      int i;
-      for (i = 0; i < ROWS_PER_REQUEST; i++) {
-        if (pageBuilder.isFull()) {
-          break;
-        }
-        if (!cursor.advanceNextPosition()) {
-          closed = true;
-          break;
-        }
 
-        pageBuilder.declarePosition();
-
-        for (int column = 0; column < size; column++) {
-          output = pageBuilder.getBlockBuilder(column);
-          if (cursor.isNull(column)) {
-            output.appendNull();
-          } else {
-            Type type = types.get(column);
-            Class<?> javaType = type.getJavaType();
-            if (javaType == boolean.class) {
-              type.writeBoolean(output, cursor.getBoolean(column));
-            } else if (javaType == long.class) {
-              type.writeLong(output, cursor.getLong(column));
-            } else if (javaType == double.class) {
-              type.writeDouble(output, cursor.getDouble(column));
-            } else if (javaType == Slice.class) {
-              Slice slice = cursor.getSlice(column);
-              if (type instanceof DecimalType) {
-                if (isShortDecimal(type)) {
-                  type.writeLong(output, parseLong((DecimalType) type, slice, 0, slice.length()));
-                } else {
-                  type.writeSlice(output, parseSlice((DecimalType) type, slice, 0, slice.length()));
-                }
-              } else {
-                type.writeSlice(output, slice, 0, slice.length());
-              }
-            } else {
-              type.writeObject(output, cursor.getObject(column));
-            }
+  @Override public Page getNextPage() {
+    if (nanoStart == 0) {
+      nanoStart = System.nanoTime();
+    }
+    Object vectorBatch;
+    ColumnarBatch columnarBatch = null;
+    int batchSize = 0;
+    try {
+      batchId++;
+      if (vectorReader.nextKeyValue()) {
+        vectorBatch = vectorReader.getCurrentValue();
+        if (vectorBatch instanceof ColumnarBatch)
+        {
+          columnarBatch = (ColumnarBatch) vectorBatch;
+          batchSize = columnarBatch.numRows();
+          if (batchSize == 0) {
+            close();
+            return null;
           }
-          blocks[column] = new LazyBlock(output.getPositionCount(),
-              new CarbonBlockLoader(output.build(), types.get(column)));
         }
+
+      } else {
+        close();
+        return null;
       }
-    }
 
-    // only return a page if the buffer is full or we are finishing
-    if (pageBuilder.isEmpty() || (!closed && !pageBuilder.isFull())) {
-      return null;
+      Block[] blocks = new Block[types.size()];
+      for (int column = 0; column < blocks.length; column++) {
+        Type type = types.get(column);
+        readers[column].setBatchSize(columnarBatch.numRows());
+        readers[column].setVectorReader(true);
+        readers[column].setVector(columnarBatch.column(column));
+        blocks[column] = new LazyBlock(batchSize, new CarbondataBlockLoader(column, type));
+      }
+      Page page = new Page(batchSize, blocks);
+      sizeOfData += columnarBatch.capacity();
+      return page;
     }
-
-    if (blocks != null && blocks.length > 0) {
-      page = new Page(blocks[0].getPositionCount(), blocks);
-    } else {
-      page = pageBuilder.build();
+    catch (PrestoException e) {
+      closeWithSuppression(e);
+      throw e;
+    }
+    catch (RuntimeException e) {
+      closeWithSuppression(e);
+      throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
+    } catch (InterruptedException e) {
+      closeWithSuppression(e);
+      throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
+    } catch (IOException e) {
+      closeWithSuppression(e);
+      throw new CarbonDataLoadingException("Exception when creating the Carbon data Block", e);
     }
 
-    pageBuilder.reset();
-    return page;
   }
 
   @Override public long getSystemMemoryUsage() {
-    return cursor.getSystemMemoryUsage() + pageBuilder.getSizeInBytes();
+    return sizeOfData;
   }
 
-  @Override public void close() throws IOException {
+  @Override public void close() {
+    // some hive input formats are broken and bad things can happen if you close them multiple times
+    if (closed) {
+      return;
+    }
     closed = true;
-    cursor.close();
-
-  }
-
-  private long parseLong(DecimalType type, Slice slice, int offset, int length) {
-    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
-    return decimal.unscaledValue().longValue();
-  }
+    try {
+      vectorReader.close();
+      cursor.close();
+      nanoEnd = System.nanoTime();
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
 
-  private Slice parseSlice(DecimalType type, Slice slice, int offset, int length) {
-    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
-    return encodeUnscaledValue(decimal.unscaledValue());
   }
 
-  private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length) {
-    checkArgument(length < buffer.length);
-    for (int i = 0; i < length; i++) {
-      buffer[i] = (char) slice.getByte(offset + i);
+  protected void closeWithSuppression(Throwable throwable)
+  {
+    requireNonNull(throwable, "throwable is null");
+    try {
+      close();
+    }
+    catch (RuntimeException e) {
+      // Self-suppression not permitted
+      logger.error(e, e.getMessage());
+      if (throwable != e) {
+        throwable.addSuppressed(e);
+      }
     }
-    BigDecimal decimal = new BigDecimal(buffer, 0, length);
-    checkState(decimal.scale() <= type.getScale(),
-        "Read decimal value scale larger than column scale");
-    decimal = decimal.setScale(type.getScale(), HALF_UP);
-    checkState(decimal.precision() <= type.getPrecision(),
-        "Read decimal precision larger than column precision");
-    return decimal;
   }
 
   /**
-   * Using the LazyBlockLoader
+   * Lazy block implementation for Carbondata.
    */
-  private static final class CarbonBlockLoader implements LazyBlockLoader<LazyBlock> {
+  private final class CarbondataBlockLoader
+      implements LazyBlockLoader<LazyBlock>
+  {
+    private final int expectedBatchId = batchId;
+    private final int columnIndex;
+    private final Type type;
     private boolean loaded;
-    private Block dataBlock;
 
-    public CarbonBlockLoader(Block dataBlock, Type type) {
-      this.dataBlock = dataBlock;
+    public CarbondataBlockLoader(int columnIndex, Type type)
+    {
+      this.columnIndex = columnIndex;
+      this.type = requireNonNull(type, "type is null");
     }
 
-    @Override public void load(LazyBlock block) {
+    @Override
+    public final void load(LazyBlock lazyBlock)
+    {
       if (loaded) {
         return;
       }
-      block.setBlock(dataBlock);
+
+      checkState(batchId == expectedBatchId);
+
+      try {
+        Block block = readers[columnIndex].readBlock(type);
+        lazyBlock.setBlock(block);
+      }
+      catch (IOException e) {
+        throw new CarbonDataLoadingException("Error in Reading Data from Carbondata ", e);
+      }
+
       loaded = true;
     }
   }
-}
+
+
+  /**
+   * Create a stream reader for every column based on its type.
+   * The readers are created only once, driven by the column types.
+   *
+   * @return one stream reader per projected column
+   */
+  private StreamReader[] createStreamReaders() {
+    requireNonNull(types);
+    StreamReader[] readers = new StreamReader[types.size()];
+    for (int i = 0; i < types.size(); i++) {
+      readers[i] =
+          StreamReaders.createStreamReader(types.get(i), readSupport.getSliceArrayBlock(i));
+    }
+    return readers;
+  }
+
+}
\ No newline at end of file
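
Each returned Page wraps its columns in LazyBlocks, so a column is decoded only
when Presto first touches it. A hedged sketch of that contract (the block and
loader names are from the class above; the trigger comment describes assumed
Presto SPI behavior):

  // The loader runs at most once, on the first data access to the block.
  LazyBlock lazy = new LazyBlock(batchSize, new CarbondataBlockLoader(0, type));
  // first access -> load(lazy) -> readers[0].readBlock(type) -> lazyBlock.setBlock(...)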

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
index 001392e..4663903 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -53,22 +53,24 @@ public class CarbondataRecordCursor implements RecordCursor {
 
   private Object[] fields;
   private CarbondataSplit split;
-  private CarbonIterator<Object[]> rowCursor;
-  private CarbonDictionaryDecodeReaderSupport readSupport;
+  private CarbonDictionaryDecodeReadSupport readSupport;
   private Tuple3<DataType, Dictionary, Int>[] dictionary;
+  CarbonVectorizedRecordReader vectorizedRecordReader;
 
   private long totalBytes;
   private long nanoStart;
   private long nanoEnd;
 
-  public CarbondataRecordCursor(CarbonDictionaryDecodeReaderSupport readSupport,
-      CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles,
-      CarbondataSplit split, Tuple3<DataType, Dictionary, Int>[] dictionaries) {
-    this.rowCursor = carbonIterator;
+
+
+  public CarbondataRecordCursor(CarbonDictionaryDecodeReadSupport readSupport,
+       CarbonVectorizedRecordReader vectorizedRecordReader,
+      List<CarbondataColumnHandle> columnHandles,
+      CarbondataSplit split) {
+    this.vectorizedRecordReader = vectorizedRecordReader;
     this.columnHandles = columnHandles;
     this.readSupport = readSupport;
     this.totalBytes = 0;
-    this.dictionary = dictionaries;
   }
 
   @Override public long getTotalBytes() {
@@ -97,12 +99,6 @@ public class CarbondataRecordCursor implements RecordCursor {
     if (nanoStart == 0) {
       nanoStart = System.nanoTime();
     }
-
-    if (rowCursor.hasNext()) {
-      fields = readSupport.readRow(rowCursor.next(), dictionary);
-      totalBytes += fields.length;
-      return true;
-    }
     return false;
   }
 
@@ -202,4 +198,12 @@ public class CarbondataRecordCursor implements RecordCursor {
 
     //todo  delete cache from readSupport
   }
+
+  public CarbonVectorizedRecordReader getVectorizedRecordReader() {
+    return vectorizedRecordReader;
+  }
+
+  public CarbonDictionaryDecodeReadSupport getReadSupport() {
+    return readSupport;
+  }
 }
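
After this change the cursor no longer iterates rows itself (advanceNextPosition
always returns false); it simply carries the vectorized reader and read support
over to the page source. A hedged sketch of how the page source unpacks it,
mirroring the constructor in the CarbondataPageSource diff above:

  CarbondataRecordCursor carbonCursor = (CarbondataRecordCursor) recordSet.cursor();
  CarbonVectorizedRecordReader reader = carbonCursor.getVectorizedRecordReader();
  CarbonDictionaryDecodeReadSupport readSupport = carbonCursor.getReadSupport();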

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index 4294403..9d70e85 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -22,33 +22,25 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
 import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.BatchResult;
-import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
+import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
 import org.apache.carbondata.hadoop.CarbonInputSplit;
 import org.apache.carbondata.presto.impl.CarbonLocalInputSplit;
 
-import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.RecordCursor;
 import com.facebook.presto.spi.RecordSet;
-import com.facebook.presto.spi.predicate.TupleDomain;
 import com.facebook.presto.spi.type.Type;
-import scala.Tuple3;
+import org.apache.hadoop.mapred.TaskAttemptContext;
 
 import static org.apache.carbondata.presto.Types.checkType;
 
-//import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
-
 public class CarbondataRecordSet implements RecordSet {
 
   private QueryModel queryModel;
@@ -56,19 +48,17 @@ public class CarbondataRecordSet implements RecordSet {
   private List<CarbondataColumnHandle> columns;
   private QueryExecutor queryExecutor;
 
-  private CarbonDictionaryDecodeReaderSupport readSupport;
+  private CarbonDictionaryDecodeReadSupport readSupport;
+  private TaskAttemptContext taskAttemptContext;
 
   public CarbondataRecordSet(CarbonTable carbonTable, ConnectorSession session,
-      ConnectorSplit split, List<CarbondataColumnHandle> columns, QueryModel queryModel) {
+      ConnectorSplit split, List<CarbondataColumnHandle> columns, QueryModel queryModel,
+      TaskAttemptContext taskAttemptContext) {
     this.split = checkType(split, CarbondataSplit.class, "connectorSplit");
     this.queryModel = queryModel;
     this.columns = columns;
-    this.readSupport = new CarbonDictionaryDecodeReaderSupport();
-  }
-
-  //todo support later
-  private Expression parseConstraint2Expression(TupleDomain<ColumnHandle> constraints) {
-    return null;
+    this.readSupport = new CarbonDictionaryDecodeReadSupport();
+    this.taskAttemptContext = taskAttemptContext;
   }
 
   @Override public List<Type> getColumnTypes() {
@@ -76,7 +66,7 @@ public class CarbondataRecordSet implements RecordSet {
   }
 
   /**
-   * get data blocks via Carbondata QueryModel API
+   * get data blocks via Carbondata QueryModel API.
    */
   @Override public RecordCursor cursor() {
     CarbonLocalInputSplit carbonLocalInputSplit = split.getLocalInputSplit();
@@ -87,12 +77,14 @@ public class CarbondataRecordSet implements RecordSet {
     queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
     try {
 
-      Tuple3[] dict = readSupport
+      readSupport
           .initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
-      CarbonIterator<Object[]> carbonIterator =
-          new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
+      CarbonIterator iterator = queryExecutor.execute(queryModel);
+      CarbonVectorizedRecordReader vectorReader =
+          new CarbonVectorizedRecordReader(queryExecutor, queryModel,
+              (AbstractDetailQueryResultIterator) iterator);
       RecordCursor rc =
-          new CarbondataRecordCursor(readSupport, carbonIterator, columns, split, dict);
+          new CarbondataRecordCursor(readSupport, vectorReader, columns, split);
       return rc;
     } catch (QueryExecutionException e) {
       throw new RuntimeException(e.getMessage(), e);
@@ -100,5 +92,5 @@ public class CarbondataRecordSet implements RecordSet {
       throw new RuntimeException(ex.getMessage(), ex);
     }
   }
-}
 
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 0c7b77f..e49dcee 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -88,7 +87,8 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     // Build Query Model
     CarbonTable targetTable = tableCacheModel.carbonTable;
 
-    QueryModel queryModel = null;
+    QueryModel queryModel;
+    TaskAttemptContextImpl hadoopAttemptContext;
     try {
       Configuration conf = new Configuration();
       conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
@@ -100,18 +100,19 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
       JobConf jobConf = new JobConf(conf);
       CarbonTableInputFormat carbonTableInputFormat =
           createInputFormat(jobConf, tableCacheModel.carbonTable,
-              PrestoFilterUtil.getFilters(targetTable.getFactTableName().hashCode()),
+              PrestoFilterUtil.parseFilterExpression(carbondataSplit.getConstraints()),
               carbonProjection);
-      TaskAttemptContextImpl hadoopAttemptContext =
+      hadoopAttemptContext =
           new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
       CarbonInputSplit carbonInputSplit =
           CarbonLocalInputSplit.convertSplit(carbondataSplit.getLocalInputSplit());
       queryModel = carbonTableInputFormat.getQueryModel(carbonInputSplit, hadoopAttemptContext);
+      queryModel.setVectorReader(true);
     } catch (IOException e) {
       throw new RuntimeException("Unable to get the Query Model ", e);
     }
     return new CarbondataRecordSet(targetTable, session, carbondataSplit, handles.build(),
-        queryModel);
+        queryModel, hadoopAttemptContext);
   }
 
   private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
index cf34f1d..b732e21 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -97,8 +97,7 @@ public class CarbondataSplitManager implements ConnectorSplitManager {
         getColumnConstraints(layoutHandle.getConstraint());
 
     CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
-    Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable);
-
+    Expression filters = PrestoFilterUtil.parseFilterExpression(layoutHandle.getConstraint());
     try {
       List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters);
 
@@ -109,11 +108,16 @@ public class CarbondataSplitManager implements ConnectorSplitManager {
       }
       return new FixedSplitSource(cSplits.build());
     } catch (Exception ex) {
-      System.out.println(ex.toString());
+      throw new RuntimeException(ex.getMessage(), ex);
     }
-    return null;
+
   }
 
+  /**
+   * Build the list of column constraints from the presto TupleDomain.
+   * @param constraint presto TupleDomain for the scan
+   * @return the extracted CarbondataColumnConstraint list
+   */
   public List<CarbondataColumnConstraint> getColumnConstraints(
       TupleDomain<ColumnHandle> constraint) {
     ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
@@ -129,173 +133,4 @@ public class CarbondataSplitManager implements ConnectorSplitManager {
     return constraintBuilder.build();
   }
 
-  /**
-   * Convert presto-TupleDomain predication into Carbon scan express condition
-   * @param originalConstraint  presto-TupleDomain
-   * @param carbonTable
-   * @return
-   */
-  public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint,
-      CarbonTable carbonTable) {
-    ImmutableList.Builder<Expression> filters = ImmutableList.builder();
-
-    Domain domain = null;
-
-    for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
-
-      CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
-      Type type = cdch.getColumnType();
-
-      List<CarbonColumn> ccols = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName());
-      Optional<CarbonColumn> target =
-          ccols.stream().filter(a -> a.getColName().equals(cdch.getColumnName())).findFirst();
-
-      if (target.get() == null) return null;
-
-      DataType coltype = target.get().getDataType();
-      ColumnExpression colExpression =
-          new ColumnExpression(cdch.getColumnName(), target.get().getDataType());
-      //colExpression.setColIndex(cs.getSchemaOrdinal());
-      colExpression.setDimension(target.get().isDimension());
-      colExpression.setDimension(
-          carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName()));
-      colExpression.setCarbonColumn(target.get());
-
-      domain = originalConstraint.getDomains().get().get(c);
-      checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
-
-      if (domain.getValues().isNone()) {
-      }
-
-      if (domain.getValues().isAll()) {
-      }
-
-      List<Object> singleValues = new ArrayList<>();
-
-      List<Expression> disjuncts = new ArrayList<>();
-
-      for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
-        if (range.isSingleValue()) {
-          singleValues.add(range.getLow().getValue());
-        } else {
-          List<Expression> rangeConjuncts = new ArrayList<>();
-          if (!range.getLow().isLowerUnbounded()) {
-            Object value = convertDataByType(range.getLow().getValue(), type);
-            switch (range.getLow().getBound()) {
-              case ABOVE:
-                if (type == TimestampType.TIMESTAMP) {
-                  //todo not now
-                } else {
-                  GreaterThanExpression greater = new GreaterThanExpression(colExpression,
-                      new LiteralExpression(value, coltype));
-                  rangeConjuncts.add(greater);
-                }
-                break;
-              case EXACTLY:
-                GreaterThanEqualToExpression greater =
-                    new GreaterThanEqualToExpression(colExpression,
-                        new LiteralExpression(value, coltype));
-                rangeConjuncts.add(greater);
-                break;
-              case BELOW:
-                throw new IllegalArgumentException("Low marker should never use BELOW bound");
-              default:
-                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
-            }
-          }
-          if (!range.getHigh().isUpperUnbounded()) {
-            Object value = convertDataByType(range.getHigh().getValue(), type);
-            switch (range.getHigh().getBound()) {
-              case ABOVE:
-                throw new IllegalArgumentException("High marker should never use ABOVE bound");
-              case EXACTLY:
-                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
-                    new LiteralExpression(value, coltype));
-                rangeConjuncts.add(less);
-                break;
-              case BELOW:
-                LessThanExpression less2 =
-                    new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
-                rangeConjuncts.add(less2);
-                break;
-              default:
-                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
-            }
-          }
-          disjuncts.addAll(rangeConjuncts);
-        }
-      }
-
-      if (singleValues.size() == 1) {
-        Expression ex = null;
-        if (coltype.equals(DataType.STRING)) {
-          ex = new EqualToExpression(colExpression,
-              new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
-        } else ex = new EqualToExpression(colExpression,
-            new LiteralExpression(singleValues.get(0), coltype));
-        filters.add(ex);
-      } else if (singleValues.size() > 1) {
-        ListExpression candidates = null;
-        List<Expression> exs = singleValues.stream().map((a) -> {
-          return new LiteralExpression(convertDataByType(a, type), coltype);
-        }).collect(Collectors.toList());
-        candidates = new ListExpression(exs);
-
-        if (candidates != null) filters.add(new InExpression(colExpression, candidates));
-      } else if (disjuncts.size() > 0) {
-        if (disjuncts.size() > 1) {
-          Expression finalFilters = new OrExpression(disjuncts.get(0), disjuncts.get(1));
-          if (disjuncts.size() > 2) {
-            for (int i = 2; i < disjuncts.size(); i++) {
-              filters.add(new AndExpression(finalFilters, disjuncts.get(i)));
-            }
-          }
-        } else if (disjuncts.size() == 1)//only have one value
-          filters.add(disjuncts.get(0));
-      }
-    }
-
-    Expression finalFilters;
-    List<Expression> tmp = filters.build();
-    if (tmp.size() > 1) {
-      finalFilters = new OrExpression(tmp.get(0), tmp.get(1));
-      if (tmp.size() > 2) {
-        for (int i = 2; i < tmp.size(); i++) {
-          finalFilters = new OrExpression(finalFilters, tmp.get(i));
-        }
-      }
-    } else if (tmp.size() == 1) finalFilters = tmp.get(0);
-    else//no filter
-      return null;
-
-    return finalFilters;
-  }
-
-  /**
-   * Convert presto spi Type into Carbondata Type
-   *
-   * @param colType
-   * @return
-   */
-  public static DataType spi2CarbondataTypeMapper(Type colType) {
-    if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
-    else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
-    else if (colType == IntegerType.INTEGER) return DataType.INT;
-    else if (colType == BigintType.BIGINT) return DataType.LONG;
-    else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
-    else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
-    else if (colType == VarcharType.VARCHAR) return DataType.STRING;
-    else if (colType == DateType.DATE) return DataType.DATE;
-    else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
-    else return DataType.STRING;
-  }
-
-  public Object convertDataByType(Object rawdata, Type type) {
-    if (type.equals(IntegerType.INTEGER)) return Integer.valueOf(rawdata.toString());
-    else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
-    else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
-    else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
-
-    return rawdata;
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapper.java b/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapper.java
new file mode 100644
index 0000000..bcb48ba
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/ColumnarVectorWrapper.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+
+public class ColumnarVectorWrapper implements CarbonColumnVector {
+
+  private ColumnVector columnVector;
+
+  private boolean[] filteredRows;
+
+  private int counter;
+
+  private boolean filteredRowsExist;
+
+  public ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
+    this.columnVector = columnVector;
+    this.filteredRows = filteredRows;
+  }
+
+  @Override public void putBoolean(int rowId, boolean value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putBoolean(counter++, value);
+    }
+  }
+
+  @Override public void putFloat(int rowId, float value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putFloat(counter++, value);
+    }
+  }
+
+  @Override public void putShort(int rowId, short value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putShort(counter++, value);
+    }
+  }
+
+  @Override public void putShorts(int rowId, int count, short value) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putShort(counter++, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putShorts(rowId, count, value);
+    }
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putInt(counter++, value);
+    }
+  }
+
+  @Override public void putInts(int rowId, int count, int value) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putInt(counter++, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putInts(rowId, count, value);
+    }
+  }
+
+  @Override public void putLong(int rowId, long value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putLong(counter++, value);
+    }
+  }
+
+  @Override public void putLongs(int rowId, int count, long value) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putLong(counter++, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putLongs(rowId, count, value);
+    }
+  }
+
+  @Override public void putDecimal(int rowId, Decimal value, int precision) {
+    if (!filteredRows[rowId]) {
+      columnVector.putDecimal(counter++, value, precision);
+    }
+  }
+
+  @Override public void putDecimals(int rowId, int count, Decimal value, int precision) {
+    for (int i = 0; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        columnVector.putDecimal(counter++, value, precision);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putDouble(int rowId, double value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putDouble(counter++, value);
+    }
+  }
+
+  @Override public void putDoubles(int rowId, int count, double value) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putDouble(counter++, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putDoubles(rowId, count, value);
+    }
+  }
+
+  @Override public void putBytes(int rowId, byte[] value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putByteArray(counter++, value);
+    }
+  }
+
+  @Override public void putBytes(int rowId, int count, byte[] value) {
+    for (int i = 0; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        columnVector.putByteArray(counter++, value);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putByteArray(counter++, value, offset, length);
+    }
+  }
+
+  @Override public void putNull(int rowId) {
+    if (!filteredRows[rowId]) {
+      columnVector.putNull(counter++);
+    }
+  }
+
+  @Override public void putNulls(int rowId, int count) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putNull(counter++);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putNulls(rowId, count);
+    }
+  }
+
+  @Override public boolean isNull(int rowId) {
+    return columnVector.isNullAt(rowId);
+  }
+
+  @Override public void putObject(int rowId, Object obj) {
+    //TODO handle complex types
+  }
+
+  @Override public Object getData(int rowId) {
+    //TODO handle complex types
+    return null;
+  }
+
+  @Override public void reset() {
+    counter = 0;
+    filteredRowsExist = false;
+  }
+
+  @Override public DataType getType() {
+    return columnVector.dataType();
+  }
+
+  @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
+    this.filteredRowsExist = filteredRowsExist;
+  }
+}
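
The wrapper compacts filtered rows away: rowId indexes the logical Carbon row,
while the internal counter tracks the next physical slot in the Spark vector.
A hedged illustration (sparkVector stands in for a real Spark ColumnVector):

  boolean[] filteredRows = {false, true, false};  // logical row 1 was filtered out
  ColumnarVectorWrapper w = new ColumnarVectorWrapper(sparkVector, filteredRows);
  w.setFilteredRowsExist(true);
  w.putInts(0, 3, 42);
  // rowIds 0 and 2 survive: the Spark vector gets 42 at slots 0 and 1;
  // rowId 1 is skipped and the physical counter does not advance for it.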

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java
new file mode 100644
index 0000000..fa09e73
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/AbstractStreamReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+
+/**
+ * Abstract class for Stream Readers
+ */
+public abstract class AbstractStreamReader implements StreamReader {
+
+  protected Object[] streamData;
+
+  protected ColumnVector columnVector;
+
+  protected boolean isVectorReader;
+
+  protected int batchSize;
+
+  /**
+   * Setter for StreamData
+   * @param data
+   */
+  @Override public void setStreamData(Object[] data) {
+    this.streamData = data;
+  }
+
+  /**
+   * Setter for Vector data
+   * @param vector
+   */
+  @Override public void setVector(ColumnVector vector) {
+    this.columnVector = vector;
+  }
+
+  /**
+   * Setter for vector Reader
+   * @param isVectorReader
+   */
+  public void setVectorReader(boolean isVectorReader) {
+    this.isVectorReader = isVectorReader;
+  }
+
+  /**
+   * Setter for BatchSize
+   * @param batchSize
+   */
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+}
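
The concrete readers below all follow this base class in one of two modes:
vector mode pulls values out of a Spark ColumnVector in batchSize chunks, while
stream mode walks an Object[] row buffer. A minimal, hypothetical usage sketch
in stream mode, assuming Presto's built-in IntegerType singleton (editorial
illustration, not part of the patch):

import org.apache.carbondata.presto.readers.IntegerStreamReader;
import org.apache.carbondata.presto.readers.StreamReader;

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.IntegerType;

public final class StreamReaderUsageSketch {
  public static void main(String[] args) throws Exception {
    StreamReader reader = new IntegerStreamReader();
    reader.setVectorReader(false);                 // stream mode: read from Object[]
    reader.setStreamData(new Object[] {1, 2, 3});  // one boxed Integer per row
    Block block = reader.readBlock(IntegerType.INTEGER);
    System.out.println(block.getPositionCount()); // 3
  }
}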

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
new file mode 100644
index 0000000..67e0fd1
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.Decimals;
+import com.facebook.presto.spi.type.Type;
+import io.airlift.slice.Slice;
+
+import static com.facebook.presto.spi.type.Decimals.encodeUnscaledValue;
+import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
+import static com.facebook.presto.spi.type.Decimals.rescale;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static io.airlift.slice.Slices.utf8Slice;
+import static java.math.RoundingMode.HALF_UP;
+
+/**
+ * Reader for DecimalValues
+ */
+public class DecimalSliceStreamReader extends AbstractStreamReader {
+
+  private final char[] buffer = new char[100];
+
+  public DecimalSliceStreamReader() {
+  }
+
+  /**
+   * Create Block for DecimalType
+   * @param type
+   * @return
+   * @throws IOException
+   */
+  public Block readBlock(Type type) throws IOException {
+    int numberOfRows;
+    BlockBuilder builder;
+    if (isVectorReader) {
+      numberOfRows = batchSize;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      int scale = ((DecimalType) type).getScale();
+      int precision = ((DecimalType) type).getPrecision();
+      if (columnVector != null) {
+        for (int i = 0; i < numberOfRows; i++) {
+          if (columnVector.isNullAt(i)) {
+            builder.appendNull();
+          } else {
+            Slice slice =
+                getSlice(columnVector.getDecimal(i, precision, scale).toJavaBigDecimal(), type);
+            if (isShortDecimal(type)) {
+              type.writeLong(builder, parseLong((DecimalType) type, slice, 0, slice.length()));
+            } else {
+              type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
+            }
+          }
+        }
+      }
+    } else {
+      // Create the builder before the null guard so build() below cannot NPE.
+      numberOfRows = streamData != null ? streamData.length : 0;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (streamData != null) {
+        for (int i = 0; i < numberOfRows; i++) {
+          Slice slice = getSlice(streamData[i], type);
+          if (isShortDecimal(type)) {
+            type.writeLong(builder, parseLong((DecimalType) type, slice, 0, slice.length()));
+          } else {
+            type.writeSlice(builder, parseSlice((DecimalType) type, slice, 0, slice.length()));
+          }
+        }
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Function to getSlice from Decimal Object
+   * @param value
+   * @param type
+   * @return
+   */
+  private Slice getSlice(Object value, Type type) {
+    if (type instanceof DecimalType) {
+      DecimalType actual = (DecimalType) type;
+      BigDecimal bigDecimalValue = (BigDecimal) value;
+      if (isShortDecimal(type)) {
+        return utf8Slice(value.toString());
+      } else {
+        BigInteger unscaledDecimal;
+        if (bigDecimalValue.scale() > actual.getScale()) {
+          // The source scale exceeds the column scale, so round the value down
+          // to the column scale before encoding its unscaled form.
+          unscaledDecimal =
+              bigDecimalValue.setScale(actual.getScale(), HALF_UP).unscaledValue();
+        } else {
+          unscaledDecimal =
+              rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(), actual.getScale());
+        }
+        Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal);
+        return utf8Slice(Decimals.toString(decimalSlice, actual.getScale()));
+      }
+    } else {
+      return utf8Slice(value.toString());
+    }
+  }
+
+  /**
+   * Function to parse ShortDecimalType as it is internally treated as Long
+   * @param type
+   * @param slice
+   * @param offset
+   * @param length
+   * @return
+   */
+  private long parseLong(DecimalType type, Slice slice, int offset, int length) {
+    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+    return decimal.unscaledValue().longValue();
+  }
+
+  /**
+   * Function for parsing the Slice
+   * @param type
+   * @param slice
+   * @param offset
+   * @param length
+   * @return
+   */
+  private Slice parseSlice(DecimalType type, Slice slice, int offset, int length) {
+    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+    return encodeUnscaledValue(decimal.unscaledValue());
+  }
+
+  /**
+   * Function for parsing the BigDecimal
+   * @param type
+   * @param slice
+   * @param offset
+   * @param length
+   * @return
+   */
+  private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length) {
+    checkArgument(length < buffer.length);
+    for (int i = 0; i < length; i++) {
+      buffer[i] = (char) slice.getByte(offset + i);
+    }
+    BigDecimal decimal = new BigDecimal(buffer, 0, length);
+    checkState(decimal.scale() <= type.getScale(),
+        "Read decimal value scale larger than column scale");
+    decimal = decimal.setScale(type.getScale(), HALF_UP);
+    checkState(decimal.precision() <= type.getPrecision(),
+        "Read decimal precision larger than column precision");
+    return decimal;
+  }
+}
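
To make the short/long split above concrete: Presto stores decimals whose
precision fits in 18 digits as an unscaled long, and wider decimals as a
Slice-encoded unscaled BigInteger, which is why readBlock branches on
isShortDecimal. A small standalone illustration of the unscaled-value
arithmetic, using only java.math (editorial sketch, not part of the patch):

import java.math.BigDecimal;
import java.math.RoundingMode;

public final class DecimalEncodingSketch {
  public static void main(String[] args) {
    // A DECIMAL(10, 2) column: the unscaled value fits in a long, so the
    // short-decimal path writes it directly via type.writeLong.
    BigDecimal value = new BigDecimal("123.45");
    long unscaled = value.unscaledValue().longValue();
    System.out.println(unscaled); // 12345

    // A source value with a larger scale is first rounded to the column
    // scale, mirroring the HALF_UP rounding in parseBigDecimal.
    BigDecimal wide = new BigDecimal("123.456");
    BigDecimal rescaled = wide.setScale(2, RoundingMode.HALF_UP);
    System.out.println(rescaled.unscaledValue()); // 12346
  }
}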

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
new file mode 100644
index 0000000..cacf5ce
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.io.IOException;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.Type;
+
+/**
+ * Class for Reading the Double value and setting it in Block
+ */
+public class DoubleStreamReader extends AbstractStreamReader {
+
+  public DoubleStreamReader() {
+  }
+
+  /**
+   * Create the DoubleType Block
+   *
+   * @param type
+   * @return
+   * @throws IOException
+   */
+  public Block readBlock(Type type) throws IOException {
+    int numberOfRows;
+    BlockBuilder builder;
+    if (isVectorReader) {
+      numberOfRows = batchSize;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (columnVector != null) {
+        for (int i = 0; i < numberOfRows; i++) {
+          if (columnVector.isNullAt(i)) {
+            builder.appendNull();
+          } else {
+            type.writeDouble(builder, columnVector.getDouble(i));
+          }
+        }
+      }
+    } else {
+      // Check for null before dereferencing, otherwise the guard below can never fire.
+      numberOfRows = streamData != null ? streamData.length : 0;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (streamData != null) {
+        for (int i = 0; i < numberOfRows; i++) {
+          type.writeDouble(builder, (Double) streamData[i]);
+        }
+      }
+    }
+
+    return builder.build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
new file mode 100644
index 0000000..13280c8
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.io.IOException;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.Type;
+
+public class IntegerStreamReader extends AbstractStreamReader {
+
+  public IntegerStreamReader() {
+  }
+
+  public Block readBlock(Type type) throws IOException {
+    int numberOfRows;
+    BlockBuilder builder;
+    if (isVectorReader) {
+      numberOfRows = batchSize;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (columnVector != null) {
+        for (int i = 0; i < numberOfRows; i++) {
+          if (columnVector.isNullAt(i)) {
+            builder.appendNull();
+          } else {
+            // INTEGER is long-backed in Presto, so widen the int value.
+            type.writeLong(builder, (long) columnVector.getInt(i));
+          }
+        }
+      }
+    } else {
+      // Check for null before dereferencing, otherwise the guard below can never fire.
+      numberOfRows = streamData != null ? streamData.length : 0;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (streamData != null) {
+        for (int i = 0; i < numberOfRows; i++) {
+          type.writeLong(builder, ((Integer) streamData[i]).longValue());
+        }
+      }
+    }
+    return builder.build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
new file mode 100644
index 0000000..9d602a6
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.io.IOException;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.Type;
+
+public class LongStreamReader extends AbstractStreamReader {
+
+  public LongStreamReader() {
+  }
+
+  public Block readBlock(Type type) throws IOException {
+    int numberOfRows = 0;
+    BlockBuilder builder = null;
+    if (isVectorReader) {
+      numberOfRows = batchSize;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (columnVector != null) {
+        for (int i = 0; i < numberOfRows; i++) {
+          if (columnVector.isNullAt(i)) {
+            builder.appendNull();
+          } else {
+            type.writeLong(builder, columnVector.getLong(i));
+          }
+        }
+      }
+
+    } else {
+      // Check for null before dereferencing, otherwise the guard below can never fire.
+      numberOfRows = streamData != null ? streamData.length : 0;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (streamData != null) {
+        for (int i = 0; i < numberOfRows; i++) {
+          type.writeLong(builder, (Long) streamData[i]);
+        }
+      }
+    }
+
+    return builder.build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
new file mode 100644
index 0000000..c659e1d
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.io.IOException;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.type.Type;
+
+/**
+ * Class to read the Object Stream
+ */
+public class ObjectStreamReader extends AbstractStreamReader {
+
+  public ObjectStreamReader() {
+  }
+
+  /**
+   * Function to create the object Block
+   * @param type
+   * @return
+   * @throws IOException
+   */
+  public Block readBlock(Type type) throws IOException {
+    int numberOfRows;
+    BlockBuilder builder;
+    if (isVectorReader) {
+      numberOfRows = batchSize;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (columnVector != null) {
+        for (int i = 0; i < numberOfRows; i++) {
+          // Match the null handling of the other readers.
+          if (columnVector.isNullAt(i)) {
+            builder.appendNull();
+          } else {
+            type.writeObject(builder, columnVector.getByte(i));
+          }
+        }
+      }
+    } else {
+      // Check for null before dereferencing, otherwise the guard below can never fire.
+      numberOfRows = streamData != null ? streamData.length : 0;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (streamData != null) {
+        for (int i = 0; i < numberOfRows; i++) {
+          type.writeObject(builder, streamData[i]);
+        }
+      }
+    }
+    return builder.build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
new file mode 100644
index 0000000..bb6146a
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.io.IOException;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+import com.facebook.presto.spi.block.DictionaryBlock;
+import com.facebook.presto.spi.block.SliceArrayBlock;
+import com.facebook.presto.spi.type.Type;
+
+import static io.airlift.slice.Slices.utf8Slice;
+import static io.airlift.slice.Slices.wrappedBuffer;
+
+/**
+ * This class reads String data and converts it into a Slice Block.
+ */
+public class SliceStreamReader extends AbstractStreamReader {
+
+  private boolean isDictionary;
+
+  private SliceArrayBlock dictionaryBlock;
+
+  public SliceStreamReader() {}
+
+  public SliceStreamReader(boolean isDictionary, SliceArrayBlock dictionaryBlock) {
+    this.isDictionary = isDictionary;
+    this.dictionaryBlock = dictionaryBlock;
+  }
+
+  /**
+   * Function to create the Slice Block
+   * @param type
+   * @return
+   * @throws IOException
+   */
+  public Block readBlock(Type type) throws IOException {
+    int numberOfRows;
+    BlockBuilder builder;
+    if (isVectorReader) {
+      numberOfRows = batchSize;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (columnVector != null) {
+        if (isDictionary) {
+          int[] values = new int[numberOfRows];
+          for (int i = 0; i < numberOfRows; i++) {
+            if (!columnVector.isNullAt(i)) {
+              values[i] = columnVector.getInt(i);
+            }
+          }
+          // Null rows keep the default id 0 and resolve through the dictionary block.
+          return new DictionaryBlock(batchSize, dictionaryBlock, values);
+        } else {
+          for (int i = 0; i < numberOfRows; i++) {
+            if (columnVector.isNullAt(i)) {
+              builder.appendNull();
+            } else {
+              type.writeSlice(builder, wrappedBuffer(columnVector.getArray(i).toByteArray()));
+            }
+          }
+        }
+      }
+    } else {
+      // Check for null before dereferencing, otherwise the guard below can never fire.
+      numberOfRows = streamData != null ? streamData.length : 0;
+      builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
+      if (streamData != null) {
+        for (int i = 0; i < numberOfRows; i++) {
+          type.writeSlice(builder, utf8Slice(streamData[i].toString()));
+        }
+      }
+    }
+    return builder.build();
+  }
+
+}
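
The dictionary path above avoids materializing each string per row: the block
stores one int id per position plus a shared array of decoded values, which is
exactly the indirection Presto's DictionaryBlock wraps. A plain-array sketch of
that indirection (editorial illustration, not part of the patch; it assumes the
dictionary's first entry represents null, as the null handling in readBlock
implies):

public final class DictionaryIndirectionSketch {
  public static void main(String[] args) {
    // Shared decoded values; slot 0 stands in for null, matching rows that
    // keep the default id 0 in readBlock.
    String[] dictionary = {null, "apple", "banana"};
    int[] ids = {1, 2, 1, 0}; // one id per row position

    for (int position = 0; position < ids.length; position++) {
      String value = dictionary[ids[position]];
      System.out.println(position + " -> " + value);
    }
    // Each row costs one int instead of a full string copy.
  }
}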

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java
new file mode 100644
index 0000000..a54df0d
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.readers;
+
+import java.io.IOException;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.type.Type;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+
+/**
+ * Interface for StreamReader
+ */
+public interface StreamReader {
+
+  Block readBlock(Type type) throws IOException;
+
+  void setStreamData(Object[] data);
+
+  void setVector(ColumnVector vector);
+
+  void setVectorReader(boolean isVectorReader);
+
+  void setBatchSize(int batchSize);
+
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/531dcd23/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
new file mode 100644
index 0000000..abd8787
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/StreamReaders.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.presto.readers;
+
+import com.facebook.presto.spi.block.SliceArrayBlock;
+import com.facebook.presto.spi.type.DateType;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.IntegerType;
+import com.facebook.presto.spi.type.Type;
+import io.airlift.slice.Slice;
+
+/**
+ * Factory that creates a StreamReader based on the Presto type.
+ */
+public final class StreamReaders {
+  /**
+   * Selects the StreamReader implementation matching the given type.
+   * @param type the Presto type of the column
+   * @param dictionary dictionary block for dictionary-encoded string columns, or null
+   * @return StreamReader
+   */
+  public static StreamReader createStreamReader(Type type, SliceArrayBlock dictionary) {
+    Class<?> javaType = type.getJavaType();
+    if (javaType == long.class) {
+      if (type instanceof IntegerType || type instanceof DateType) {
+        return new IntegerStreamReader();
+      } else if (type instanceof DecimalType) {
+        return new DecimalSliceStreamReader();
+      }
+      return new LongStreamReader();
+    } else if (javaType == double.class) {
+      return new DoubleStreamReader();
+    } else if (javaType == Slice.class) {
+      if (type instanceof DecimalType) {
+        return new DecimalSliceStreamReader();
+      } else if (dictionary != null) {
+        return new SliceStreamReader(true, dictionary);
+      } else {
+        return new SliceStreamReader();
+      }
+    } else {
+      return new ObjectStreamReader();
+    }
+  }
+
+}
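
A short, hypothetical check of the dispatch above (editorial illustration, not
part of the patch), assuming Presto's built-in type singletons:

import org.apache.carbondata.presto.readers.StreamReader;
import org.apache.carbondata.presto.readers.StreamReaders;

import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.IntegerType;
import com.facebook.presto.spi.type.VarcharType;

public final class StreamReaderDispatchSketch {
  public static void main(String[] args) {
    // long-backed types split on the concrete Type
    StreamReader intReader = StreamReaders.createStreamReader(IntegerType.INTEGER, null);
    StreamReader longReader = StreamReaders.createStreamReader(BigintType.BIGINT, null);
    // Slice-backed VARCHAR without a dictionary falls through to SliceStreamReader
    StreamReader sliceReader = StreamReaders.createStreamReader(VarcharType.VARCHAR, null);

    System.out.println(intReader.getClass().getSimpleName());   // IntegerStreamReader
    System.out.println(longReader.getClass().getSimpleName());  // LongStreamReader
    System.out.println(sliceReader.getClass().getSimpleName()); // SliceStreamReader
  }
}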