You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2018/04/08 13:16:33 UTC

carbondata git commit: [CARBONDATA-2140 ] Refactoring code to improve performance and removing unnecessary code in Presto Integration

Repository: carbondata
Updated Branches:
  refs/heads/master b439b00f6 -> 2ad621df5


[CARBONDATA-2140 ] Refactoring code to improve performance and removing unnecessary code in Presto Integration

This PR optimizes Presto performance and refactors the code to remove unnecessary classes and make it simpler.

This closes #1940


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2ad621df
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2ad621df
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2ad621df

Branch: refs/heads/master
Commit: 2ad621df5c5793473526da01c1874cac7000113c
Parents: b439b00
Author: Bhavya <bh...@knoldus.com>
Authored: Tue Feb 6 18:57:42 2018 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Sun Apr 8 21:16:14 2018 +0800

----------------------------------------------------------------------
 .../carbondata/presto/CarbondataConnector.java  |   7 +-
 .../presto/CarbondataConnectorFactory.java      |   4 +-
 .../carbondata/presto/CarbondataModule.java     |   3 -
 .../carbondata/presto/CarbondataPageSource.java |  31 +--
 .../presto/CarbondataPageSourceProvider.java    | 179 +++++++++++++++-
 .../presto/CarbondataRecordCursor.java          | 204 -------------------
 .../carbondata/presto/CarbondataRecordSet.java  |  92 ---------
 .../presto/CarbondataRecordSetProvider.java     | 132 ------------
 .../carbondata/presto/CarbondataUtil.java       |  49 -----
 .../presto/readers/BooleanStreamReader.java     |  40 ++--
 .../presto/readers/DoubleStreamReader.java      |  38 ++--
 .../presto/readers/IntegerStreamReader.java     |  24 ++-
 .../presto/readers/LongStreamReader.java        |  14 +-
 .../presto/readers/ShortStreamReader.java       |  42 ++--
 .../presto/readers/SliceStreamReader.java       |  32 ++-
 15 files changed, 304 insertions(+), 587 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
index 92cd655..3740ba1 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
@@ -33,17 +33,15 @@ public class CarbondataConnector implements Connector {
   private final LifeCycleManager lifeCycleManager;
   private final ConnectorMetadata metadata;
   private final ConnectorSplitManager splitManager;
-  private final ConnectorRecordSetProvider recordSetProvider;
   private final ClassLoader classLoader;
   private final ConnectorPageSourceProvider pageSourceProvider;
 
   public CarbondataConnector(LifeCycleManager lifeCycleManager, ConnectorMetadata metadata,
-      ConnectorSplitManager splitManager, ConnectorRecordSetProvider recordSetProvider,
+      ConnectorSplitManager splitManager,
       ClassLoader classLoader, ConnectorPageSourceProvider pageSourceProvider) {
     this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
     this.metadata = requireNonNull(metadata, "metadata is null");
     this.splitManager = requireNonNull(splitManager, "splitManager is null");
-    this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
     this.classLoader = requireNonNull(classLoader, "classLoader is null");
     this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
   }
@@ -62,9 +60,6 @@ public class CarbondataConnector implements Connector {
     return splitManager;
   }
 
-  @Override public ConnectorRecordSetProvider getRecordSetProvider() {
-    return recordSetProvider;
-  }
 
   @Override
   public ConnectorPageSourceProvider getPageSourceProvider()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index 579af50..2876d1e 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -70,12 +70,10 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
       LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
       ConnectorMetadata metadata = injector.getInstance(CarbondataMetadata.class);
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
-      ConnectorRecordSetProvider connectorRecordSet =
-          injector.getInstance(ConnectorRecordSetProvider.class);
       ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
 
       return new CarbondataConnector(lifeCycleManager, new ClassLoaderSafeConnectorMetadata(metadata,classLoader),
-          new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
+          new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
           classLoader,
           new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader)
       );

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
index 1d8b2b2..7bb864a 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
@@ -55,12 +55,9 @@ public class CarbondataModule implements Module {
     binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
     binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
     binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
-    binder.bind(ConnectorRecordSetProvider.class).to(CarbondataRecordSetProvider.class)
-        .in(Scopes.SINGLETON);
     binder.bind(ConnectorPageSourceProvider.class).to(CarbondataPageSourceProvider.class)
         .in(Scopes.SINGLETON);
     binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
-    binder.bind(CarbondataRecordSetProvider.class).in(Scopes.SINGLETON);
     configBinder(binder).bindConfig(CarbonTableConfig.class);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
index 5f1f90a..a7682ce 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.presto;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -28,6 +29,7 @@ import org.apache.carbondata.presto.readers.StreamReaders;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 
 import com.facebook.presto.hadoop.$internal.com.google.common.base.Throwables;
+import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorPageSource;
 import com.facebook.presto.spi.Page;
 import com.facebook.presto.spi.PageBuilder;
@@ -50,31 +52,26 @@ class CarbondataPageSource implements ConnectorPageSource {
 
   private static final LogService logger =
       LogServiceFactory.getLogService(CarbondataPageSource.class.getName());
-  private final RecordCursor cursor;
   private final List<Type> types;
   private final PageBuilder pageBuilder;
   private boolean closed;
   private PrestoCarbonVectorizedRecordReader vectorReader;
   private CarbonDictionaryDecodeReadSupport<Object[]> readSupport;
+  List<ColumnHandle> columnHandles;
   private long sizeOfData = 0;
-
   private final StreamReader[] readers ;
   private int batchId;
-
   private long nanoStart;
   private long nanoEnd;
 
-  CarbondataPageSource(RecordSet recordSet) {
-    this(requireNonNull(recordSet, "recordSet is null").getColumnTypes(),
-        recordSet.cursor());
-  }
-
-  private CarbondataPageSource(List<Type> types, RecordCursor cursor) {
-    this.cursor = requireNonNull(cursor, "cursor is null");
-    this.types = unmodifiableList(new ArrayList<>(requireNonNull(types, "types is null")));
+  public CarbondataPageSource(CarbonDictionaryDecodeReadSupport readSupport,
+      PrestoCarbonVectorizedRecordReader vectorizedRecordReader,
+      List<ColumnHandle> columnHandles ) {
+    this.columnHandles = columnHandles;
+    this.types = getColumnTypes();
     this.pageBuilder = new PageBuilder(this.types);
-    this.readSupport = ((CarbondataRecordCursor) cursor).getReadSupport();
-    this.vectorReader = ((CarbondataRecordCursor) cursor).getVectorizedRecordReader();
+    this.readSupport = readSupport;
+    vectorReader = vectorizedRecordReader;
     this.readers = createStreamReaders();
   }
 
@@ -152,7 +149,6 @@ class CarbondataPageSource implements ConnectorPageSource {
     closed = true;
     try {
       vectorReader.close();
-      cursor.close();
       nanoEnd = System.nanoTime();
     } catch (Exception e) {
       throw Throwables.propagate(e);
@@ -208,6 +204,7 @@ class CarbondataPageSource implements ConnectorPageSource {
       }
       loaded = true;
     }
+
   }
 
 
@@ -227,4 +224,10 @@ class CarbondataPageSource implements ConnectorPageSource {
     return readers;
   }
 
+   private List<Type> getColumnTypes() {
+    return columnHandles.stream().map(a -> ((CarbondataColumnHandle)a).getColumnType())
+        .collect(Collectors.toList());
+  }
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index 46d8611..a268549 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -17,34 +17,201 @@
 
 package org.apache.carbondata.presto;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.presto.impl.CarbonLocalInputSplit;
+import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorPageSource;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Objects.requireNonNull;
+import static org.apache.carbondata.presto.Types.checkType;
 
 /**
  * Provider Class for Carbondata Page Source class.
  */
 public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider {
 
-  private CarbondataRecordSetProvider carbondataRecordSetProvider;
+  private String connectorId;
+  private CarbonTableReader carbonTableReader;
+
+  @Inject public CarbondataPageSourceProvider(CarbondataConnectorId connectorId,
+      CarbonTableReader carbonTableReader) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    this.carbonTableReader = requireNonNull(carbonTableReader, "carbonTableReader is null");
 
-  @Inject
-  public CarbondataPageSourceProvider(CarbondataRecordSetProvider carbondataRecordSetProvider)
-  {
-    this.carbondataRecordSetProvider = requireNonNull(carbondataRecordSetProvider, "recordSetProvider is null");
   }
 
   @Override
   public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle,
       ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) {
-    return new CarbondataPageSource(carbondataRecordSetProvider.getRecordSet(transactionHandle, session, split, columns));
+    CarbonDictionaryDecodeReadSupport readSupport = new CarbonDictionaryDecodeReadSupport();
+    PrestoCarbonVectorizedRecordReader carbonRecordReader = createReader(split, columns, readSupport);
+    return new CarbondataPageSource(readSupport, carbonRecordReader, columns );
+  }
+
+
+  /**
+   *
+   * @param split
+   * @param columns
+   * @param readSupport
+   * @return
+   */
+  private PrestoCarbonVectorizedRecordReader createReader(ConnectorSplit split,
+      List<? extends ColumnHandle> columns, CarbonDictionaryDecodeReadSupport readSupport) {
+
+    CarbondataSplit carbondataSplit =
+        checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
+    checkArgument(carbondataSplit.getConnectorId().equals(connectorId),
+        "split is not for this connector");
+    QueryModel queryModel = createQueryModel(carbondataSplit, columns);
+    QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    try {
+      CarbonIterator iterator = queryExecutor.execute(queryModel);
+      readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getTable());
+      return new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
+          (AbstractDetailQueryResultIterator) iterator);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to get the Query Model ", e);
+    } catch (QueryExecutionException e) {
+      throw new RuntimeException(e.getMessage(), e);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   *
+   * @param carbondataSplit
+   * @param columns
+   * @return
+   */
+  private QueryModel createQueryModel(CarbondataSplit carbondataSplit,
+      List<? extends ColumnHandle> columns) {
+
+    try {
+      CarbonProjection carbonProjection = getCarbonProjection(columns);
+      CarbonTable carbonTable = getCarbonTable(carbondataSplit);
+
+      Configuration conf = new Configuration();
+      conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
+      String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
+
+      conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
+      JobConf jobConf = new JobConf(conf);
+      CarbonTableInputFormat carbonTableInputFormat = createInputFormat(jobConf, carbonTable,
+          PrestoFilterUtil.parseFilterExpression(carbondataSplit.getConstraints()),
+          carbonProjection);
+      TaskAttemptContextImpl hadoopAttemptContext =
+          new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
+      CarbonInputSplit carbonInputSplit =
+          CarbonLocalInputSplit.convertSplit(carbondataSplit.getLocalInputSplit());
+      QueryModel queryModel =
+          carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext);
+      queryModel.setVectorReader(true);
+
+      List<CarbonInputSplit> splitList = new ArrayList<>(1);
+      splitList.add(carbonInputSplit);
+      List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+      queryModel.setTableBlockInfos(tableBlockInfoList);
+
+      return queryModel;
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to get the Query Model ", e);
+    }
+  }
+
+  /**
+   *
+   * @param conf
+   * @param carbonTable
+   * @param filterExpression
+   * @param projection
+   * @return
+   */
+  private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
+      CarbonTable carbonTable, Expression filterExpression, CarbonProjection projection) {
+
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+    CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
+    try {
+      CarbonTableInputFormat
+          .setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath()));
+      CarbonTableInputFormat
+          .setDatabaseName(conf, identifier.getCarbonTableIdentifier().getDatabaseName());
+      CarbonTableInputFormat
+          .setTableName(conf, identifier.getCarbonTableIdentifier().getTableName());
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to create the CarbonTableInputFormat", e);
+    }
+    CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
+    CarbonTableInputFormat.setColumnProjection(conf, projection);
+
+    return format;
+  }
+
+
+  /**
+   *
+   * @param columns
+   * @return
+   */
+  private CarbonProjection getCarbonProjection(List<? extends ColumnHandle> columns) {
+    CarbonProjection carbonProjection = new CarbonProjection();
+    // Convert all columns handles
+    ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
+    for (ColumnHandle handle : columns) {
+      handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
+      carbonProjection.addColumn(((CarbondataColumnHandle) handle).getColumnName());
+    }
+    return carbonProjection;
+  }
+
+  /**
+   *
+   * @param carbonSplit
+   * @return
+   */
+  private CarbonTable getCarbonTable(CarbondataSplit carbonSplit) {
+    CarbonTableCacheModel tableCacheModel =
+        carbonTableReader.getCarbonCache(carbonSplit.getSchemaTableName());
+    checkNotNull(tableCacheModel, "tableCacheModel should not be null");
+    checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
+    checkNotNull(tableCacheModel.carbonTable.getTableInfo(),
+        "tableCacheModel.carbonTable.tableInfo should not be null");
+    return tableCacheModel.carbonTable;
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
deleted file mode 100755
index 5772fbf..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.sql.Timestamp;
-import java.util.List;
-
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-import com.facebook.presto.spi.RecordCursor;
-import com.facebook.presto.spi.type.DecimalType;
-import com.facebook.presto.spi.type.Decimals;
-import com.facebook.presto.spi.type.TimestampType;
-import com.facebook.presto.spi.type.Type;
-import io.airlift.log.Logger;
-import io.airlift.slice.Slice;
-import scala.Int;
-import scala.Tuple3;
-
-import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
-import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
-import static com.facebook.presto.spi.type.Decimals.rescale;
-import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
-import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static io.airlift.slice.Slices.utf8Slice;
-import static org.apache.carbondata.presto.CarbondataUtil.shortDecimalPartitionKey;
-
-public class CarbondataRecordCursor implements RecordCursor {
-
-  private static final Logger log = Logger.get(CarbondataRecordCursor.class);
-  private final List<CarbondataColumnHandle> columnHandles;
-
-  private Object[] fields;
-  private CarbondataSplit split;
-  private CarbonDictionaryDecodeReadSupport readSupport;
-  private Tuple3<DataType, Dictionary, Int>[] dictionary;
-  PrestoCarbonVectorizedRecordReader vectorizedRecordReader;
-
-  private long totalBytes;
-  private long nanoStart;
-  private long nanoEnd;
-
-
-
-  public CarbondataRecordCursor(CarbonDictionaryDecodeReadSupport readSupport,
-       PrestoCarbonVectorizedRecordReader vectorizedRecordReader,
-      List<CarbondataColumnHandle> columnHandles,
-      CarbondataSplit split) {
-    this.vectorizedRecordReader = vectorizedRecordReader;
-    this.columnHandles = columnHandles;
-    this.readSupport = readSupport;
-    this.totalBytes = 0;
-  }
-
-  @Override public long getCompletedBytes() {
-    return totalBytes;
-  }
-
-  @Override public long getReadTimeNanos() {
-    return nanoStart > 0L ? (nanoEnd == 0 ? System.nanoTime() : nanoEnd) - nanoStart : 0L;
-  }
-
-  @Override public Type getType(int field) {
-
-    checkArgument(field < columnHandles.size(), "Invalid field index");
-    return columnHandles.get(field).getColumnType();
-  }
-
-  /**
-   * get next Row/Page
-   */
-  @Override public boolean advanceNextPosition() {
-
-    if (nanoStart == 0) {
-      nanoStart = System.nanoTime();
-    }
-    return false;
-  }
-
-  @Override public boolean getBoolean(int field) {
-    checkFieldType(field, BOOLEAN);
-    return (Boolean) getFieldValue(field);
-  }
-
-  @Override public long getLong(int field) {
-    Object obj = getFieldValue(field);
-    Long timeStr = 0L;
-    if (obj instanceof Integer) {
-      timeStr = ((Integer) obj).longValue();
-    } else if (obj instanceof Long) {
-      timeStr = (Long) obj;
-    } else {
-      timeStr = Math.round(Double.parseDouble(obj.toString()));
-    }
-    Type actual = getType(field);
-
-    if (actual instanceof TimestampType) {
-      return new Timestamp(timeStr).getTime() / 1000;
-    } else if (isShortDecimal(actual)) {
-      return shortDecimalPartitionKey(obj.toString(), (DecimalType) actual,
-          columnHandles.get(field).getColumnName());
-    }
-    //suppose the
-    return timeStr;
-  }
-
-  @Override public double getDouble(int field) {
-    checkFieldType(field, DOUBLE);
-    return (Double) getFieldValue(field);
-  }
-
-  @Override public Slice getSlice(int field) {
-    Type decimalType = getType(field);
-    if (decimalType instanceof DecimalType) {
-      DecimalType actual = (DecimalType) decimalType;
-      CarbondataColumnHandle carbondataColumnHandle = columnHandles.get(field);
-      if (carbondataColumnHandle.getPrecision() > 0) {
-        checkFieldType(field, DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
-            carbondataColumnHandle.getScale()));
-      } else {
-        checkFieldType(field, DecimalType.createDecimalType());
-      }
-      Object fieldValue = getFieldValue(field);
-      BigDecimal bigDecimalValue = new BigDecimal(fieldValue.toString());
-      if (isShortDecimal(decimalType)) {
-        return utf8Slice(Decimals.toString(bigDecimalValue.longValue(), actual.getScale()));
-      } else {
-        if (bigDecimalValue.scale() > actual.getScale()) {
-          BigInteger unscaledDecimal =
-              rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(),
-                  bigDecimalValue.scale());
-          Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal);
-          return utf8Slice(Decimals.toString(decimalSlice, actual.getScale()));
-          //return decimalSlice;
-        } else {
-          BigInteger unscaledDecimal =
-              rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(), actual.getScale());
-          Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal);
-          return utf8Slice(Decimals.toString(decimalSlice, actual.getScale()));
-          //return decimalSlice;
-
-        }
-
-      }
-    } else {
-      checkFieldType(field, VARCHAR);
-      return utf8Slice(getFieldValue(field).toString());
-    }
-  }
-
-  @Override public Object getObject(int field) {
-    return null;
-  }
-
-  @Override public boolean isNull(int field) {
-    checkArgument(field < columnHandles.size(), "Invalid field index");
-    return getFieldValue(field) == null;
-  }
-
-  Object getFieldValue(int field) {
-    checkState(fields != null, "Cursor has not been advanced yet");
-    return fields[field];
-  }
-
-  private void checkFieldType(int field, Type expected) {
-    Type actual = getType(field);
-    checkArgument(actual.equals(expected), "Expected field %s to be type %s but is %s", field,
-        expected, actual);
-  }
-
-  @Override public void close() {
-    nanoEnd = System.nanoTime();
-
-    //todo  delete cache from readSupport
-  }
-
-  public PrestoCarbonVectorizedRecordReader getVectorizedRecordReader() {
-    return vectorizedRecordReader;
-  }
-
-  public CarbonDictionaryDecodeReadSupport getReadSupport() {
-    return readSupport;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
deleted file mode 100755
index 286ff0e..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.block.TableBlockInfo;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.executor.QueryExecutor;
-import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.presto.impl.CarbonLocalInputSplit;
-
-import com.facebook.presto.spi.ConnectorSession;
-import com.facebook.presto.spi.ConnectorSplit;
-import com.facebook.presto.spi.RecordCursor;
-import com.facebook.presto.spi.RecordSet;
-import com.facebook.presto.spi.type.Type;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-
-import static org.apache.carbondata.presto.Types.checkType;
-
-public class CarbondataRecordSet implements RecordSet {
-
-  private QueryModel queryModel;
-  private CarbondataSplit split;
-  private List<CarbondataColumnHandle> columns;
-  private QueryExecutor queryExecutor;
-
-  private CarbonDictionaryDecodeReadSupport readSupport;
-
-  public CarbondataRecordSet(CarbonTable carbonTable, ConnectorSession session,
-      ConnectorSplit split, List<CarbondataColumnHandle> columns, QueryModel queryModel,
-      TaskAttemptContext taskAttemptContext) {
-    this.split = checkType(split, CarbondataSplit.class, "connectorSplit");
-    this.queryModel = queryModel;
-    this.columns = columns;
-    this.readSupport = new CarbonDictionaryDecodeReadSupport();
-  }
-
-  @Override public List<Type> getColumnTypes() {
-    return columns.stream().map(a -> a.getColumnType()).collect(Collectors.toList());
-  }
-
-  /**
-   * get data blocks via Carbondata QueryModel API.
-   */
-  @Override public RecordCursor cursor() {
-    CarbonLocalInputSplit carbonLocalInputSplit = split.getLocalInputSplit();
-    List<CarbonInputSplit> splitList = new ArrayList<>(1);
-    splitList.add(CarbonLocalInputSplit.convertSplit(carbonLocalInputSplit));
-    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
-    queryModel.setTableBlockInfos(tableBlockInfoList);
-    queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
-    try {
-
-      readSupport
-          .initialize(queryModel.getProjectionColumns(), queryModel.getTable());
-      CarbonIterator iterator = queryExecutor.execute(queryModel);
-      PrestoCarbonVectorizedRecordReader vectorReader =
-          new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
-              (AbstractDetailQueryResultIterator) iterator);
-      return new CarbondataRecordCursor(readSupport, vectorReader, columns, split);
-    } catch (QueryExecutionException e) {
-      throw new RuntimeException(e.getMessage(), e);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex.getMessage(), ex);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
deleted file mode 100755
index 8f7e88c..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import javax.inject.Inject;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.scan.expression.Expression;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
-import org.apache.carbondata.presto.impl.CarbonLocalInputSplit;
-import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
-import org.apache.carbondata.presto.impl.CarbonTableReader;
-
-import com.facebook.presto.spi.ColumnHandle;
-import com.facebook.presto.spi.ConnectorSession;
-import com.facebook.presto.spi.ConnectorSplit;
-import com.facebook.presto.spi.RecordSet;
-import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
-import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskAttemptContextImpl;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskType;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.Objects.requireNonNull;
-import static org.apache.carbondata.presto.Types.checkType;
-
-public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
-
-  private final String connectorId;
-  private final CarbonTableReader carbonTableReader;
-
-  @Inject
-  public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
-    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
-    this.carbonTableReader = reader;
-  }
-
-  @Override public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle,
-      ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
-
-    CarbondataSplit carbondataSplit =
-        checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
-    checkArgument(carbondataSplit.getConnectorId().equals(connectorId),
-        "split is not for this connector");
-
-    CarbonProjection carbonProjection = new CarbonProjection();
-    // Convert all columns handles
-    ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
-    for (ColumnHandle handle : columns) {
-      handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
-      carbonProjection.addColumn(((CarbondataColumnHandle) handle).getColumnName());
-    }
-
-    CarbonTableCacheModel tableCacheModel =
-        carbonTableReader.getCarbonCache(carbondataSplit.getSchemaTableName());
-    checkNotNull(tableCacheModel, "tableCacheModel should not be null");
-    checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
-    checkNotNull(
-        tableCacheModel.carbonTable.getTableInfo(), "tableCacheModel.tableInfo should not be null");
-
-    // Build Query Model
-    CarbonTable targetTable = tableCacheModel.carbonTable;
-
-    QueryModel queryModel ;
-    TaskAttemptContextImpl hadoopAttemptContext;
-    try {
-      Configuration conf = new Configuration();
-      conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
-      String carbonTablePath = targetTable.getAbsoluteTableIdentifier().getTablePath();
-
-      conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
-      JobConf jobConf = new JobConf(conf);
-      CarbonTableInputFormat carbonTableInputFormat =
-          createInputFormat(jobConf, tableCacheModel.carbonTable,
-              PrestoFilterUtil.parseFilterExpression(carbondataSplit.getConstraints()),
-              carbonProjection);
-      hadoopAttemptContext =
-          new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
-      CarbonInputSplit carbonInputSplit =
-          CarbonLocalInputSplit.convertSplit(carbondataSplit.getLocalInputSplit());
-      queryModel = carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext);
-      queryModel.setVectorReader(true);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to get the Query Model ", e);
-    }
-    return new CarbondataRecordSet(targetTable, session, carbondataSplit, handles.build(),
-        queryModel, hadoopAttemptContext);
-  }
-
-  private CarbonTableInputFormat<Object> createInputFormat(Configuration conf,
-      CarbonTable carbonTable, Expression filterExpression, CarbonProjection projection) {
-
-    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
-    CarbonTableInputFormat format = new CarbonTableInputFormat<Object>();
-    CarbonTableInputFormat
-        .setTablePath(conf, identifier.appendWithLocalPrefix(identifier.getTablePath()));
-    CarbonTableInputFormat
-        .setDatabaseName(conf, identifier.getCarbonTableIdentifier().getDatabaseName());
-    CarbonTableInputFormat
-        .setTableName(conf, identifier.getCarbonTableIdentifier().getTableName());
-  CarbonTableInputFormat.setFilterPredicates(conf, filterExpression);
-    CarbonTableInputFormat.setColumnProjection(conf, projection);
-
-    return format;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataUtil.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataUtil.java
deleted file mode 100644
index 52c8920..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataUtil.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.math.BigDecimal;
-
-import com.facebook.presto.spi.PrestoException;
-import com.facebook.presto.spi.type.DecimalType;
-
-import static java.lang.String.format;
-import static java.math.BigDecimal.ROUND_UNNECESSARY;
-import static org.apache.carbondata.presto.CarbondataErrorCode.CARBON_INVALID_TYPE_VALUE;
-
-public class CarbondataUtil {
-
-  public static long shortDecimalPartitionKey(String value, DecimalType type, String name) {
-    return decimalPartitionKey(value, type, name).unscaledValue().longValue();
-  }
-
-  private static BigDecimal decimalPartitionKey(String value, DecimalType type, String name) {
-    try {
-      BigDecimal decimal = new BigDecimal(value);
-      decimal = decimal.setScale(type.getScale(), ROUND_UNNECESSARY);
-      if (decimal.precision() > type.getPrecision()) {
-        throw new PrestoException(CARBON_INVALID_TYPE_VALUE,
-            format("Invalid type value '%s' for %s type key: %s", value, type.toString(), name));
-      }
-      return decimal;
-    } catch (NumberFormatException e) {
-      throw new PrestoException(CARBON_INVALID_TYPE_VALUE,
-          format("Invalid type value '%s' for %s type key: %s", value, type.toString(), name));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
index 2a45e72..4507425 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
@@ -49,14 +49,16 @@ public class BooleanStreamReader extends AbstractStreamReader {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if(columnVector.anyNullsSet()) {
-          handleNullInVector(type, numberOfRows, builder);
-        }
-        else {
-          populateVector(type, numberOfRows, builder);
+        if (isDictionary) {
+          populateDictionaryVector(type, numberOfRows, builder);
+        } else {
+          if (columnVector.anyNullsSet()) {
+            handleNullInVector(type, numberOfRows, builder);
+          } else {
+            populateVector(type, numberOfRows, builder);
+          }
         }
       }
-
     } else {
       numberOfRows = streamData.length;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
@@ -81,24 +83,24 @@ public class BooleanStreamReader extends AbstractStreamReader {
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-    if(isDictionary) {
-      for (int i = 0; i < numberOfRows; i++) {
-        int value = (int) columnVector.getData(i);
-        Object data = DataTypeUtil
-            .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.BOOLEAN);
-        if (data != null) {
-          type.writeBoolean(builder,(boolean) data);
-        } else {
-          builder.appendNull();
-        }
-      }
-    }
-    else {
       for (int i = 0; i < numberOfRows; i++) {
         type.writeBoolean(builder, byteToBoolean(columnVector.getData(i)));
       }
+  }
+
+  private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      int value = (int) columnVector.getData(i);
+      Object data = DataTypeUtil
+          .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.BOOLEAN);
+      if (data != null) {
+        type.writeBoolean(builder,(boolean) data);
+      } else {
+        builder.appendNull();
+      }
     }
   }
+
   private Boolean byteToBoolean(Object value){
     byte byteValue = (byte)value;
     return byteValue == 1;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
index 7968ae6..23db769 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
@@ -59,10 +59,14 @@ public class DoubleStreamReader extends AbstractStreamReader {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if (columnVector.anyNullsSet()) {
-          handleNullInVector(type, numberOfRows, builder);
+        if(isDictionary) {
+          populateDictionaryVector(type, numberOfRows, builder);
         } else {
-          populateVector(type, numberOfRows, builder);
+          if (columnVector.anyNullsSet()) {
+            handleNullInVector(type, numberOfRows, builder);
+          } else {
+            populateVector(type, numberOfRows, builder);
+          }
         }
       }
     } else {
@@ -89,23 +93,21 @@ public class DoubleStreamReader extends AbstractStreamReader {
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-    if (isDictionary) {
-      for (int i = 0; i < numberOfRows; i++) {
-        int value = (int) columnVector.getData(i);
-        Object data = DataTypeUtil
-            .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.DOUBLE);
-        if (data != null) {
-          type.writeDouble(builder, (Double) data);
-        } else {
-          builder.appendNull();
-        }
+    for (int i = 0; i < numberOfRows; i++) {
+      type.writeDouble(builder, (Double) columnVector.getData(i));
+    }
+  }
 
-      }
-    } else {
-      for (int i = 0; i < numberOfRows; i++) {
-        type.writeDouble(builder, (Double) columnVector.getData(i));
+  private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      int value = (int) columnVector.getData(i);
+      Object data = DataTypeUtil
+          .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.DOUBLE);
+      if (data != null) {
+        type.writeDouble(builder, (Double) data);
+      } else {
+        builder.appendNull();
       }
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
index 5b20925..7ddd181 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -49,10 +49,14 @@ public class IntegerStreamReader extends AbstractStreamReader {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if (columnVector.anyNullsSet()) {
-          handleNullInVector(type, numberOfRows, builder);
+        if(isDictionary) {
+          populateDictionaryVector(type, numberOfRows, builder);
         } else {
-          populateVector(type, numberOfRows, builder);
+          if (columnVector.anyNullsSet()) {
+            handleNullInVector(type, numberOfRows, builder);
+          } else {
+            populateVector(type, numberOfRows, builder);
+          }
         }
       }
     } else {
@@ -78,7 +82,13 @@ public class IntegerStreamReader extends AbstractStreamReader {
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-    if (isDictionary) {
+      for (int i = 0; i < numberOfRows; i++) {
+        Integer value = (Integer) columnVector.getData(i);
+        type.writeLong(builder, value.longValue());
+      }
+  }
+
+  private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
       for (int i = 0; i < numberOfRows; i++) {
         int value = (int) columnVector.getData(i);
         Object data = DataTypeUtil
@@ -90,11 +100,5 @@ public class IntegerStreamReader extends AbstractStreamReader {
           builder.appendNull();
         }
       }
-    } else {
-      for (int i = 0; i < numberOfRows; i++) {
-        Integer value = (Integer) columnVector.getData(i);
-        type.writeLong(builder, value.longValue());
-      }
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
index ab68889..015ac80 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -49,6 +49,9 @@ public class LongStreamReader extends AbstractStreamReader {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
+        if(isDictionary) {
+          populateDictionaryVector(type, numberOfRows, builder);
+        }
         if (columnVector.anyNullsSet()) {
           handleNullInVector(type, numberOfRows, builder);
         } else {
@@ -79,7 +82,12 @@ public class LongStreamReader extends AbstractStreamReader {
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
     for (int i = 0; i < numberOfRows; i++) {
-      if (isDictionary) {
+        type.writeLong(builder, (long) columnVector.getData(i));
+    }
+  }
+
+  private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
         int value = (int) columnVector.getData(i);
         Object data = DataTypeUtil
             .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.LONG);
@@ -88,10 +96,6 @@ public class LongStreamReader extends AbstractStreamReader {
         } else {
           builder.appendNull();
         }
-      } else {
-        type.writeLong(builder, (long) columnVector.getData(i));
       }
     }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
index 4a28761..00e5485 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
@@ -49,14 +49,17 @@ public class ShortStreamReader extends AbstractStreamReader {
       numberOfRows = batchSize;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (columnVector != null) {
-        if (columnVector.anyNullsSet()) {
-          handleNullInVector(type, numberOfRows, builder);
+        if(isDictionary) {
+          populateDictionaryVector(type, numberOfRows, builder);
         } else {
-          populateVector(type, numberOfRows, builder);
+          if (columnVector.anyNullsSet()) {
+            handleNullInVector(type, numberOfRows, builder);
+          } else {
+            populateVector(type, numberOfRows, builder);
+          }
         }
       }
-
-    } else {
+   } else {
       numberOfRows = streamData.length;
       builder = type.createBlockBuilder(new BlockBuilderStatus(), numberOfRows);
       if (streamData != null) {
@@ -65,7 +68,6 @@ public class ShortStreamReader extends AbstractStreamReader {
         }
       }
     }
-
     return builder.build();
   }
 
@@ -80,20 +82,20 @@ public class ShortStreamReader extends AbstractStreamReader {
   }
 
   private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
-    if (isDictionary) {
-      for (int i = 0; i < numberOfRows; i++) {
-        int value = (int) columnVector.getData(i);
-        Object data = DataTypeUtil
-            .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.SHORT);
-        if (data != null) {
-          type.writeLong(builder, (Short) data);
-        } else {
-          builder.appendNull();
-        }
-      }
-    } else {
-      for (int i = 0; i < numberOfRows; i++) {
-        type.writeLong(builder, (Short) columnVector.getData(i));
+    for (int i = 0; i < numberOfRows; i++) {
+      type.writeLong(builder, (Short) columnVector.getData(i));
+    }
+  }
+
+  private void populateDictionaryVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      int value = (int) columnVector.getData(i);
+      Object data = DataTypeUtil
+          .getDataBasedOnDataType(dictionary.getDictionaryValueForKey(value), DataTypes.SHORT);
+      if (data != null) {
+        type.writeLong(builder, (Short) data);
+      } else {
+        builder.appendNull();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ad621df/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index 00b4bce..d98afa3 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -19,6 +19,9 @@ package org.apache.carbondata.presto.readers;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.block.BlockBuilder;
 import com.facebook.presto.spi.block.BlockBuilderStatus;
@@ -69,12 +72,10 @@ public class SliceStreamReader extends AbstractStreamReader {
           }
           return new DictionaryBlock(batchSize, dictionarySliceArrayBlock, values);
         } else {
-          for (int i = 0; i < numberOfRows; i++) {
-            if (columnVector.isNullAt(i)) {
-              builder.appendNull();
-            } else {
-              type.writeSlice(builder, wrappedBuffer((byte[]) columnVector.getData(i)));
-            }
+          if(columnVector.anyNullsSet()) {
+            handleNullInVector(type, numberOfRows, builder);
+          } else {
+            populateVector(type, numberOfRows, builder);
           }
         }
       }
@@ -91,4 +92,23 @@ public class SliceStreamReader extends AbstractStreamReader {
     return builder.build();
   }
 
+  private void handleNullInVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      if (columnVector.isNullAt(i)) {
+        builder.appendNull();
+      } else {
+        type.writeSlice(builder, wrappedBuffer((byte[]) columnVector.getData(i)));
+      }
+    }
+  }
+
+  private void populateVector(Type type, int numberOfRows, BlockBuilder builder) {
+    for (int i = 0; i < numberOfRows; i++) {
+      type.writeSlice(builder, wrappedBuffer((byte[]) columnVector.getData(i)));
+    }
+  }
+
+
+
+
 }