You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/01/09 12:51:42 UTC
carbondata git commit: [CARBONDATA-3237] Fix presto carbon issues in
dictionary include scenario
Repository: carbondata
Updated Branches:
refs/heads/master 1b45c41fe -> 8e6def9fa
[CARBONDATA-3237] Fix presto carbon issues in dictionary include scenario
problem1: Decimal column with dictionary include cannot be read in
presto
cause: int is typecast to decimal for dictionary columns in the decimal stream reader.
solution: keep the original data type as well as the new data type for the decimal
stream reader.
problem2: Optimize presto query time for dictionary include string column
Currently, for each query, presto carbon creates a dictionary block for string columns.
cause: This happens for each query, and if the cardinality is high, it takes more time to build.
solution: the dictionary block is not required; we can look up values using the normal dictionary lookup.
This closes #3055
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8e6def9f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8e6def9f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8e6def9f
Branch: refs/heads/master
Commit: 8e6def9facc6c51de58ee655961ac4710c252bc0
Parents: 1b45c41
Author: ajantha-bhat <aj...@gmail.com>
Authored: Mon Jan 7 14:50:11 2019 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Wed Jan 9 18:21:10 2019 +0530
----------------------------------------------------------------------
.../carbondata/presto/CarbonVectorBatch.java | 12 ++---
.../readers/DecimalSliceStreamReader.java | 9 ++--
.../presto/readers/SliceStreamReader.java | 53 ++++++++++++--------
.../CarbonDictionaryDecodeReadSupport.scala | 22 +-------
.../presto/util/CarbonDataStoreCreator.scala | 1 +
5 files changed, 47 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e6def9f/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
index fb8300a..140e46b 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbonVectorBatch.java
@@ -37,8 +37,6 @@ import org.apache.carbondata.presto.readers.ShortStreamReader;
import org.apache.carbondata.presto.readers.SliceStreamReader;
import org.apache.carbondata.presto.readers.TimestampStreamReader;
-import com.facebook.presto.spi.block.Block;
-
public class CarbonVectorBatch {
private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
@@ -63,8 +61,7 @@ public class CarbonVectorBatch {
DataType[] dataTypes = readSupport.getDataTypes();
for (int i = 0; i < schema.length; ++i) {
- columns[i] = createDirectStreamReader(maxRows, dataTypes[i], schema[i], dictionaries[i],
- readSupport.getDictionaryBlock(i));
+ columns[i] = createDirectStreamReader(maxRows, dataTypes[i], schema[i], dictionaries[i]);
}
}
@@ -79,7 +76,7 @@ public class CarbonVectorBatch {
}
private CarbonColumnVectorImpl createDirectStreamReader(int batchSize, DataType dataType,
- StructField field, Dictionary dictionary, Block dictionaryBlock) {
+ StructField field, Dictionary dictionary) {
if (dataType == DataTypes.BOOLEAN) {
return new BooleanStreamReader(batchSize, field.getDataType(), dictionary);
} else if (dataType == DataTypes.SHORT) {
@@ -93,9 +90,10 @@ public class CarbonVectorBatch {
} else if (dataType == DataTypes.DOUBLE) {
return new DoubleStreamReader(batchSize, field.getDataType(), dictionary);
} else if (dataType == DataTypes.STRING) {
- return new SliceStreamReader(batchSize, field.getDataType(), dictionaryBlock);
+ return new SliceStreamReader(batchSize, field.getDataType(), dictionary);
} else if (DataTypes.isDecimal(dataType)) {
- return new DecimalSliceStreamReader(batchSize, (DecimalType) field.getDataType(), dictionary);
+ return new DecimalSliceStreamReader(batchSize, field.getDataType(), (DecimalType) dataType,
+ dictionary);
} else {
return new ObjectStreamReader(batchSize, field.getDataType());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e6def9f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
index 2976ca7..ddc855a 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/DecimalSliceStreamReader.java
@@ -24,6 +24,7 @@ import java.util.Objects;
import static java.math.RoundingMode.HALF_UP;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -57,10 +58,12 @@ public class DecimalSliceStreamReader extends CarbonColumnVectorImpl
protected BlockBuilder builder;
private Dictionary dictionary;
- public DecimalSliceStreamReader(int batchSize,
- org.apache.carbondata.core.metadata.datatype.DecimalType dataType, Dictionary dictionary) {
+ public DecimalSliceStreamReader(int batchSize, DataType dataType,
+ org.apache.carbondata.core.metadata.datatype.DecimalType decimalDataType,
+ Dictionary dictionary) {
super(batchSize, dataType);
- this.type = DecimalType.createDecimalType(dataType.getPrecision(), dataType.getScale());
+ this.type =
+ DecimalType.createDecimalType(decimalDataType.getPrecision(), decimalDataType.getScale());
this.batchSize = batchSize;
this.builder = type.createBlockBuilder(null, batchSize);
this.dictionary = dictionary;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e6def9f/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
index 0d4b4f0..1e4688f 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/readers/SliceStreamReader.java
@@ -17,11 +17,17 @@
package org.apache.carbondata.presto.readers;
+import java.nio.charset.Charset;
+import java.util.Objects;
import java.util.Optional;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonColumnVectorImpl;
+import org.apache.carbondata.core.util.DataTypeUtil;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
@@ -44,27 +50,31 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
protected BlockBuilder builder;
- int[] values;
-
private Block dictionaryBlock;
+ private boolean isLocalDict;
+
+ private Dictionary globalDictionary;
+
public SliceStreamReader(int batchSize, DataType dataType,
- Block dictionaryBlock) {
+ Dictionary dictionary) {
super(batchSize, dataType);
+ this.globalDictionary = dictionary;
this.batchSize = batchSize;
- if (dictionaryBlock == null) {
- this.builder = type.createBlockBuilder(null, batchSize);
- } else {
- this.dictionaryBlock = dictionaryBlock;
- this.values = new int[batchSize];
- }
+ this.builder = type.createBlockBuilder(null, batchSize);
}
@Override public Block buildBlock() {
if (dictionaryBlock == null) {
return builder.build();
} else {
- return new DictionaryBlock(batchSize, dictionaryBlock, values);
+ int[] dataArray;
+ if (isLocalDict) {
+ dataArray = (int[]) ((CarbonColumnVectorImpl) getDictionaryVector()).getDataArray();
+ } else {
+ dataArray = (int[]) getDataArray();
+ }
+ return new DictionaryBlock(batchSize, dictionaryBlock, dataArray);
}
}
@@ -95,22 +105,13 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
dictOffsets[dictOffsets.length - 1] = size;
dictionaryBlock = new VariableWidthBlock(dictionary.getDictionarySize(),
Slices.wrappedBuffer(singleArrayDictValues), dictOffsets, Optional.of(nulls));
- values = (int[]) ((CarbonColumnVectorImpl) getDictionaryVector()).getDataArray();
+ this.isLocalDict = true;
}
-
@Override public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
- @Override public void putInt(int rowId, int value) {
- values[rowId] = value;
- }
- @Override public void putInts(int rowId, int count, int value) {
- for (int i = 0; i < count; i++) {
- values[rowId++] = value;
- }
- }
@Override public void putByteArray(int rowId, byte[] value) {
type.writeSlice(builder, wrappedBuffer(value));
@@ -142,5 +143,17 @@ public class SliceStreamReader extends CarbonColumnVectorImpl implements PrestoV
@Override public void reset() {
builder = type.createBlockBuilder(null, batchSize);
+ this.isLocalDict = false;
+ }
+
+ @Override public void putInt(int rowId, int value) {
+ Object data = DataTypeUtil
+ .getDataBasedOnDataType(globalDictionary.getDictionaryValueForKey(value), DataTypes.STRING);
+ if (Objects.isNull(data)) {
+ builder.appendNull();
+ } else {
+ type.writeSlice(builder, wrappedBuffer(
+ ((String) data).getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e6def9f/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
index 97deb6f..4bbd931 100644
--- a/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
+++ b/integration/presto/src/main/scala/org/apache/carbondata/presto/CarbonDictionaryDecodeReadSupport.scala
@@ -37,7 +37,6 @@ import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
private var dictionaries: Array[Dictionary] = _
private var dataTypes: Array[DataType] = _
- private var dictionaryBlock: Array[Block] = _
/**
* This initialization is done inside executor task
@@ -50,7 +49,6 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
dictionaries = new Array[Dictionary](carbonColumns.length)
dataTypes = new Array[DataType](carbonColumns.length)
- dictionaryBlock = new Array[Block](carbonColumns.length)
carbonColumns.zipWithIndex.foreach {
case (carbonColumn, index) => if (carbonColumn.hasEncoding(Encoding.DICTIONARY) &&
@@ -66,13 +64,7 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
dictionaries(index) = forwardDictionaryCache
.get(new DictionaryColumnUniqueIdentifier(carbonTable.getAbsoluteTableIdentifier,
carbonColumn.getColumnIdentifier, dataTypes(index), dictionaryPath))
- // in case of string data type create dictionarySliceArray same as that of presto code
- if (dataTypes(index).equals(DataTypes.STRING)) {
- dictionaryBlock(index) = createDictionaryBlock(dictionaries(index))
- }
- }
-
- else {
+ } else {
dataTypes(index) = carbonColumn.getDataType
}
}
@@ -87,7 +79,7 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
*/
private def createDictionaryBlock(dictionaryData: Dictionary): Block = {
val chunks: DictionaryChunksWrapper = dictionaryData.getDictionaryChunks
- val positionCount = chunks.getSize;
+ val positionCount = chunks.getSize
// In dictionary there will be only one null and the key value will be 1 by default in carbon,
// hence the isNullVector will be populated only once with null value it has no bearing on
@@ -127,16 +119,6 @@ class CarbonDictionaryDecodeReadSupport[T] extends CarbonReadSupport[T] {
throw new RuntimeException("UnSupported Method")
}
- /**
- * Function to get the SliceArrayBlock with dictionary Data
- *
- * @param columnNo
- * @return
- */
- def getDictionaryBlock(columnNo: Int): Block = {
- dictionaryBlock(columnNo)
- }
-
def getDictionaries: Array[Dictionary] = {
dictionaries
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8e6def9f/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 115e868..9172b31 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -239,6 +239,7 @@ object CarbonDataStoreCreator {
bonus.setDataType(DataTypes.createDecimalType(10, 4))
bonus.setPrecision(10)
bonus.setScale(4)
+ bonus.setEncodingList(dictionaryEncoding)
bonus.setEncodingList(invertedIndexEncoding)
bonus.setColumnUniqueId(UUID.randomUUID().toString)
bonus.setDimensionColumn(false)