You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/01/02 13:29:48 UTC

carbondata git commit: [CARBONDATA-3218] Fix schema refresh and wrong query result issues in presto.

Repository: carbondata
Updated Branches:
  refs/heads/master 7477527e9 -> f8697b106


[CARBONDATA-3218] Fix schema refresh and wrong query result issues in presto.

Problem:
A schema that is updated in Spark is not reflected in Presto, which results in wrong query results in Presto.

Solution:
Update the schema in Presto whenever the schema changes in Spark. Also override the putNulls method in all Presto readers so that they handle null-data scenarios correctly.

This closes #3041


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f8697b10
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f8697b10
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f8697b10

Branch: refs/heads/master
Commit: f8697b1065cd76e3b96be571fd78761a44a58e7e
Parents: 7477527
Author: ravipesala <ra...@gmail.com>
Authored: Mon Dec 31 17:20:24 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Wed Jan 2 18:59:05 2019 +0530

----------------------------------------------------------------------
 .../presto/CarbondataPageSourceProvider.java    |   7 +-
 .../presto/CarbondataSplitManager.java          |  65 +++++-----
 .../presto/impl/CarbonTableCacheModel.java      |  29 ++++-
 .../presto/impl/CarbonTableReader.java          | 119 ++++++++-----------
 .../presto/readers/BooleanStreamReader.java     |   6 +
 .../readers/DecimalSliceStreamReader.java       |  12 ++
 .../presto/readers/DoubleStreamReader.java      |  12 ++
 .../presto/readers/IntegerStreamReader.java     |  12 ++
 .../presto/readers/LongStreamReader.java        |  12 ++
 .../presto/readers/ObjectStreamReader.java      |   6 +
 .../presto/readers/ShortStreamReader.java       |  12 ++
 .../presto/readers/SliceStreamReader.java       |  24 +++-
 .../presto/readers/TimestampStreamReader.java   |  12 ++
 13 files changed, 215 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index d7b7266..c81e0c3 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -230,10 +230,11 @@ public class CarbondataPageSourceProvider extends HivePageSourceProvider {
         .getCarbonCache(new SchemaTableName(carbonSplit.getDatabase(), carbonSplit.getTable()),
             carbonSplit.getSchema().getProperty("tablePath"), configuration);
     checkNotNull(tableCacheModel, "tableCacheModel should not be null");
-    checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
-    checkNotNull(tableCacheModel.carbonTable.getTableInfo(),
+    checkNotNull(tableCacheModel.getCarbonTable(),
+        "tableCacheModel.carbonTable should not be null");
+    checkNotNull(tableCacheModel.getCarbonTable().getTableInfo(),
         "tableCacheModel.carbonTable.tableInfo should not be null");
-    return tableCacheModel.carbonTable;
+    return tableCacheModel.getCarbonTable();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
index ded00fc..6efef93 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -119,45 +119,40 @@ public class CarbondataSplitManager extends HiveSplitManager {
     configuration = carbonTableReader.updateS3Properties(configuration);
     CarbonTableCacheModel cache =
         carbonTableReader.getCarbonCache(schemaTableName, location, configuration);
-    if (null != cache) {
-      Expression filters = PrestoFilterUtil.parseFilterExpression(predicate);
-      try {
-
-        List<CarbonLocalMultiBlockSplit> splits =
-            carbonTableReader.getInputSplits2(cache, filters, predicate, configuration);
-
-        ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
-        long index = 0;
-        for (CarbonLocalMultiBlockSplit split : splits) {
-          index++;
-          Properties properties = new Properties();
-          for (Map.Entry<String, String> entry : table.getStorage().getSerdeParameters()
-              .entrySet()) {
-            properties.setProperty(entry.getKey(), entry.getValue());
-          }
-          properties.setProperty("tablePath", cache.carbonTable.getTablePath());
-          properties.setProperty("carbonSplit", split.getJsonString());
-          properties.setProperty("queryId", queryId);
-          properties.setProperty("index", String.valueOf(index));
-          cSplits.add(
-              new HiveSplit(schemaTableName.getSchemaName(), schemaTableName.getTableName(),
-                  schemaTableName.getTableName(), "", 0, 0, 0, properties, new ArrayList(),
-                  getHostAddresses(split.getLocations()), OptionalInt.empty(), false, predicate,
-                  new HashMap<>(), Optional.empty()));
+    Expression filters = PrestoFilterUtil.parseFilterExpression(predicate);
+    try {
+
+      List<CarbonLocalMultiBlockSplit> splits =
+          carbonTableReader.getInputSplits2(cache, filters, predicate, configuration);
+
+      ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
+      long index = 0;
+      for (CarbonLocalMultiBlockSplit split : splits) {
+        index++;
+        Properties properties = new Properties();
+        for (Map.Entry<String, String> entry : table.getStorage().getSerdeParameters().entrySet()) {
+          properties.setProperty(entry.getKey(), entry.getValue());
         }
+        properties.setProperty("tablePath", cache.getCarbonTable().getTablePath());
+        properties.setProperty("carbonSplit", split.getJsonString());
+        properties.setProperty("queryId", queryId);
+        properties.setProperty("index", String.valueOf(index));
+        cSplits.add(new HiveSplit(schemaTableName.getSchemaName(), schemaTableName.getTableName(),
+            schemaTableName.getTableName(), "", 0, 0, 0, properties, new ArrayList(),
+            getHostAddresses(split.getLocations()), OptionalInt.empty(), false, predicate,
+            new HashMap<>(), Optional.empty()));
+      }
 
-        statisticRecorder.logStatisticsAsTableDriver();
+      statisticRecorder.logStatisticsAsTableDriver();
 
-        statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
-            System.currentTimeMillis());
-        statisticRecorder.recordStatisticsForDriver(statistic, queryId);
-        statisticRecorder.logStatisticsAsTableDriver();
-        return new FixedSplitSource(cSplits.build());
-      } catch (Exception ex) {
-        throw new RuntimeException(ex.getMessage(), ex);
-      }
+      statistic
+          .addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION, System.currentTimeMillis());
+      statisticRecorder.recordStatisticsForDriver(statistic, queryId);
+      statisticRecorder.logStatisticsAsTableDriver();
+      return new FixedSplitSource(cSplits.build());
+    } catch (Exception ex) {
+      throw new RuntimeException(ex.getMessage(), ex);
     }
-    return null;
   }
 
   private static List<HostAddress> getHostAddresses(String[] hosts) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
index 4984406..2f0d7b3 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
@@ -25,10 +25,35 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
  */
 public class CarbonTableCacheModel {
 
-  public CarbonTable carbonTable;
+  private long lastUpdatedTime;
+
+  private boolean isValid;
+
+  private CarbonTable carbonTable;
+
+  public CarbonTableCacheModel(long lastUpdatedTime, CarbonTable carbonTable) {
+    this.lastUpdatedTime = lastUpdatedTime;
+    this.carbonTable = carbonTable;
+    this.isValid = true;
+  }
+
+  public void setCurrentSchemaTime(long currentSchemaTime) {
+    if (lastUpdatedTime != currentSchemaTime) {
+      isValid = false;
+    }
+    this.lastUpdatedTime = currentSchemaTime;
+  }
+
+  public CarbonTable getCarbonTable() {
+    return carbonTable;
+  }
 
   public boolean isValid() {
-    return carbonTable != null;
+    return isValid;
   }
 
+  public void setCarbonTable(CarbonTable carbonTable) {
+    this.carbonTable = carbonTable;
+    this.isValid = true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 7ad6568..5ede272 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -20,19 +20,19 @@ package org.apache.carbondata.presto.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -57,13 +57,11 @@ import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 import org.apache.carbondata.presto.PrestoFilterUtil;
 
 import com.facebook.presto.hadoop.$internal.com.google.gson.Gson;
-import com.facebook.presto.hadoop.$internal.io.netty.util.internal.ConcurrentSet;
 import com.facebook.presto.hadoop.$internal.org.apache.commons.collections.CollectionUtils;
 import com.facebook.presto.hive.HiveColumnHandle;
 import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.predicate.TupleDomain;
 import com.google.inject.Inject;
-import org.apache.commons.lang.time.DateUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -97,14 +95,10 @@ public class CarbonTableReader {
   };
   public CarbonTableConfig config;
   /**
-   * The names of the tables under the schema (this.carbonFileList).
-   */
-  private ConcurrentSet<SchemaTableName> tableList;
-  /**
    * A cache for Carbon reader, with this cache,
    * metadata of a table is only read from file system once.
    */
-  private AtomicReference<HashMap<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+  private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
 
   private String queryId;
 
@@ -121,8 +115,7 @@ public class CarbonTableReader {
 
   @Inject public CarbonTableReader(CarbonTableConfig config) {
     this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
-    this.carbonCache = new AtomicReference(new HashMap());
-    tableList = new ConcurrentSet<>();
+    this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
     populateCarbonProperties();
   }
 
@@ -134,23 +127,12 @@ public class CarbonTableReader {
    */
   public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
       Configuration config) {
-    if (!carbonCache.get().containsKey(table) || carbonCache.get().get(table) == null) {
-      updateSchemaTables(table, config);
-      parseCarbonMetadata(table, location, config);
-    }
-    if (carbonCache.get().containsKey(table)) {
-      return carbonCache.get().get(table);
-    } else {
-      return null;
+    updateSchemaTables(table, config);
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+    if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+      return parseCarbonMetadata(table, location, config);
     }
-  }
-
-  private void removeTableFromCache(SchemaTableName table) {
-    DataMapStoreManager.getInstance()
-        .clearDataMaps(carbonCache.get().get(table).carbonTable.getAbsoluteTableIdentifier());
-    carbonCache.get().remove(table);
-    tableList.remove(table);
-
+    return carbonTableCacheModel;
   }
 
   /**
@@ -159,22 +141,19 @@ public class CarbonTableReader {
    * is called, it clears this.tableList and populate the list by reading the files.
    */
   private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
-    // update logic determine later
-    boolean isKeyExists = carbonCache.get().containsKey(schemaTableName);
-
-    if (isKeyExists) {
-      CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
-      if (carbonTableCacheModel != null && carbonTableCacheModel.carbonTable.getTableInfo() != null
-          && carbonTableCacheModel.carbonTable.isTransactionalTable()) {
-        Long latestTime = FileFactory.getCarbonFile(CarbonTablePath
-                .getSchemaFilePath(
-                    carbonCache.get().get(schemaTableName).carbonTable.getTablePath()),
-            config).getLastModifiedTime();
-        Long oldTime = carbonTableCacheModel.carbonTable.getTableInfo().getLastUpdatedTime();
-        if (DateUtils.truncate(new Date(latestTime), Calendar.MINUTE)
-            .after(DateUtils.truncate(new Date(oldTime), Calendar.MINUTE))) {
-          removeTableFromCache(schemaTableName);
-        }
+    CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+    if (carbonTableCacheModel != null &&
+        carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+      CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+      long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+              .getSchemaFilePath(
+                  carbonTable.getTablePath()),
+          config).getLastModifiedTime();
+      carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+      if (!carbonTableCacheModel.isValid()) {
+        // Invalidate datamaps
+        DataMapStoreManager.getInstance()
+            .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
       }
     }
   }
@@ -184,26 +163,22 @@ public class CarbonTableReader {
    * and cache it in this.carbonCache (CarbonTableReader cache).
    *
    * @param table name of the given table.
-   * @return the CarbonTable instance which contains all the needed metadata for a table.
+   * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
    */
-  private CarbonTable parseCarbonMetadata(SchemaTableName table, String tablePath,
+  private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
       Configuration config) {
-    CarbonTable result;
     try {
       CarbonTableCacheModel cache = carbonCache.get().get(table);
-      if (cache == null) {
-        cache = new CarbonTableCacheModel();
+      if (cache != null && cache.isValid()) {
+        return cache;
       }
-      if (cache.isValid()) {
-        return cache.carbonTable;
-      }
-      // If table is not previously cached, then:
-
       // Step 1: get store path of the table and cache it.
-      String metadataPath = CarbonTablePath.getSchemaFilePath(tablePath);
+      String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath);
       // If metadata folder exists, it is a transactional table
-      boolean isTransactionalTable = FileFactory.getCarbonFile(metadataPath, config).exists();
+      CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+      boolean isTransactionalTable = schemaFile.exists();
       org.apache.carbondata.format.TableInfo tableInfo;
+      long modifiedTime = System.currentTimeMillis();
       if (isTransactionalTable) {
         //Step 2: read the metadata (tableInfo) of the table.
         ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
@@ -216,10 +191,11 @@ public class CarbonTableReader {
           }
         };
         ThriftReader thriftReader =
-            new ThriftReader(CarbonTablePath.getSchemaFilePath(tablePath), createTBase, config);
+            new ThriftReader(schemaFilePath, createTBase, config);
         thriftReader.open();
         tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
         thriftReader.close();
+        modifiedTime = schemaFile.getLastModifiedTime();
       } else {
         tableInfo = CarbonUtil
             .inferSchema(tablePath, table.getTableName(), false, config);
@@ -234,21 +210,24 @@ public class CarbonTableReader {
 
       wrapperTableInfo.setTransactionalTable(isTransactionalTable);
 
+      CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
       // Step 4: Load metadata info into CarbonMetadata
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
-
-      cache.carbonTable =
-          CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName());
-
-      // cache the table
-      carbonCache.get().put(table, cache);
-
-      result = cache.carbonTable;
+      CarbonTable carbonTable = Objects.requireNonNull(
+          CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName()),
+          "carbontable is null");
+      // If table is not previously cached, then:
+      if (cache == null) {
+        cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+        // cache the table
+        carbonCache.get().put(table, cache);
+      } else {
+        cache.setCarbonTable(carbonTable);
+      }
+      return cache;
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
-
-    return result;
   }
 
   public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
@@ -256,8 +235,8 @@ public class CarbonTableReader {
       throws IOException {
     List<CarbonLocalInputSplit> result = new ArrayList<>();
     List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
-    CarbonTable carbonTable = tableCacheModel.carbonTable;
-    TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo();
+    CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+    TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
     config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
     String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
@@ -268,7 +247,7 @@ public class CarbonTableReader {
     CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
 
     JobConf jobConf = new JobConf(config);
-    List<PartitionSpec> filteredPartitions = new ArrayList();
+    List<PartitionSpec> filteredPartitions = new ArrayList<>();
 
     PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
     LoadMetadataDetails[] loadMetadataDetails = null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
index 481ab27..0eee58a 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
@@ -82,6 +82,12 @@ public class BooleanStreamReader extends CarbonColumnVectorImpl
     builder.appendNull();
   }
 
+  @Override public void putNulls(int rowId, int count) {
+    for (int i = 0; i < count; i++) {
+      builder.appendNull();
+    }
+  }
+
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
index 7bbf1ca..2976ca7 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
@@ -89,6 +89,18 @@ public class DecimalSliceStreamReader extends CarbonColumnVectorImpl
     decimalBlockWriter(value);
   }
 
+  @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
+    for (int i = 0; i < count; i++) {
+      putDecimal(rowId++, value, precision);
+    }
+  }
+
+  @Override public void putNulls(int rowId, int count) {
+    for (int i = 0; i < count; i++) {
+      builder.appendNull();
+    }
+  }
+
   @Override public void putNull(int rowId) {
     builder.appendNull();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
index 563f1b7..ed9a202 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
@@ -70,10 +70,22 @@ public class DoubleStreamReader extends CarbonColumnVectorImpl implements Presto
     type.writeDouble(builder, value);
   }
 
+  @Override public void putDoubles(int rowId, int count, double value) {
+    for (int i = 0; i < count; i++) {
+      type.writeDouble(builder, value);
+    }
+  }
+
   @Override public void putNull(int rowId) {
     builder.appendNull();
   }
 
+  @Override public void putNulls(int rowId, int count) {
+    for (int i = 0; i < count; i++) {
+      builder.appendNull();
+    }
+  }
+
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
index 6e15da6..52ddbb2 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -68,10 +68,22 @@ public class IntegerStreamReader extends CarbonColumnVectorImpl
     }
   }
 
+  @Override public void putInts(int rowId, int count, int value) {
+    for (int i = 0; i < count; i++) {
+      putInt(rowId++, value);
+    }
+  }
+
   @Override public void putNull(int rowId) {
     builder.appendNull();
   }
 
+  @Override public void putNulls(int rowId, int count) {
+    for (int i = 0; i < count; i++) {
+      builder.appendNull();
+    }
+  }
+
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
index 494344c..81fdf88 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -67,6 +67,12 @@ public class LongStreamReader extends CarbonColumnVectorImpl implements PrestoVe
     type.writeLong(builder, value);
   }
 
+  @Override public void putLongs(int rowId, int count, long value) {
+    for (int i = 0; i < count; i++) {
+      type.writeLong(builder, value);
+    }
+  }
+
   @Override public void putNull(int rowId) {
     builder.appendNull();
   }
@@ -74,4 +80,10 @@ public class LongStreamReader extends CarbonColumnVectorImpl implements PrestoVe
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }
+
+  @Override public void putNulls(int rowId, int count) {
+    for (int i = 0; i < count; i++) {
+      builder.appendNull();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
index cdba4bf..a67071d 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
@@ -62,4 +62,10 @@ public class ObjectStreamReader extends CarbonColumnVectorImpl implements Presto
     builder = type.createBlockBuilder(null, batchSize);
   }
 
+  @Override public void putNulls(int rowId, int count) {
+    for (int i = 0; i < count; i++) {
+      builder.appendNull();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
index 198f82a..7411513 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
@@ -67,10 +67,22 @@ public class ShortStreamReader extends CarbonColumnVectorImpl implements PrestoV
     type.writeLong(builder, value);
   }
 
+  @Override public void putShorts(int rowId, int count, short value) {
+    for (int i = 0; i < count; i++) {
+      type.writeLong(builder, value);
+    }
+  }
+
   @Override public void putNull(int rowId) {
     builder.appendNull();
   }
 
+  @Override public void putNulls(int rowId, int count) {
+    for (int i = 0; i < count; i++) {
+      builder.appendNull();
+    }
+  }
+
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index d9c7ad3..0d4b4f0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -106,14 +106,24 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
     values[rowId] = value;
   }
 
+  @Override public void putInts(int rowId, int count, int value) {
+    for (int i = 0; i < count; i++) {
+      values[rowId++] = value;
+    }
+  }
+
   @Override public void putByteArray(int rowId, byte[] value) {
     type.writeSlice(builder, wrappedBuffer(value));
   }
 
   @Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
-    byte[] byteArr = new byte[length];
-    System.arraycopy(value, offset, byteArr, 0, length);
-    type.writeSlice(builder, wrappedBuffer(byteArr));
+    type.writeSlice(builder, wrappedBuffer(value), offset, length);
+  }
+
+  @Override public void putByteArray(int rowId, int count, byte[] value) {
+    for (int i = 0; i < count; i++) {
+      type.writeSlice(builder, wrappedBuffer(value));
+    }
   }
 
   @Override public void putNull(int rowId) {
@@ -122,6 +132,14 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
     }
   }
 
+  @Override public void putNulls(int rowId, int count) {
+    if (dictionaryBlock == null) {
+      for (int i = 0; i < count; ++i) {
+        builder.appendNull();
+      }
+    }
+  }
+
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
index 43c471a..1052a74 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
@@ -68,10 +68,22 @@ public class TimestampStreamReader extends CarbonColumnVectorImpl
     type.writeLong(builder, value / 1000);
   }
 
+  @Override public void putLongs(int rowId, int count, long value) {
+    for (int i = 0; i < count; i++) {
+      type.writeLong(builder, value / 1000);
+    }
+  }
+
   @Override public void putNull(int rowId) {
     builder.appendNull();
   }
 
+  @Override public void putNulls(int rowId, int count) {
+    for (int i = 0; i < count; ++i) {
+      builder.appendNull();
+    }
+  }
+
   @Override public void reset() {
     builder = type.createBlockBuilder(null, batchSize);
   }