You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/01/02 13:29:48 UTC
carbondata git commit: [CARBONDATA-3218] Fix schema refresh and wrong
query result issues in presto.
Repository: carbondata
Updated Branches:
refs/heads/master 7477527e9 -> f8697b106
[CARBONDATA-3218] Fix schema refresh and wrong query result issues in presto.
Problem:
A schema that is updated in Spark is not reflected in Presto, which results in wrong query results in Presto.
Solution:
Update the schema in Presto whenever the schema changes in Spark. Also override the putNulls method in all Presto readers so that they handle null-data scenarios correctly.
This closes #3041
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f8697b10
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f8697b10
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f8697b10
Branch: refs/heads/master
Commit: f8697b1065cd76e3b96be571fd78761a44a58e7e
Parents: 7477527
Author: ravipesala <ra...@gmail.com>
Authored: Mon Dec 31 17:20:24 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Wed Jan 2 18:59:05 2019 +0530
----------------------------------------------------------------------
.../presto/CarbondataPageSourceProvider.java | 7 +-
.../presto/CarbondataSplitManager.java | 65 +++++-----
.../presto/impl/CarbonTableCacheModel.java | 29 ++++-
.../presto/impl/CarbonTableReader.java | 119 ++++++++-----------
.../presto/readers/BooleanStreamReader.java | 6 +
.../readers/DecimalSliceStreamReader.java | 12 ++
.../presto/readers/DoubleStreamReader.java | 12 ++
.../presto/readers/IntegerStreamReader.java | 12 ++
.../presto/readers/LongStreamReader.java | 12 ++
.../presto/readers/ObjectStreamReader.java | 6 +
.../presto/readers/ShortStreamReader.java | 12 ++
.../presto/readers/SliceStreamReader.java | 24 +++-
.../presto/readers/TimestampStreamReader.java | 12 ++
13 files changed, 215 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index d7b7266..c81e0c3 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -230,10 +230,11 @@ public class CarbondataPageSourceProvider extends HivePageSourceProvider {
.getCarbonCache(new SchemaTableName(carbonSplit.getDatabase(), carbonSplit.getTable()),
carbonSplit.getSchema().getProperty("tablePath"), configuration);
checkNotNull(tableCacheModel, "tableCacheModel should not be null");
- checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
- checkNotNull(tableCacheModel.carbonTable.getTableInfo(),
+ checkNotNull(tableCacheModel.getCarbonTable(),
+ "tableCacheModel.carbonTable should not be null");
+ checkNotNull(tableCacheModel.getCarbonTable().getTableInfo(),
"tableCacheModel.carbonTable.tableInfo should not be null");
- return tableCacheModel.carbonTable;
+ return tableCacheModel.getCarbonTable();
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
index ded00fc..6efef93 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -119,45 +119,40 @@ public class CarbondataSplitManager extends HiveSplitManager {
configuration = carbonTableReader.updateS3Properties(configuration);
CarbonTableCacheModel cache =
carbonTableReader.getCarbonCache(schemaTableName, location, configuration);
- if (null != cache) {
- Expression filters = PrestoFilterUtil.parseFilterExpression(predicate);
- try {
-
- List<CarbonLocalMultiBlockSplit> splits =
- carbonTableReader.getInputSplits2(cache, filters, predicate, configuration);
-
- ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
- long index = 0;
- for (CarbonLocalMultiBlockSplit split : splits) {
- index++;
- Properties properties = new Properties();
- for (Map.Entry<String, String> entry : table.getStorage().getSerdeParameters()
- .entrySet()) {
- properties.setProperty(entry.getKey(), entry.getValue());
- }
- properties.setProperty("tablePath", cache.carbonTable.getTablePath());
- properties.setProperty("carbonSplit", split.getJsonString());
- properties.setProperty("queryId", queryId);
- properties.setProperty("index", String.valueOf(index));
- cSplits.add(
- new HiveSplit(schemaTableName.getSchemaName(), schemaTableName.getTableName(),
- schemaTableName.getTableName(), "", 0, 0, 0, properties, new ArrayList(),
- getHostAddresses(split.getLocations()), OptionalInt.empty(), false, predicate,
- new HashMap<>(), Optional.empty()));
+ Expression filters = PrestoFilterUtil.parseFilterExpression(predicate);
+ try {
+
+ List<CarbonLocalMultiBlockSplit> splits =
+ carbonTableReader.getInputSplits2(cache, filters, predicate, configuration);
+
+ ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
+ long index = 0;
+ for (CarbonLocalMultiBlockSplit split : splits) {
+ index++;
+ Properties properties = new Properties();
+ for (Map.Entry<String, String> entry : table.getStorage().getSerdeParameters().entrySet()) {
+ properties.setProperty(entry.getKey(), entry.getValue());
}
+ properties.setProperty("tablePath", cache.getCarbonTable().getTablePath());
+ properties.setProperty("carbonSplit", split.getJsonString());
+ properties.setProperty("queryId", queryId);
+ properties.setProperty("index", String.valueOf(index));
+ cSplits.add(new HiveSplit(schemaTableName.getSchemaName(), schemaTableName.getTableName(),
+ schemaTableName.getTableName(), "", 0, 0, 0, properties, new ArrayList(),
+ getHostAddresses(split.getLocations()), OptionalInt.empty(), false, predicate,
+ new HashMap<>(), Optional.empty()));
+ }
- statisticRecorder.logStatisticsAsTableDriver();
+ statisticRecorder.logStatisticsAsTableDriver();
- statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
- System.currentTimeMillis());
- statisticRecorder.recordStatisticsForDriver(statistic, queryId);
- statisticRecorder.logStatisticsAsTableDriver();
- return new FixedSplitSource(cSplits.build());
- } catch (Exception ex) {
- throw new RuntimeException(ex.getMessage(), ex);
- }
+ statistic
+ .addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION, System.currentTimeMillis());
+ statisticRecorder.recordStatisticsForDriver(statistic, queryId);
+ statisticRecorder.logStatisticsAsTableDriver();
+ return new FixedSplitSource(cSplits.build());
+ } catch (Exception ex) {
+ throw new RuntimeException(ex.getMessage(), ex);
}
- return null;
}
private static List<HostAddress> getHostAddresses(String[] hosts) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
index 4984406..2f0d7b3 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
@@ -25,10 +25,35 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
*/
public class CarbonTableCacheModel {
- public CarbonTable carbonTable;
+ private long lastUpdatedTime;
+
+ private boolean isValid;
+
+ private CarbonTable carbonTable;
+
+ public CarbonTableCacheModel(long lastUpdatedTime, CarbonTable carbonTable) {
+ this.lastUpdatedTime = lastUpdatedTime;
+ this.carbonTable = carbonTable;
+ this.isValid = true;
+ }
+
+ public void setCurrentSchemaTime(long currentSchemaTime) {
+ if (lastUpdatedTime != currentSchemaTime) {
+ isValid = false;
+ }
+ this.lastUpdatedTime = currentSchemaTime;
+ }
+
+ public CarbonTable getCarbonTable() {
+ return carbonTable;
+ }
public boolean isValid() {
- return carbonTable != null;
+ return isValid;
}
+ public void setCarbonTable(CarbonTable carbonTable) {
+ this.carbonTable = carbonTable;
+ this.isValid = true;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 7ad6568..5ede272 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -20,19 +20,19 @@ package org.apache.carbondata.presto.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -57,13 +57,11 @@ import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
import org.apache.carbondata.presto.PrestoFilterUtil;
import com.facebook.presto.hadoop.$internal.com.google.gson.Gson;
-import com.facebook.presto.hadoop.$internal.io.netty.util.internal.ConcurrentSet;
import com.facebook.presto.hadoop.$internal.org.apache.commons.collections.CollectionUtils;
import com.facebook.presto.hive.HiveColumnHandle;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.google.inject.Inject;
-import org.apache.commons.lang.time.DateUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@@ -97,14 +95,10 @@ public class CarbonTableReader {
};
public CarbonTableConfig config;
/**
- * The names of the tables under the schema (this.carbonFileList).
- */
- private ConcurrentSet<SchemaTableName> tableList;
- /**
* A cache for Carbon reader, with this cache,
* metadata of a table is only read from file system once.
*/
- private AtomicReference<HashMap<SchemaTableName, CarbonTableCacheModel>> carbonCache;
+ private AtomicReference<Map<SchemaTableName, CarbonTableCacheModel>> carbonCache;
private String queryId;
@@ -121,8 +115,7 @@ public class CarbonTableReader {
@Inject public CarbonTableReader(CarbonTableConfig config) {
this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
- this.carbonCache = new AtomicReference(new HashMap());
- tableList = new ConcurrentSet<>();
+ this.carbonCache = new AtomicReference(new ConcurrentHashMap<>());
populateCarbonProperties();
}
@@ -134,23 +127,12 @@ public class CarbonTableReader {
*/
public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
Configuration config) {
- if (!carbonCache.get().containsKey(table) || carbonCache.get().get(table) == null) {
- updateSchemaTables(table, config);
- parseCarbonMetadata(table, location, config);
- }
- if (carbonCache.get().containsKey(table)) {
- return carbonCache.get().get(table);
- } else {
- return null;
+ updateSchemaTables(table, config);
+ CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(table);
+ if (carbonTableCacheModel == null || !carbonTableCacheModel.isValid()) {
+ return parseCarbonMetadata(table, location, config);
}
- }
-
- private void removeTableFromCache(SchemaTableName table) {
- DataMapStoreManager.getInstance()
- .clearDataMaps(carbonCache.get().get(table).carbonTable.getAbsoluteTableIdentifier());
- carbonCache.get().remove(table);
- tableList.remove(table);
-
+ return carbonTableCacheModel;
}
/**
@@ -159,22 +141,19 @@ public class CarbonTableReader {
* is called, it clears this.tableList and populate the list by reading the files.
*/
private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
- // update logic determine later
- boolean isKeyExists = carbonCache.get().containsKey(schemaTableName);
-
- if (isKeyExists) {
- CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
- if (carbonTableCacheModel != null && carbonTableCacheModel.carbonTable.getTableInfo() != null
- && carbonTableCacheModel.carbonTable.isTransactionalTable()) {
- Long latestTime = FileFactory.getCarbonFile(CarbonTablePath
- .getSchemaFilePath(
- carbonCache.get().get(schemaTableName).carbonTable.getTablePath()),
- config).getLastModifiedTime();
- Long oldTime = carbonTableCacheModel.carbonTable.getTableInfo().getLastUpdatedTime();
- if (DateUtils.truncate(new Date(latestTime), Calendar.MINUTE)
- .after(DateUtils.truncate(new Date(oldTime), Calendar.MINUTE))) {
- removeTableFromCache(schemaTableName);
- }
+ CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
+ if (carbonTableCacheModel != null &&
+ carbonTableCacheModel.getCarbonTable().isTransactionalTable()) {
+ CarbonTable carbonTable = carbonTableCacheModel.getCarbonTable();
+ long latestTime = FileFactory.getCarbonFile(CarbonTablePath
+ .getSchemaFilePath(
+ carbonTable.getTablePath()),
+ config).getLastModifiedTime();
+ carbonTableCacheModel.setCurrentSchemaTime(latestTime);
+ if (!carbonTableCacheModel.isValid()) {
+ // Invalidate datamaps
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(carbonTableCacheModel.getCarbonTable().getAbsoluteTableIdentifier());
}
}
}
@@ -184,26 +163,22 @@ public class CarbonTableReader {
* and cache it in this.carbonCache (CarbonTableReader cache).
*
* @param table name of the given table.
- * @return the CarbonTable instance which contains all the needed metadata for a table.
+ * @return the CarbonTableCacheModel instance which contains all the needed metadata for a table.
*/
- private CarbonTable parseCarbonMetadata(SchemaTableName table, String tablePath,
+ private CarbonTableCacheModel parseCarbonMetadata(SchemaTableName table, String tablePath,
Configuration config) {
- CarbonTable result;
try {
CarbonTableCacheModel cache = carbonCache.get().get(table);
- if (cache == null) {
- cache = new CarbonTableCacheModel();
+ if (cache != null && cache.isValid()) {
+ return cache;
}
- if (cache.isValid()) {
- return cache.carbonTable;
- }
- // If table is not previously cached, then:
-
// Step 1: get store path of the table and cache it.
- String metadataPath = CarbonTablePath.getSchemaFilePath(tablePath);
+ String schemaFilePath = CarbonTablePath.getSchemaFilePath(tablePath);
// If metadata folder exists, it is a transactional table
- boolean isTransactionalTable = FileFactory.getCarbonFile(metadataPath, config).exists();
+ CarbonFile schemaFile = FileFactory.getCarbonFile(schemaFilePath, config);
+ boolean isTransactionalTable = schemaFile.exists();
org.apache.carbondata.format.TableInfo tableInfo;
+ long modifiedTime = System.currentTimeMillis();
if (isTransactionalTable) {
//Step 2: read the metadata (tableInfo) of the table.
ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
@@ -216,10 +191,11 @@ public class CarbonTableReader {
}
};
ThriftReader thriftReader =
- new ThriftReader(CarbonTablePath.getSchemaFilePath(tablePath), createTBase, config);
+ new ThriftReader(schemaFilePath, createTBase, config);
thriftReader.open();
tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
thriftReader.close();
+ modifiedTime = schemaFile.getLastModifiedTime();
} else {
tableInfo = CarbonUtil
.inferSchema(tablePath, table.getTableName(), false, config);
@@ -234,21 +210,24 @@ public class CarbonTableReader {
wrapperTableInfo.setTransactionalTable(isTransactionalTable);
+ CarbonMetadata.getInstance().removeTable(wrapperTableInfo.getTableUniqueName());
// Step 4: Load metadata info into CarbonMetadata
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
-
- cache.carbonTable =
- CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName());
-
- // cache the table
- carbonCache.get().put(table, cache);
-
- result = cache.carbonTable;
+ CarbonTable carbonTable = Objects.requireNonNull(
+ CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName()),
+ "carbontable is null");
+ // If table is not previously cached, then:
+ if (cache == null) {
+ cache = new CarbonTableCacheModel(modifiedTime, carbonTable);
+ // cache the table
+ carbonCache.get().put(table, cache);
+ } else {
+ cache.setCarbonTable(carbonTable);
+ }
+ return cache;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
-
- return result;
}
public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
@@ -256,8 +235,8 @@ public class CarbonTableReader {
throws IOException {
List<CarbonLocalInputSplit> result = new ArrayList<>();
List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
- CarbonTable carbonTable = tableCacheModel.carbonTable;
- TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo();
+ CarbonTable carbonTable = tableCacheModel.getCarbonTable();
+ TableInfo tableInfo = tableCacheModel.getCarbonTable().getTableInfo();
config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
@@ -268,7 +247,7 @@ public class CarbonTableReader {
CarbonInputFormat.setTableInfo(config, carbonTable.getTableInfo());
JobConf jobConf = new JobConf(config);
- List<PartitionSpec> filteredPartitions = new ArrayList();
+ List<PartitionSpec> filteredPartitions = new ArrayList<>();
PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
LoadMetadataDetails[] loadMetadataDetails = null;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
index 481ab27..0eee58a 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/BooleanStreamReader.java
@@ -82,6 +82,12 @@ public class BooleanStreamReader extends CarbonColumnVectorImpl
builder.appendNull();
}
+ @Override public void putNulls(int rowId, int count) {
+ for (int i = 0; i < count; i++) {
+ builder.appendNull();
+ }
+ }
+
@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
index 7bbf1ca..2976ca7 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
@@ -89,6 +89,18 @@ public class DecimalSliceStreamReader extends CarbonColumnVectorImpl
decimalBlockWriter(value);
}
+ @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
+ for (int i = 0; i < count; i++) {
+ putDecimal(rowId++, value, precision);
+ }
+ }
+
+ @Override public void putNulls(int rowId, int count) {
+ for (int i = 0; i < count; i++) {
+ builder.appendNull();
+ }
+ }
+
@Override public void putNull(int rowId) {
builder.appendNull();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
index 563f1b7..ed9a202 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DoubleStreamReader.java
@@ -70,10 +70,22 @@ public class DoubleStreamReader extends CarbonColumnVectorImpl implements Presto
type.writeDouble(builder, value);
}
+ @Override public void putDoubles(int rowId, int count, double value) {
+ for (int i = 0; i < count; i++) {
+ type.writeDouble(builder, value);
+ }
+ }
+
@Override public void putNull(int rowId) {
builder.appendNull();
}
+ @Override public void putNulls(int rowId, int count) {
+ for (int i = 0; i < count; i++) {
+ builder.appendNull();
+ }
+ }
+
@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
index 6e15da6..52ddbb2 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/IntegerStreamReader.java
@@ -68,10 +68,22 @@ public class IntegerStreamReader extends CarbonColumnVectorImpl
}
}
+ @Override public void putInts(int rowId, int count, int value) {
+ for (int i = 0; i < count; i++) {
+ putInt(rowId++, value);
+ }
+ }
+
@Override public void putNull(int rowId) {
builder.appendNull();
}
+ @Override public void putNulls(int rowId, int count) {
+ for (int i = 0; i < count; i++) {
+ builder.appendNull();
+ }
+ }
+
@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
index 494344c..81fdf88 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/LongStreamReader.java
@@ -67,6 +67,12 @@ public class LongStreamReader extends CarbonColumnVectorImpl implements PrestoVe
type.writeLong(builder, value);
}
+ @Override public void putLongs(int rowId, int count, long value) {
+ for (int i = 0; i < count; i++) {
+ type.writeLong(builder, value);
+ }
+ }
+
@Override public void putNull(int rowId) {
builder.appendNull();
}
@@ -74,4 +80,10 @@ public class LongStreamReader extends CarbonColumnVectorImpl implements PrestoVe
@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
}
+
+ @Override public void putNulls(int rowId, int count) {
+ for (int i = 0; i < count; i++) {
+ builder.appendNull();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
index cdba4bf..a67071d 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ObjectStreamReader.java
@@ -62,4 +62,10 @@ public class ObjectStreamReader extends CarbonColumnVectorImpl implements Presto
builder = type.createBlockBuilder(null, batchSize);
}
+ @Override public void putNulls(int rowId, int count) {
+ for (int i = 0; i < count; i++) {
+ builder.appendNull();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
index 198f82a..7411513 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/ShortStreamReader.java
@@ -67,10 +67,22 @@ public class ShortStreamReader extends CarbonColumnVectorImpl implements PrestoV
type.writeLong(builder, value);
}
+ @Override public void putShorts(int rowId, int count, short value) {
+ for (int i = 0; i < count; i++) {
+ type.writeLong(builder, value);
+ }
+ }
+
@Override public void putNull(int rowId) {
builder.appendNull();
}
+ @Override public void putNulls(int rowId, int count) {
+ for (int i = 0; i < count; i++) {
+ builder.appendNull();
+ }
+ }
+
@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index d9c7ad3..0d4b4f0 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -106,14 +106,24 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
values[rowId] = value;
}
+ @Override public void putInts(int rowId, int count, int value) {
+ for (int i = 0; i < count; i++) {
+ values[rowId++] = value;
+ }
+ }
+
@Override public void putByteArray(int rowId, byte[] value) {
type.writeSlice(builder, wrappedBuffer(value));
}
@Override public void putByteArray(int rowId, int offset, int length, byte[] value) {
- byte[] byteArr = new byte[length];
- System.arraycopy(value, offset, byteArr, 0, length);
- type.writeSlice(builder, wrappedBuffer(byteArr));
+ type.writeSlice(builder, wrappedBuffer(value), offset, length);
+ }
+
+ @Override public void putByteArray(int rowId, int count, byte[] value) {
+ for (int i = 0; i < count; i++) {
+ type.writeSlice(builder, wrappedBuffer(value));
+ }
}
@Override public void putNull(int rowId) {
@@ -122,6 +132,14 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
}
}
+ @Override public void putNulls(int rowId, int count) {
+ if (dictionaryBlock == null) {
+ for (int i = 0; i < count; ++i) {
+ builder.appendNull();
+ }
+ }
+ }
+
@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8697b10/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
index 43c471a..1052a74 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/TimestampStreamReader.java
@@ -68,10 +68,22 @@ public class TimestampStreamReader extends CarbonColumnVectorImpl
type.writeLong(builder, value / 1000);
}
+ @Override public void putLongs(int rowId, int count, long value) {
+ for (int i = 0; i < count; i++) {
+ type.writeLong(builder, value / 1000);
+ }
+ }
+
@Override public void putNull(int rowId) {
builder.appendNull();
}
+ @Override public void putNulls(int rowId, int count) {
+ for (int i = 0; i < count; ++i) {
+ builder.appendNull();
+ }
+ }
+
@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
}