You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/12/10 14:01:29 UTC
carbondata git commit: [CARBONDATA-3145] Avoid duplicate decoding for complex column pages while querying
Repository: carbondata
Updated Branches:
refs/heads/master 4c9f08217 -> 0c94559e2
[CARBONDATA-3145] Avoid duplicate decoding for complex column pages while querying
Problem:
The column page is decoded again for every row that is read from a complex primitive column.
Solution:
Decode each page only once, then reuse the decoded page for all of its rows.
This closes #2975
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0c94559e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0c94559e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0c94559e
Branch: refs/heads/master
Commit: 0c94559e2feaf3d5a001665c3da2bfc3bf941043
Parents: 4c9f082
Author: dhatchayani <dh...@gmail.com>
Authored: Wed Dec 5 12:40:56 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Mon Dec 10 19:31:12 2018 +0530
----------------------------------------------------------------------
.../core/scan/complextypes/ArrayQueryType.java | 11 ++--
.../scan/complextypes/ComplexQueryType.java | 14 +++-
.../scan/complextypes/PrimitiveQueryType.java | 11 ++--
.../core/scan/complextypes/StructQueryType.java | 14 ++--
.../core/scan/filter/GenericQueryType.java | 4 +-
.../executer/RowLevelFilterExecuterImpl.java | 7 +-
.../core/scan/result/BlockletScannedResult.java | 68 +++++++++++++-------
7 files changed, 86 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
index a5f4234..8538edb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
@@ -62,17 +63,17 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType
}
public void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks,
- int rowNumber, int pageNumber, DataOutputStream dataOutputStream) throws IOException {
- byte[] input = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber);
+ DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber,
+ DataOutputStream dataOutputStream) throws IOException {
+ byte[] input = copyBlockDataChunk(rawColumnChunks, dimensionColumnPages, rowNumber, pageNumber);
ByteBuffer byteArray = ByteBuffer.wrap(input);
int dataLength = byteArray.getInt();
dataOutputStream.writeInt(dataLength);
if (dataLength > 0) {
int dataOffset = byteArray.getInt();
for (int i = 0; i < dataLength; i++) {
- children
- .parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, dataOffset++, pageNumber,
- dataOutputStream);
+ children.parseBlocksAndReturnComplexColumnByteArray(rawColumnChunks, dimensionColumnPages,
+ dataOffset++, pageNumber, dataOutputStream);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
index 98f0715..704af89 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ComplexQueryType.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.scan.complextypes;
import java.io.IOException;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
@@ -40,9 +41,10 @@ public class ComplexQueryType {
* This method is also used by child.
*/
protected byte[] copyBlockDataChunk(DimensionRawColumnChunk[] rawColumnChunks,
- int rowNumber, int pageNumber) {
+ DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber) {
byte[] data =
- rawColumnChunks[blockIndex].decodeColumnPage(pageNumber).getChunkData(rowNumber);
+ getDecodedDimensionPage(dimensionColumnPages, rawColumnChunks[blockIndex], pageNumber)
+ .getChunkData(rowNumber);
byte[] output = new byte[data.length];
System.arraycopy(data, 0, output, 0, output.length);
return output;
@@ -57,4 +59,12 @@ public class ComplexQueryType {
.readDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
}
}
+
+ private DimensionColumnPage getDecodedDimensionPage(DimensionColumnPage[][] dimensionColumnPages,
+ DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) {
+ if (dimensionColumnPages == null || null == dimensionColumnPages[blockIndex]) {
+ return dimensionRawColumnChunk.decodeColumnPage(pageNumber);
+ }
+ return dimensionColumnPages[blockIndex][pageNumber];
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index abe33c4..6347397 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
@@ -93,10 +94,12 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
return 1;
}
- @Override public void parseBlocksAndReturnComplexColumnByteArray(
- DimensionRawColumnChunk[] rawColumnChunks, int rowNumber,
- int pageNumber, DataOutputStream dataOutputStream) throws IOException {
- byte[] currentVal = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber);
+ @Override
+ public void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks,
+ DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber,
+ DataOutputStream dataOutputStream) throws IOException {
+ byte[] currentVal =
+ copyBlockDataChunk(rawColumnChunks, dimensionColumnPages, rowNumber, pageNumber);
if (!this.isDictionary && !this.isDirectDictionary) {
dataOutputStream.writeShort(currentVal.length);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index c607f84..7bccbc0 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
@@ -79,17 +80,18 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
}
@Override public void parseBlocksAndReturnComplexColumnByteArray(
- DimensionRawColumnChunk[] dimensionColumnDataChunks, int rowNumber,
- int pageNumber, DataOutputStream dataOutputStream) throws IOException {
- byte[] input = copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, pageNumber);
+ DimensionRawColumnChunk[] dimensionColumnDataChunks,
+ DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber,
+ DataOutputStream dataOutputStream) throws IOException {
+ byte[] input =
+ copyBlockDataChunk(dimensionColumnDataChunks, dimensionColumnPages, rowNumber, pageNumber);
ByteBuffer byteArray = ByteBuffer.wrap(input);
int childElement = byteArray.getShort();
dataOutputStream.writeShort(childElement);
if (childElement > 0) {
for (int i = 0; i < childElement; i++) {
- children.get(i)
- .parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks, rowNumber,
- pageNumber, dataOutputStream);
+ children.get(i).parseBlocksAndReturnComplexColumnByteArray(dimensionColumnDataChunks,
+ dimensionColumnPages, rowNumber, pageNumber, dataOutputStream);
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
index 6c087d7..b43062e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
@@ -41,7 +42,8 @@ public interface GenericQueryType {
int getColsCount();
void parseBlocksAndReturnComplexColumnByteArray(DimensionRawColumnChunk[] rawColumnChunks,
- int rowNumber, int pageNumber, DataOutputStream dataOutputStream) throws IOException;
+ DimensionColumnPage[][] dimensionColumnPages, int rowNumber, int pageNumber,
+ DataOutputStream dataOutputStream) throws IOException;
void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder) throws IOException;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 7ca2579..63ae0cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -457,9 +457,10 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteStream);
complexType.parseBlocksAndReturnComplexColumnByteArray(
- blockChunkHolder.getDimensionRawColumnChunks(), index, pageIndex, dataOutputStream);
- record[dimColumnEvaluatorInfo.getRowIndex()] = complexType
- .getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray()));
+ blockChunkHolder.getDimensionRawColumnChunks(), null, index, pageIndex,
+ dataOutputStream);
+ record[dimColumnEvaluatorInfo.getRowIndex()] =
+ complexType.getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray()));
byteStream.close();
} catch (IOException e) {
LOGGER.info(e.getMessage());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0c94559e/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index bb373eb..c04df52 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -17,7 +17,6 @@
package org.apache.carbondata.core.scan.result;
import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
@@ -46,6 +45,7 @@ import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
import org.apache.carbondata.core.stats.QueryStatisticsModel;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
import org.apache.log4j.Logger;
@@ -282,30 +282,38 @@ public abstract class BlockletScannedResult {
}
public void fillColumnarComplexBatch(ColumnVectorInfo[] vectorInfos) {
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ ReUsableByteArrayDataOutputStream reuseableDataOutput =
+ new ReUsableByteArrayDataOutputStream(byteStream);
+ boolean isExceptionThrown = false;
for (int i = 0; i < vectorInfos.length; i++) {
int offset = vectorInfos[i].offset;
int len = offset + vectorInfos[i].size;
int vectorOffset = vectorInfos[i].vectorOffset;
CarbonColumnVector vector = vectorInfos[i].vector;
for (int j = offset; j < len; j++) {
- ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
- DataOutputStream dataOutput = new DataOutputStream(byteStream);
try {
- vectorInfos[i].genericQueryType.parseBlocksAndReturnComplexColumnByteArray(
- dimRawColumnChunks,
- pageFilteredRowId == null ? j : pageFilteredRowId[pageCounter][j], pageCounter,
- dataOutput);
+ vectorInfos[i].genericQueryType
+ .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, dimensionColumnPages,
+ pageFilteredRowId == null ? j : pageFilteredRowId[pageCounter][j], pageCounter,
+ reuseableDataOutput);
Object data = vectorInfos[i].genericQueryType
- .getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray()));
+ .getDataBasedOnDataType(ByteBuffer.wrap(reuseableDataOutput.getByteArray()));
vector.putObject(vectorOffset++, data);
+ reuseableDataOutput.reset();
} catch (IOException e) {
+ isExceptionThrown = true;
LOGGER.error(e);
} finally {
- CarbonUtil.closeStreams(dataOutput);
- CarbonUtil.closeStreams(byteStream);
+ if (isExceptionThrown) {
+ CarbonUtil.closeStreams(reuseableDataOutput);
+ CarbonUtil.closeStreams(byteStream);
+ }
}
}
}
+ CarbonUtil.closeStreams(reuseableDataOutput);
+ CarbonUtil.closeStreams(byteStream);
}
/**
@@ -541,6 +549,10 @@ public abstract class BlockletScannedResult {
*/
protected List<byte[][]> getComplexTypeKeyArrayBatch() {
List<byte[][]> complexTypeArrayList = new ArrayList<>(validRowIds.size());
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ ReUsableByteArrayDataOutputStream reUseableDataOutput =
+ new ReUsableByteArrayDataOutputStream(byteStream);
+ boolean isExceptionThrown = false;
byte[][] complexTypeData = null;
// everyTime it is initialized new as in case of prefetch it can modify the data
for (int i = 0; i < validRowIds.size(); i++) {
@@ -552,23 +564,27 @@ public abstract class BlockletScannedResult {
GenericQueryType genericQueryType =
complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]);
for (int j = 0; j < validRowIds.size(); j++) {
- ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
- DataOutputStream dataOutput = new DataOutputStream(byteStream);
try {
genericQueryType
- .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, validRowIds.get(j),
- pageCounter, dataOutput);
+ .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, dimensionColumnPages,
+ validRowIds.get(j), pageCounter, reUseableDataOutput);
// get the key array in columnar way
byte[][] complexKeyArray = complexTypeArrayList.get(j);
complexKeyArray[i] = byteStream.toByteArray();
+ reUseableDataOutput.reset();
} catch (IOException e) {
+ isExceptionThrown = true;
LOGGER.error(e);
} finally {
- CarbonUtil.closeStreams(dataOutput);
- CarbonUtil.closeStreams(byteStream);
+ if (isExceptionThrown) {
+ CarbonUtil.closeStreams(reUseableDataOutput);
+ CarbonUtil.closeStreams(byteStream);
+ }
}
}
}
+ CarbonUtil.closeStreams(reUseableDataOutput);
+ CarbonUtil.closeStreams(byteStream);
return complexTypeArrayList;
}
@@ -607,24 +623,32 @@ public abstract class BlockletScannedResult {
* @return complex type key array for all the complex dimension selected in query
*/
protected byte[][] getComplexTypeKeyArray(int rowId) {
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ ReUsableByteArrayDataOutputStream reUsableDataOutput =
+ new ReUsableByteArrayDataOutputStream(byteStream);
+ boolean isExceptionThrown = false;
byte[][] complexTypeData = new byte[complexParentBlockIndexes.length][];
for (int i = 0; i < complexTypeData.length; i++) {
GenericQueryType genericQueryType =
complexParentIndexToQueryMap.get(complexParentBlockIndexes[i]);
- ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
- DataOutputStream dataOutput = new DataOutputStream(byteStream);
try {
genericQueryType
- .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, rowId, pageCounter,
- dataOutput);
+ .parseBlocksAndReturnComplexColumnByteArray(dimRawColumnChunks, dimensionColumnPages,
+ rowId, pageCounter, reUsableDataOutput);
complexTypeData[i] = byteStream.toByteArray();
+ reUsableDataOutput.reset();
} catch (IOException e) {
+ isExceptionThrown = true;
LOGGER.error(e);
} finally {
- CarbonUtil.closeStreams(dataOutput);
- CarbonUtil.closeStreams(byteStream);
+ if (isExceptionThrown) {
+ CarbonUtil.closeStreams(reUsableDataOutput);
+ CarbonUtil.closeStreams(byteStream);
+ }
}
}
+ CarbonUtil.closeStreams(reUsableDataOutput);
+ CarbonUtil.closeStreams(byteStream);
return complexTypeData;
}