You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/25 16:55:39 UTC
[3/3] carbondata git commit: [CARBONDATA-3012] Added support for full
scan queries for vector direct fill.
[CARBONDATA-3012] Added support for full scan queries for vector direct fill.
After decompressing the page in our V3 reader, we can immediately fill the data into a vector without any condition checks inside loops.
So here the complete column page data is set into the column vector in a single batch and handed back to Spark/Presto.
For this purpose, a new method is added to ColumnPageDecoder:
ColumnPage decodeAndFillVector(byte[] input, int offset, int length, ColumnVectorInfo vectorInfo,
BitSet nullBits, boolean isLVEncoded)
The above method takes a vector and fills it in a single loop, without any checks inside the loop.
A new method is also added to DimensionDataChunkStore:
void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
ColumnVectorInfo vectorInfo);
The above method takes a vector and fills it in a single loop, without any checks inside the loop.
This closes #2818
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3d3b6ff1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3d3b6ff1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3d3b6ff1
Branch: refs/heads/master
Commit: 3d3b6ff1615e08131f6bcaea23dec0116a18081d
Parents: e0baa9b
Author: ravipesala <ra...@gmail.com>
Authored: Tue Oct 16 11:30:43 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Thu Oct 25 22:24:24 2018 +0530
----------------------------------------------------------------------
.../chunk/impl/DimensionRawColumnChunk.java | 17 ++
.../impl/FixedLengthDimensionColumnPage.java | 29 +-
.../chunk/impl/MeasureRawColumnChunk.java | 17 ++
.../impl/VariableLengthDimensionColumnPage.java | 29 +-
.../reader/DimensionColumnChunkReader.java | 7 +
.../chunk/reader/MeasureColumnChunkReader.java | 7 +
.../reader/dimension/AbstractChunkReader.java | 11 +
...essedDimChunkFileBasedPageLevelReaderV3.java | 2 +-
...mpressedDimensionChunkFileBasedReaderV3.java | 78 +++--
.../measure/AbstractMeasureChunkReader.java | 12 +
...CompressedMeasureChunkFileBasedReaderV3.java | 45 ++-
...essedMsrChunkFileBasedPageLevelReaderV3.java | 6 +-
.../chunk/store/DimensionChunkStoreFactory.java | 16 +-
.../chunk/store/DimensionDataChunkStore.java | 7 +
.../impl/LocalDictDimensionDataChunkStore.java | 25 ++
.../safe/AbstractNonDictionaryVectorFiller.java | 282 ++++++++++++++++++
.../SafeFixedLengthDimensionDataChunkStore.java | 51 +++-
...feVariableLengthDimensionDataChunkStore.java | 17 +-
.../UnsafeAbstractDimensionDataChunkStore.java | 6 +
.../datastore/columnar/BlockIndexerStorage.java | 5 +-
.../BlockIndexerStorageForNoDictionary.java | 3 +-
.../columnar/BlockIndexerStorageForShort.java | 3 +-
.../core/datastore/columnar/UnBlockIndexer.java | 3 +
.../core/datastore/impl/FileReaderImpl.java | 1 +
.../core/datastore/page/ColumnPage.java | 130 ++++----
.../page/ColumnPageValueConverter.java | 3 +
.../datastore/page/SafeDecimalColumnPage.java | 25 ++
.../datastore/page/VarLengthColumnPageBase.java | 17 +-
.../page/encoding/ColumnPageDecoder.java | 8 +
.../page/encoding/ColumnPageEncoderMeta.java | 11 +
.../page/encoding/EncodingFactory.java | 44 ++-
.../adaptive/AdaptiveDeltaFloatingCodec.java | 82 +++++
.../adaptive/AdaptiveDeltaIntegralCodec.java | 194 +++++++++++-
.../adaptive/AdaptiveFloatingCodec.java | 84 +++++-
.../adaptive/AdaptiveIntegralCodec.java | 157 ++++++++++
.../encoding/compress/DirectCompressCodec.java | 170 ++++++++++-
.../datastore/page/encoding/rle/RLECodec.java | 9 +
.../DateDirectDictionaryGenerator.java | 2 +-
.../datatype/DecimalConverterFactory.java | 91 +++++-
.../carbondata/core/mutate/DeleteDeltaVo.java | 4 +
.../DictionaryBasedVectorResultCollector.java | 112 +++++--
.../executor/impl/AbstractQueryExecutor.java | 13 +
.../scan/executor/infos/BlockExecutionInfo.java | 13 +
.../core/scan/executor/util/QueryUtil.java | 2 +-
.../carbondata/core/scan/model/QueryModel.java | 6 +-
.../core/scan/result/BlockletScannedResult.java | 76 ++++-
.../scan/result/vector/CarbonColumnVector.java | 18 +-
.../scan/result/vector/CarbonColumnarBatch.java | 4 +-
.../scan/result/vector/CarbonDictionary.java | 2 +
.../scan/result/vector/ColumnVectorInfo.java | 5 +
.../vector/impl/CarbonColumnVectorImpl.java | 67 ++++-
.../vector/impl/CarbonDictionaryImpl.java | 3 +
.../scan/scanner/impl/BlockletFullScanner.java | 4 +-
.../core/stats/QueryStatisticsModel.java | 13 +
.../apache/carbondata/core/util/ByteUtil.java | 8 +
.../executer/IncludeFilterExecuterImplTest.java | 6 +-
.../carbondata/core/util/CarbonUtilTest.java | 2 +-
.../presto/CarbonColumnVectorWrapper.java | 65 +++-
.../presto/readers/SliceStreamReader.java | 4 +-
.../filterexpr/AllDataTypesTestCaseFilter.scala | 18 +-
.../vectorreader/ColumnarVectorWrapper.java | 70 ++++-
.../ColumnarVectorWrapperDirect.java | 229 ++++++++++++++
.../VectorizedCarbonRecordReader.java | 30 +-
.../org/apache/spark/sql/CarbonVectorProxy.java | 295 ++++++++++--------
.../spark/sql/CarbonDictionaryWrapper.java | 5 +-
.../org/apache/spark/sql/CarbonVectorProxy.java | 297 +++++++++++--------
.../stream/CarbonStreamRecordReader.java | 2 +-
67 files changed, 2616 insertions(+), 463 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
index 7b1aca1..d84434e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/DimensionRawColumnChunk.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.scan.result.vector.impl.CarbonDictionaryImpl;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.format.Encoding;
@@ -121,6 +122,22 @@ public class DimensionRawColumnChunk extends AbstractRawColumnChunk {
}
}
+ /**
+ * Convert raw data with specified page number processed to DimensionColumnDataChunk and fill
+ * the vector
+ *
+ * @param pageNumber page number to decode and fill the vector
+ * @param vectorInfo vector to be filled with column page
+ */
+ public void convertToDimColDataChunkAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
+ assert pageNumber < pagesCount;
+ try {
+ chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override public void freeMemory() {
super.freeMemory();
if (null != dataChunks) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
index c815e4d..e650e0e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
@@ -46,11 +46,38 @@ public class FixedLengthDimensionColumnPage extends AbstractDimensionColumnPage
dataChunk.length;
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
- DimensionStoreType.FIXED_LENGTH, null);
+ DimensionStoreType.FIXED_LENGTH, null, false);
dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
}
/**
+ * Constructor
+ *
+ * @param dataChunk data chunk
+ * @param invertedIndex inverted index
+ * @param invertedIndexReverse reverse inverted index
+ * @param numberOfRows number of rows
+ * @param columnValueSize size of each column value
+ * @param vectorInfo vector to be filled with decoded column page.
+ */
+ public FixedLengthDimensionColumnPage(byte[] dataChunk, int[] invertedIndex,
+ int[] invertedIndexReverse, int numberOfRows, int columnValueSize,
+ ColumnVectorInfo vectorInfo) {
+ boolean isExplicitSorted = isExplicitSorted(invertedIndex);
+ long totalSize = isExplicitSorted ?
+ dataChunk.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) :
+ dataChunk.length;
+ dataChunkStore = DimensionChunkStoreFactory.INSTANCE
+ .getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
+ DimensionStoreType.FIXED_LENGTH, null, vectorInfo != null);
+ if (vectorInfo == null) {
+ dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
+ } else {
+ dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunk, vectorInfo);
+ }
+ }
+
+ /**
* Below method will be used to fill the data based on offset and row id
*
* @param rowId row id of the chunk
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
index 9448f30..6a90569 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/MeasureRawColumnChunk.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.datastore.chunk.AbstractRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
/**
* Contains raw measure data
@@ -105,6 +106,22 @@ public class MeasureRawColumnChunk extends AbstractRawColumnChunk {
}
}
+ /**
+ * Decode the raw data of the specified page number into a measure ColumnPage and fill the
+ * vector
+ *
+ * @param pageNumber page number to decode and fill the vector
+ * @param vectorInfo vector to be filled with column page
+ */
+ public void convertToColumnPageAndFillVector(int pageNumber, ColumnVectorInfo vectorInfo) {
+ assert pageNumber < pagesCount;
+ try {
+ chunkReader.decodeColumnPageAndFillVector(this, pageNumber, vectorInfo);
+ } catch (IOException | MemoryException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override public void freeMemory() {
super.freeMemory();
if (null != columnPages) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
index a404ff7..6cb8174 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
@@ -30,10 +30,31 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa
/**
* Constructor for this class
+ * @param dataChunks data chunk
+ * @param invertedIndex inverted index
+ * @param invertedIndexReverse reverse inverted index
+ * @param numberOfRows number of rows
+ * @param dictionary carbon local dictionary for string column.
*/
public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
CarbonDictionary dictionary) {
+ this(dataChunks, invertedIndex, invertedIndexReverse, numberOfRows, dimStoreType, dictionary,
+ null);
+ }
+
+ /**
+ * Constructor for this class
+ * @param dataChunks data chunk
+ * @param invertedIndex inverted index
+ * @param invertedIndexReverse reverse inverted index
+ * @param numberOfRows number of rows
+ * @param dictionary carbon local dictionary for string column.
+ * @param vectorInfo vector to be filled with decoded column page.
+ */
+ public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
+ int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType,
+ CarbonDictionary dictionary, ColumnVectorInfo vectorInfo) {
boolean isExplicitSorted = isExplicitSorted(invertedIndex);
long totalSize = 0;
switch (dimStoreType) {
@@ -54,8 +75,12 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa
}
dataChunkStore = DimensionChunkStoreFactory.INSTANCE
.getDimensionChunkStore(0, isExplicitSorted, numberOfRows, totalSize, dimStoreType,
- dictionary);
- dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
+ dictionary, vectorInfo != null);
+ if (vectorInfo != null) {
+ dataChunkStore.fillVector(invertedIndex, invertedIndexReverse, dataChunks, vectorInfo);
+ } else {
+ dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
index fd81973..e2d6be7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/DimensionColumnChunkReader.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnPage;
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
/**
* Interface for reading the data chunk
@@ -60,4 +61,10 @@ public interface DimensionColumnChunkReader {
*/
DimensionColumnPage decodeColumnPage(DimensionRawColumnChunk dimensionRawColumnChunk,
int pageNumber) throws IOException, MemoryException;
+
+ /**
+ * Decodes the raw data chunk of given page number and fill the vector with decoded data.
+ */
+ void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
+ int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
index f1392d0..0fbbe6b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/MeasureColumnChunkReader.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
/**
* Reader interface for reading the measure blocks from file
@@ -58,4 +59,10 @@ public interface MeasureColumnChunkReader {
ColumnPage decodeColumnPage(MeasureRawColumnChunk measureRawColumnChunk,
int pageNumber) throws IOException, MemoryException;
+ /**
+ * Decode raw data and fill the vector
+ */
+ void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
+ int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException;
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
index b08f9ed..2c42abe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReader.java
@@ -16,10 +16,15 @@
*/
package org.apache.carbondata.core.datastore.chunk.reader.dimension;
+import java.io.IOException;
+
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.DimensionColumnChunkReader;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonProperties;
/**
@@ -79,4 +84,10 @@ public abstract class AbstractChunkReader implements DimensionColumnChunkReader
this.numberOfRows = numberOfRows;
}
+ @Override
+ public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
+ int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+ throw new UnsupportedOperationException(
+ "This operation is not supported in this reader " + this.getClass().getName());
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
index 6efaf8a..86a4334 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimChunkFileBasedPageLevelReaderV3.java
@@ -171,6 +171,6 @@ public class CompressedDimChunkFileBasedPageLevelReaderV3
ByteBuffer rawData = dimensionRawColumnChunk.getFileReader()
.readByteBuffer(filePath, offset, length);
- return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0);
+ return decodeDimension(dimensionRawColumnChunk, rawData, pageMetadata, 0, null);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index b96e52e..a9f9338 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.dimension.v3;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.List;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -39,6 +40,7 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
@@ -207,6 +209,12 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
*/
@Override public DimensionColumnPage decodeColumnPage(
DimensionRawColumnChunk rawColumnPage, int pageNumber) throws IOException, MemoryException {
+ return decodeColumnPage(rawColumnPage, pageNumber, null);
+ }
+
+ private DimensionColumnPage decodeColumnPage(
+ DimensionRawColumnChunk rawColumnPage, int pageNumber,
+ ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
// data chunk of blocklet column
DataChunk3 dataChunk3 = rawColumnPage.getDataChunkV3();
// get the data buffer
@@ -221,49 +229,65 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
int offset = (int) rawColumnPage.getOffSet() + dimensionChunksLength
.get(rawColumnPage.getColumnIndex()) + dataChunk3.getPage_offset().get(pageNumber);
// first read the data and uncompressed it
- return decodeDimension(rawColumnPage, rawData, pageMetadata, offset);
+ return decodeDimension(rawColumnPage, rawData, pageMetadata, offset, vectorInfo);
+ }
+
+ @Override
+ public void decodeColumnPageAndFillVector(DimensionRawColumnChunk dimensionRawColumnChunk,
+ int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+ DimensionColumnPage columnPage =
+ decodeColumnPage(dimensionRawColumnChunk, pageNumber, vectorInfo);
+ columnPage.freeMemory();
}
- private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata,
- ByteBuffer pageData, int offset, boolean isLocalDictEncodedPage)
+ private ColumnPage decodeDimensionByMeta(DataChunk2 pageMetadata, ByteBuffer pageData, int offset,
+ boolean isLocalDictEncodedPage, ColumnVectorInfo vectorInfo, BitSet nullBitSet)
throws IOException, MemoryException {
List<Encoding> encodings = pageMetadata.getEncoders();
List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
pageMetadata.getChunk_meta());
ColumnPageDecoder decoder = encodingFactory.createDecoder(encodings, encoderMetas,
- compressorName);
- return decoder
- .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
+ compressorName, vectorInfo != null);
+ if (vectorInfo != null) {
+ decoder
+ .decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
+ nullBitSet, isLocalDictEncodedPage);
+ return null;
+ } else {
+ return decoder
+ .decode(pageData.array(), offset, pageMetadata.data_page_length, isLocalDictEncodedPage);
+ }
}
protected DimensionColumnPage decodeDimension(DimensionRawColumnChunk rawColumnPage,
- ByteBuffer pageData, DataChunk2 pageMetadata, int offset)
+ ByteBuffer pageData, DataChunk2 pageMetadata, int offset, ColumnVectorInfo vectorInfo)
throws IOException, MemoryException {
List<Encoding> encodings = pageMetadata.getEncoders();
if (CarbonUtil.isEncodedWithMeta(encodings)) {
- ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, offset,
- null != rawColumnPage.getLocalDictionary());
- decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
int[] invertedIndexes = new int[0];
int[] invertedIndexesReverse = new int[0];
// in case of no dictionary measure data types, if it is included in sort columns
// then inverted index to be uncompressed
- if (encodings.contains(Encoding.INVERTED_INDEX)) {
+ boolean isExplicitSorted =
+ CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX);
+ int dataOffset = offset;
+ if (isExplicitSorted) {
offset += pageMetadata.data_page_length;
- if (CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX)) {
- invertedIndexes = CarbonUtil
- .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
- // get the reverse index
- invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
- }
+ invertedIndexes = CarbonUtil
+ .getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
+ // get the reverse index
+ invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
}
+ BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
+ ColumnPage decodedPage = decodeDimensionByMeta(pageMetadata, pageData, dataOffset,
+ null != rawColumnPage.getLocalDictionary(), vectorInfo, nullBitSet);
+ decodedPage.setNullBits(nullBitSet);
return new ColumnPageWrapper(decodedPage, rawColumnPage.getLocalDictionary(), invertedIndexes,
- invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata),
- CarbonUtil.hasEncoding(pageMetadata.encoders, Encoding.INVERTED_INDEX));
+ invertedIndexesReverse, isEncodedWithAdaptiveMeta(pageMetadata), isExplicitSorted);
} else {
// following code is for backward compatibility
- return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset);
+ return decodeDimensionLegacy(rawColumnPage, pageData, pageMetadata, offset, vectorInfo);
}
}
@@ -283,8 +307,8 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
}
private DimensionColumnPage decodeDimensionLegacy(DimensionRawColumnChunk rawColumnPage,
- ByteBuffer pageData, DataChunk2 pageMetadata, int offset) throws IOException,
- MemoryException {
+ ByteBuffer pageData, DataChunk2 pageMetadata, int offset, ColumnVectorInfo vectorInfo)
+ throws IOException, MemoryException {
byte[] dataPage;
int[] rlePage;
int[] invertedIndexes = new int[0];
@@ -296,8 +320,10 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
invertedIndexes = CarbonUtil
.getUnCompressColumnIndex(pageMetadata.rowid_page_length, pageData, offset);
offset += pageMetadata.rowid_page_length;
- // get the reverse index
- invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
+ if (vectorInfo == null) {
+ // get the reverse index
+ invertedIndexesReverse = CarbonUtil.getInvertedReverseIndex(invertedIndexes);
+ }
}
// if rle is applied then read the rle block chunk and then uncompress
//then actual data based on rle block
@@ -324,13 +350,13 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
columnDataChunk =
new VariableLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
pageMetadata.getNumberOfRowsInpage(), dimStoreType,
- rawColumnPage.getLocalDictionary());
+ rawColumnPage.getLocalDictionary(), vectorInfo);
} else {
// to store fixed length column chunk values
columnDataChunk =
new FixedLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
pageMetadata.getNumberOfRowsInpage(),
- eachColumnValueSize[rawColumnPage.getColumnIndex()]);
+ eachColumnValueSize[rawColumnPage.getColumnIndex()], vectorInfo);
}
return columnDataChunk;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
index 6774fcb..cd233d2 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReader.java
@@ -16,10 +16,15 @@
*/
package org.apache.carbondata.core.datastore.chunk.reader.measure;
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.reader.MeasureColumnChunkReader;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
/**
* Measure block reader abstract class
@@ -48,4 +53,11 @@ public abstract class AbstractMeasureChunkReader implements MeasureColumnChunkRe
this.filePath = filePath;
this.numberOfRows = numberOfRows;
}
+
+ @Override
+ public void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
+ int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+ throw new UnsupportedOperationException(
+ "This operation is not supported in this class " + getClass().getName());
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index 240771a..8394029 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.BitSet;
import java.util.List;
import org.apache.carbondata.core.datastore.FileReader;
@@ -29,6 +30,7 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
@@ -190,6 +192,18 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
public ColumnPage decodeColumnPage(
MeasureRawColumnChunk rawColumnChunk, int pageNumber)
throws IOException, MemoryException {
+ // no vector is supplied (null), so the shared decode path returns the decoded page
+ return decodeColumnPage(rawColumnChunk, pageNumber, null);
+ }
+
+ /**
+ * Decodes the requested page through the shared decode path, passing {@code vectorInfo}
+ * along so that the decoded values are filled into its vector. Whatever that path returns
+ * is ignored here.
+ */
+ @Override
+ public void decodeColumnPageAndFillVector(MeasureRawColumnChunk measureRawColumnChunk,
+ int pageNumber, ColumnVectorInfo vectorInfo) throws IOException, MemoryException {
+ decodeColumnPage(measureRawColumnChunk, pageNumber, vectorInfo);
+ }
+
+ private ColumnPage decodeColumnPage(
+ MeasureRawColumnChunk rawColumnChunk, int pageNumber, ColumnVectorInfo vectorInfo)
+ throws IOException, MemoryException {
// data chunk of blocklet column
DataChunk3 dataChunk3 = rawColumnChunk.getDataChunkV3();
// data chunk of page
@@ -203,23 +217,34 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
int offset = (int) rawColumnChunk.getOffSet() +
measureColumnChunkLength.get(rawColumnChunk.getColumnIndex()) +
dataChunk3.getPage_offset().get(pageNumber);
- ColumnPage decodedPage = decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset);
- decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
+ BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
+ ColumnPage decodedPage =
+ decodeMeasure(pageMetadata, rawColumnChunk.getRawData(), offset, vectorInfo, nullBitSet);
+ if (decodedPage == null) {
+ return null;
+ }
+ decodedPage.setNullBits(nullBitSet);
return decodedPage;
}
/**
* Decode measure column page with page header and raw data starting from offset
*/
- protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData, int offset)
- throws MemoryException, IOException {
+ protected ColumnPage decodeMeasure(DataChunk2 pageMetadata, ByteBuffer pageData, int offset,
+ ColumnVectorInfo vectorInfo, BitSet nullBitSet) throws MemoryException, IOException {
List<Encoding> encodings = pageMetadata.getEncoders();
List<ByteBuffer> encoderMetas = pageMetadata.getEncoder_meta();
- String compressorName = CarbonMetadataUtil.getCompressorNameFromChunkMeta(
- pageMetadata.getChunk_meta());
- ColumnPageDecoder codec = encodingFactory.createDecoder(encodings, encoderMetas,
- compressorName);
- return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
+ String compressorName =
+ CarbonMetadataUtil.getCompressorNameFromChunkMeta(pageMetadata.getChunk_meta());
+ ColumnPageDecoder codec =
+ encodingFactory.createDecoder(encodings, encoderMetas, compressorName, vectorInfo != null);
+ if (vectorInfo != null) {
+ codec
+ .decodeAndFillVector(pageData.array(), offset, pageMetadata.data_page_length, vectorInfo,
+ nullBitSet, false);
+ return null;
+ } else {
+ return codec.decode(pageData.array(), offset, pageMetadata.data_page_length);
+ }
}
-
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
index 924a206..b092350 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMsrChunkFileBasedPageLevelReaderV3.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.BitSet;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
@@ -151,8 +152,9 @@ public class CompressedMsrChunkFileBasedPageLevelReaderV3
ByteBuffer buffer = rawColumnPage.getFileReader()
.readByteBuffer(filePath, offset, pageMetadata.data_page_length);
- ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0);
- decodedPage.setNullBits(QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor));
+ BitSet nullBitSet = QueryUtil.getNullBitSet(pageMetadata.presence, this.compressor);
+ ColumnPage decodedPage = decodeMeasure(pageMetadata, buffer, 0, null, nullBitSet);
+ decodedPage.setNullBits(nullBitSet);
return decodedPage;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
index c7bcef1..5346f35 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
@@ -65,8 +65,8 @@ public class DimensionChunkStoreFactory {
*/
public DimensionDataChunkStore getDimensionChunkStore(int columnValueSize,
boolean isInvertedIndex, int numberOfRows, long totalSize, DimensionStoreType storeType,
- CarbonDictionary dictionary) {
- if (isUnsafe) {
+ CarbonDictionary dictionary, boolean fillDirectVector) {
+ if (isUnsafe && !fillDirectVector) {
switch (storeType) {
case FIXED_LENGTH:
return new UnsafeFixedLengthDimensionDataChunkStore(totalSize, columnValueSize,
@@ -79,24 +79,24 @@ public class DimensionChunkStoreFactory {
numberOfRows);
case LOCAL_DICT:
return new LocalDictDimensionDataChunkStore(
- new UnsafeFixedLengthDimensionDataChunkStore(totalSize,
- 3, isInvertedIndex, numberOfRows),
- dictionary);
+ new UnsafeFixedLengthDimensionDataChunkStore(totalSize, 3, isInvertedIndex,
+ numberOfRows), dictionary);
default:
throw new UnsupportedOperationException("Invalid dimension store type");
}
} else {
switch (storeType) {
case FIXED_LENGTH:
- return new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, columnValueSize);
+ return new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, columnValueSize,
+ numberOfRows);
case VARIABLE_SHORT_LENGTH:
return new SafeVariableShortLengthDimensionDataChunkStore(isInvertedIndex, numberOfRows);
case VARIABLE_INT_LENGTH:
return new SafeVariableIntLengthDimensionDataChunkStore(isInvertedIndex, numberOfRows);
case LOCAL_DICT:
return new LocalDictDimensionDataChunkStore(
- new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex,
- 3), dictionary);
+ new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, 3, numberOfRows),
+ dictionary);
default:
throw new UnsupportedOperationException("Invalid dimension store type");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
index 28aed5b..8972ddb 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionDataChunkStore.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.core.datastore.chunk.store;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
/**
* Interface responsibility is to store dimension data in memory.
@@ -35,6 +36,12 @@ public interface DimensionDataChunkStore {
void putArray(int[] invertedIndex, int[] invertedIndexReverse, byte[] data);
/**
+ * Fill the vector carried by {@code vectorInfo} with the values decoded from {@code data}.
+ *
+ * @param invertedIndex inverted index of the page; whether it is used depends on the
+ * implementation
+ * @param invertedIndexReverse reverse inverted index of the page; whether it is used depends
+ * on the implementation
+ * @param data bytes of the column values to decode
+ * @param vectorInfo holder of the target vector
+ */
+ void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+ ColumnVectorInfo vectorInfo);
+
+ /**
* Below method will be used to get the row
* based on row id passed
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
index 0d06f61..e70424f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/LocalDictDimensionDataChunkStore.java
@@ -21,6 +21,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
/**
* Dimension chunk store for local dictionary encoded data.
@@ -49,6 +51,29 @@ public class LocalDictDimensionDataChunkStore implements DimensionDataChunkStore
this.dimensionDataChunkStore.putArray(invertedIndex, invertedIndexReverse, data);
}
+ /**
+ * Fills the vector of {@code vectorInfo} with the local-dictionary surrogate keys decoded
+ * from {@code data}. The number of rows is derived as {@code data.length / columnValueSize};
+ * the two inverted-index arguments are not used.
+ */
+ @Override
+ public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+ ColumnVectorInfo vectorInfo) {
+ int columnValueSize = dimensionDataChunkStore.getColumnValueSize();
+ int rowsNum = data.length / columnValueSize;
+ CarbonColumnVector vector = vectorInfo.vector;
+ // attach the dictionary to the vector only while it has not been marked as used
+ if (!dictionary.isDictionaryUsed()) {
+ vector.setDictionary(dictionary);
+ dictionary.setDictionaryUsed();
+ }
+ for (int i = 0; i < rowsNum; i++) {
+ int surrogate = CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
+ // the default-member surrogate key is written as null in both vectors
+ if (surrogate == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
+ vector.putNull(i);
+ vector.getDictionaryVector().putNull(i);
+ } else {
+ // only the surrogate key is written; it goes into the dictionary vector
+ vector.putNotNull(i);
+ vector.getDictionaryVector().putInt(i, surrogate);
+ }
+
+ }
+ }
+
@Override public byte[] getRow(int rowId) {
return dictionary.getDictionaryValue(dimensionDataChunkStore.getSurrogate(rowId));
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
new file mode 100644
index 0000000..ddfa470
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/AbstractNonDictionaryVectorFiller.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/**
+ * Base class of the fillers that copy no-dictionary dimension values from a page's byte array
+ * into a {@link CarbonColumnVector}.
+ */
+@InterfaceAudience.Internal
+@InterfaceStability.Stable
+public abstract class AbstractNonDictionaryVectorFiller {
+
+ // number of bytes taken by the length field stored in front of each value
+ protected int lengthSize;
+ // number of rows the filler writes into the vector
+ protected int numberOfRows;
+
+ public AbstractNonDictionaryVectorFiller(int lengthSize, int numberOfRows) {
+ this.lengthSize = lengthSize;
+ this.numberOfRows = numberOfRows;
+ }
+
+ /**
+ * Writes {@code numberOfRows} values taken from {@code data} into {@code vector}.
+ *
+ * @param data bytes holding the values, each preceded by its length field
+ * @param vector target vector
+ * @param buffer buffer the subclasses in this file position and read the length fields from;
+ * presumably wraps {@code data} (confirm against the caller)
+ */
+ public abstract void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer);
+
+ /**
+ * Reads a value length as a 2-byte short from the buffer's current position, which advances
+ * the buffer position.
+ */
+ public int getLengthFromBuffer(ByteBuffer buffer) {
+ return buffer.getShort();
+ }
+}
+
+/**
+ * Selects the vector filler that matches a column data type.
+ */
+class NonDictionaryVectorFillerFactory {
+
+ /**
+ * Returns a new filler for {@code type}. Supported types are STRING, VARCHAR, TIMESTAMP,
+ * BOOLEAN, SHORT, INT and LONG.
+ *
+ * @param type data type of the column
+ * @param lengthSize passed to the filler's constructor
+ * @param numberOfRows passed to the filler's constructor
+ * @throws UnsupportedOperationException for any other data type
+ */
+ public static AbstractNonDictionaryVectorFiller getVectorFiller(DataType type, int lengthSize,
+ int numberOfRows) {
+ if (type == DataTypes.STRING) {
+ return new StringVectorFiller(lengthSize, numberOfRows);
+ } else if (type == DataTypes.VARCHAR) {
+ return new LongStringVectorFiller(lengthSize, numberOfRows);
+ } else if (type == DataTypes.TIMESTAMP) {
+ return new TimeStampVectorFiller(lengthSize, numberOfRows);
+ } else if (type == DataTypes.BOOLEAN) {
+ return new BooleanVectorFiller(lengthSize, numberOfRows);
+ } else if (type == DataTypes.SHORT) {
+ return new ShortVectorFiller(lengthSize, numberOfRows);
+ } else if (type == DataTypes.INT) {
+ return new IntVectorFiller(lengthSize, numberOfRows);
+ } else if (type == DataTypes.LONG) {
+ return new LongVectorFiller(lengthSize, numberOfRows);
+ } else {
+ throw new UnsupportedOperationException("Not supported datatype : " + type);
+ }
+
+ }
+
+}
+
+/**
+ * Filler for byte-array (string) values. Each row is read as a length field of
+ * {@code lengthSize} bytes followed by the value bytes.
+ */
+class StringVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+ public StringVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+ // startOffset is the position of the current row's length field
+ int startOffset = 0;
+ // currentOffset is the position of the current row's value bytes; the first value starts
+ // right after the first length field, so the first lengthSize bytes are skipped
+ int currentOffset = lengthSize;
+ ByteUtil.UnsafeComparer comparator = ByteUtil.UnsafeComparer.INSTANCE;
+ // all rows except the last: read the row's length field, then move startOffset to the
+ // length field of the next row
+ for (int i = 0; i < numberOfRows - 1; i++) {
+ buffer.position(startOffset);
+ startOffset += getLengthFromBuffer(buffer) + lengthSize;
+ int length = startOffset - (currentOffset);
+ // a value equal to the default-member byte array is written as null
+ if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+ CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentOffset, length)) {
+ vector.putNull(i);
+ } else {
+ vector.putByteArray(i, currentOffset, length, data);
+ }
+ currentOffset = startOffset + lengthSize;
+ }
+ // Handle last row: its length is whatever remains in data; its length field is not read
+ int length = (data.length - currentOffset);
+ if (comparator.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, 0,
+ CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length, data, currentOffset, length)) {
+ vector.putNull(numberOfRows - 1);
+ } else {
+ vector.putByteArray(numberOfRows - 1, currentOffset, length, data);
+ }
+ }
+}
+
+/**
+ * Variant of {@link StringVectorFiller} whose length field is read as a 4-byte int instead of
+ * a 2-byte short.
+ */
+class LongStringVectorFiller extends StringVectorFiller {
+ public LongStringVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ /**
+ * Reads a value length as an int from the buffer's current position.
+ */
+ @Override
+ public int getLengthFromBuffer(ByteBuffer buffer) {
+ return buffer.getInt();
+ }
+}
+
+/**
+ * Filler for boolean values. A row whose value length is 0 is written as null; otherwise the
+ * first value byte is converted with {@code ByteUtil.toBoolean}.
+ */
+class BooleanVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+ public BooleanVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+ // startOffset is the position of the current row's length field
+ int startOffset = 0;
+ // currentOffset is the position of the current row's value bytes
+ int currentOffset = lengthSize;
+ // all rows except the last: the length is read from the row's length field
+ for (int i = 0; i < numberOfRows - 1; i++) {
+ buffer.position(startOffset);
+ startOffset += getLengthFromBuffer(buffer) + lengthSize;
+ int length = startOffset - (currentOffset);
+ if (length == 0) {
+ vector.putNull(i);
+ } else {
+ vector.putBoolean(i, ByteUtil.toBoolean(data[currentOffset]));
+ }
+ currentOffset = startOffset + lengthSize;
+ }
+ // last row: its length is whatever remains in data
+ int length = (data.length - currentOffset);
+ if (length == 0) {
+ vector.putNull(numberOfRows - 1);
+ } else {
+ vector.putBoolean(numberOfRows - 1, ByteUtil.toBoolean(data[currentOffset]));
+ }
+ }
+}
+
+/**
+ * Filler for short values. A row whose value length is 0 is written as null; otherwise the
+ * value bytes are converted with {@code ByteUtil.toXorShort}.
+ */
+class ShortVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+ public ShortVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+ // startOffset is the position of the current row's length field
+ int startOffset = 0;
+ // currentOffset is the position of the current row's value bytes
+ int currentOffset = lengthSize;
+ // all rows except the last: the length is read from the row's length field
+ for (int i = 0; i < numberOfRows - 1; i++) {
+ buffer.position(startOffset);
+ startOffset += getLengthFromBuffer(buffer) + lengthSize;
+ int length = startOffset - (currentOffset);
+ if (length == 0) {
+ vector.putNull(i);
+ } else {
+ vector.putShort(i, ByteUtil.toXorShort(data, currentOffset, length));
+ }
+ currentOffset = startOffset + lengthSize;
+ }
+ // last row: its length is whatever remains in data
+ int length = (data.length - currentOffset);
+ if (length == 0) {
+ vector.putNull(numberOfRows - 1);
+ } else {
+ vector.putShort(numberOfRows - 1, ByteUtil.toXorShort(data, currentOffset, length));
+ }
+ }
+}
+
+/**
+ * Filler for int values. A row whose value length is 0 is written as null; otherwise the
+ * value bytes are converted with {@code ByteUtil.toXorInt}.
+ */
+class IntVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+ public IntVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+ // startOffset is the position of the current row's length field
+ int startOffset = 0;
+ // currentOffset is the position of the current row's value bytes
+ int currentOffset = lengthSize;
+ // all rows except the last: the length is read from the row's length field
+ for (int i = 0; i < numberOfRows - 1; i++) {
+ buffer.position(startOffset);
+ startOffset += getLengthFromBuffer(buffer) + lengthSize;
+ int length = startOffset - (currentOffset);
+ if (length == 0) {
+ vector.putNull(i);
+ } else {
+ vector.putInt(i, ByteUtil.toXorInt(data, currentOffset, length));
+ }
+ currentOffset = startOffset + lengthSize;
+ }
+ // last row: its length is whatever remains in data
+ int length = (data.length - currentOffset);
+ if (length == 0) {
+ vector.putNull(numberOfRows - 1);
+ } else {
+ vector.putInt(numberOfRows - 1, ByteUtil.toXorInt(data, currentOffset, length));
+ }
+ }
+}
+
+/**
+ * Filler for long values. A row whose value length is 0 is written as null; otherwise the
+ * value bytes are converted by {@code DataTypeUtil.getDataBasedOnRestructuredDataType} using
+ * the vector's block data type.
+ */
+class LongVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+ public LongVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+ // startOffset is the position of the current row's length field
+ int startOffset = 0;
+ // currentOffset is the position of the current row's value bytes
+ int currentOffset = lengthSize;
+ // all rows except the last: the length is read from the row's length field
+ for (int i = 0; i < numberOfRows - 1; i++) {
+ buffer.position(startOffset);
+ startOffset += getLengthFromBuffer(buffer) + lengthSize;
+ int length = startOffset - (currentOffset);
+ if (length == 0) {
+ vector.putNull(i);
+ } else {
+ // NOTE(review): the helper is defined elsewhere; presumably it widens a value stored
+ // with the block's (possibly narrower) data type to long - confirm
+ vector.putLong(i, DataTypeUtil
+ .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentOffset,
+ length));
+ }
+ currentOffset = startOffset + lengthSize;
+ }
+ // last row: its length is whatever remains in data
+ int length = (data.length - currentOffset);
+ if (length == 0) {
+ vector.putNull(numberOfRows - 1);
+ } else {
+ vector.putLong(numberOfRows - 1, DataTypeUtil
+ .getDataBasedOnRestructuredDataType(data, vector.getBlockDataType(), currentOffset,
+ length));
+ }
+ }
+}
+
+/**
+ * Filler for timestamp values. A row whose value length is 0 is written as null; otherwise
+ * the value bytes are converted with {@code ByteUtil.toXorLong} and multiplied by 1000.
+ */
+class TimeStampVectorFiller extends AbstractNonDictionaryVectorFiller {
+
+ public TimeStampVectorFiller(int lengthSize, int numberOfRows) {
+ super(lengthSize, numberOfRows);
+ }
+
+ @Override
+ public void fillVector(byte[] data, CarbonColumnVector vector, ByteBuffer buffer) {
+ // startOffset is the position of the current row's length field
+ int startOffset = 0;
+ // currentOffset is the position of the current row's value bytes
+ int currentOffset = lengthSize;
+ // all rows except the last: the length is read from the row's length field
+ for (int i = 0; i < numberOfRows - 1; i++) {
+ buffer.position(startOffset);
+ startOffset += getLengthFromBuffer(buffer) + lengthSize;
+ int length = startOffset - (currentOffset);
+ if (length == 0) {
+ vector.putNull(i);
+ } else {
+ // NOTE(review): the factor 1000 presumably converts the stored unit to the finer unit
+ // the vector expects (e.g. millis to micros) - confirm
+ vector.putLong(i, ByteUtil.toXorLong(data, currentOffset, length) * 1000L);
+ }
+ currentOffset = startOffset + lengthSize;
+ }
+ // last row: its length is whatever remains in data
+ int length = (data.length - currentOffset);
+ if (length == 0) {
+ vector.putNull(numberOfRows - 1);
+ } else {
+ vector.putLong(numberOfRows - 1, ByteUtil.toXorLong(data, currentOffset, length) * 1000L);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
index 41218d0..d30650d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
@@ -17,6 +17,12 @@
package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -30,9 +36,52 @@ public class SafeFixedLengthDimensionDataChunkStore extends SafeAbsractDimension
*/
private int columnValueSize;
- public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize) {
+ private int numOfRows;
+
+ /**
+ * @param isInvertedIndex passed through to the parent store
+ * @param columnValueSize size in bytes of one column value
+ * @param numOfRows number of rows, used when filling a vector
+ */
+ public SafeFixedLengthDimensionDataChunkStore(boolean isInvertedIndex, int columnValueSize,
+ int numOfRows) {
super(isInvertedIndex);
this.columnValueSize = columnValueSize;
+ this.numOfRows = numOfRows;
+ }
+
+ /**
+ * Fills the vector of {@code vectorInfo} from {@code data} by delegating to the private
+ * overload. The two inverted-index arguments are not used.
+ */
+ @Override
+ public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+ ColumnVectorInfo vectorInfo) {
+ CarbonColumnVector vector = vectorInfo.vector;
+ fillVector(data, vectorInfo, vector);
+ }
+
+ /**
+ * Decodes {@code numOfRows} fixed-width surrogate keys from {@code data} and writes them to
+ * {@code vector}, converting them according to the vector's block data type.
+ */
+ private void fillVector(byte[] data, ColumnVectorInfo vectorInfo, CarbonColumnVector vector) {
+ DataType dataType = vectorInfo.vector.getBlockDataType();
+ if (dataType == DataTypes.DATE) {
+ // DATE: the default-member surrogate becomes null, any other surrogate is written as an
+ // int after subtracting the date cut-off
+ for (int i = 0; i < numOfRows; i++) {
+ int surrogateInternal =
+ CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
+ if (surrogateInternal == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
+ vector.putNull(i);
+ } else {
+ vector.putInt(i, surrogateInternal - DateDirectDictionaryGenerator.cutOffDate);
+ }
+ }
+ } else if (dataType == DataTypes.TIMESTAMP) {
+ // TIMESTAMP: the default-member surrogate becomes null, any other surrogate is resolved
+ // through the direct dictionary generator and written as a long
+ for (int i = 0; i < numOfRows; i++) {
+ int surrogateInternal =
+ CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize);
+ if (surrogateInternal == CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY) {
+ vector.putNull(i);
+ } else {
+ Object valueFromSurrogate =
+ vectorInfo.directDictionaryGenerator.getValueFromSurrogate(surrogateInternal);
+ vector.putLong(i, (long)valueFromSurrogate);
+ }
+ }
+ } else {
+ // any other type: the surrogate is written as is, without a null check
+ for (int i = 0; i < numOfRows; i++) {
+ vector.putInt(i,
+ CarbonUtil.getSurrogateInternal(data, i * columnValueSize, columnValueSize));
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index 8553506..0fb4854 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -23,6 +23,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -91,6 +92,20 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
}
}
+ /**
+ * Fills the vector of {@code vectorInfo} from {@code data} using the filler that
+ * {@code NonDictionaryVectorFillerFactory} returns for the vector's type.
+ */
+ @Override
+ public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+ ColumnVectorInfo vectorInfo) {
+ // NOTE(review): the reverse-index field is assigned the invertedIndex argument and the
+ // invertedIndexReverse argument is never read - confirm this is intended
+ this.invertedIndexReverse = invertedIndex;
+ int lengthSize = getLengthSize();
+ CarbonColumnVector vector = vectorInfo.vector;
+ DataType dt = vector.getType();
+ // wrap the page bytes so the filler can read each row's length field
+ ByteBuffer buffer = ByteBuffer.wrap(data);
+ AbstractNonDictionaryVectorFiller vectorFiller =
+ NonDictionaryVectorFillerFactory.getVectorFiller(dt, lengthSize, numberOfRows);
+ vectorFiller.fillVector(data, vector, buffer);
+ }
+
protected abstract int getLengthSize();
protected abstract int getLengthFromBuffer(ByteBuffer buffer);
@@ -150,7 +165,7 @@ public abstract class SafeVariableLengthDimensionDataChunkStore
vector.putNull(vectorRow);
} else {
if (dt == DataTypes.STRING) {
- vector.putBytes(vectorRow, currentDataOffset, length, data);
+ vector.putByteArray(vectorRow, currentDataOffset, length, data);
} else if (dt == DataTypes.BOOLEAN) {
vector.putBoolean(vectorRow, ByteUtil.toBoolean(data[currentDataOffset]));
} else if (dt == DataTypes.SHORT) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
index 89bce2d..57e9de5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.core.memory.MemoryBlock;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.memory.UnsafeMemoryManager;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
/**
@@ -115,6 +116,11 @@ public abstract class UnsafeAbstractDimensionDataChunkStore implements Dimension
}
}
+ /**
+ * Not implemented for this store: always throws {@link UnsupportedOperationException}
+ * without reading any of the arguments.
+ */
+ @Override public void fillVector(int[] invertedIndex, int[] invertedIndexReverse, byte[] data,
+ ColumnVectorInfo vectorInfo) {
+ throw new UnsupportedOperationException("This method not supposed to be called here");
+ }
+
/**
* Below method will be used to free the memory occupied by the column chunk
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
index 6f3f139..44b3c12 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorage.java
@@ -50,8 +50,9 @@ public abstract class BlockIndexerStorage<T> {
*
* @param rowIds
*/
- protected Map<String, short[]> rleEncodeOnRowId(short[] rowIds, short[] rowIdPage,
- short[] rowIdRlePage) {
+ protected Map<String, short[]> rleEncodeOnRowId(short[] rowIds) {
+ short[] rowIdPage;
+ short[] rowIdRlePage;
List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
int k = 0;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
index b3e25d3..bcf5432 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoDictionary.java
@@ -39,8 +39,7 @@ public class BlockIndexerStorageForNoDictionary extends BlockIndexerStorage<Obje
Arrays.sort(dataWithRowId);
}
short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
- Map<String, short[]> rowIdAndRleRowIdPages =
- rleEncodeOnRowId(rowIds, getRowIdPage(), getRowIdRlePage());
+ Map<String, short[]> rowIdAndRleRowIdPages = rleEncodeOnRowId(rowIds);
rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
index f1b9af2..b30396c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
@@ -43,8 +43,7 @@ public class BlockIndexerStorageForShort extends BlockIndexerStorage<byte[][]> {
Arrays.sort(dataWithRowId);
}
short[] rowIds = extractDataAndReturnRowId(dataWithRowId, dataPage);
- Map<String, short[]> rowIdAndRleRowIdPages =
- rleEncodeOnRowId(rowIds, getRowIdPage(), getRowIdRlePage());
+ Map<String, short[]> rowIdAndRleRowIdPages = rleEncodeOnRowId(rowIds);
rowIdPage = rowIdAndRleRowIdPages.get("rowIdPage");
rowIdRlePage = rowIdAndRleRowIdPages.get("rowRlePage");
if (rleOnData) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
index a7f38cd..48484ce 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/UnBlockIndexer.java
@@ -28,6 +28,9 @@ public final class UnBlockIndexer {
public static int[] uncompressIndex(int[] indexData, int[] indexMap) {
int actualSize = indexData.length;
int mapLength = indexMap.length;
+ if (indexMap.length == 0) {
+ return indexData;
+ }
for (int i = 0; i < mapLength; i++) {
actualSize += indexData[indexMap[i] + 1] - indexData[indexMap[i]] - 1;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
index 6fef278..9f0abd9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileReaderImpl.java
@@ -76,6 +76,7 @@ public class FileReaderImpl implements FileReader {
channel.close();
}
}
+ fileNameAndStreamCache.clear();
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index e8097da..e5312f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -37,14 +37,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonProperties;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.BYTE_ARRAY;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.FLOAT;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.INT;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.LONG;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT;
-import static org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT_INT;
+import static org.apache.carbondata.core.metadata.datatype.DataTypes.*;
public abstract class ColumnPage {
@@ -90,7 +83,7 @@ public abstract class ColumnPage {
private static ColumnPage createDecimalPage(ColumnPageEncoderMeta columnPageEncoderMeta,
int pageSize) {
- if (unsafe) {
+ if (isUnsafeEnabled(columnPageEncoderMeta)) {
try {
return new UnsafeDecimalColumnPage(columnPageEncoderMeta, pageSize);
} catch (MemoryException e) {
@@ -103,7 +96,7 @@ public abstract class ColumnPage {
private static ColumnPage createVarLengthPage(ColumnPageEncoderMeta columnPageEncoderMeta,
int pageSize) {
- if (unsafe) {
+ if (isUnsafeEnabled(columnPageEncoderMeta)) {
try {
return new UnsafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
} catch (MemoryException e) {
@@ -116,7 +109,7 @@ public abstract class ColumnPage {
private static ColumnPage createFixLengthPage(
ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize) {
- if (unsafe) {
+ if (isUnsafeEnabled(columnPageEncoderMeta)) {
try {
return new UnsafeFixLengthColumnPage(columnPageEncoderMeta, pageSize);
} catch (MemoryException e) {
@@ -129,7 +122,7 @@ public abstract class ColumnPage {
private static ColumnPage createFixLengthByteArrayPage(
ColumnPageEncoderMeta columnPageEncoderMeta, int pageSize, int eachValueSize) {
- if (unsafe) {
+ if (isUnsafeEnabled(columnPageEncoderMeta)) {
try {
return new UnsafeFixLengthColumnPage(columnPageEncoderMeta, pageSize, eachValueSize);
} catch (MemoryException e) {
@@ -163,7 +156,7 @@ public abstract class ColumnPage {
CarbonCommonConstants.LOCAL_DICTIONARY_DECODER_BASED_FALLBACK_DEFAULT));
ColumnPage actualPage;
ColumnPage encodedPage;
- if (unsafe) {
+ if (isUnsafeEnabled(columnPageEncoderMeta)) {
actualPage = new UnsafeVarLengthColumnPage(columnPageEncoderMeta, pageSize);
encodedPage = new UnsafeFixLengthColumnPage(
new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), DataTypes.BYTE_ARRAY,
@@ -190,7 +183,7 @@ public abstract class ColumnPage {
DataType dataType = columnPageEncoderMeta.getStoreDataType();
TableSpec.ColumnSpec columnSpec = columnPageEncoderMeta.getColumnSpec();
String compressorName = columnPageEncoderMeta.getCompressorName();
- if (unsafe) {
+ if (isUnsafeEnabled(columnPageEncoderMeta)) {
if (dataType == DataTypes.BOOLEAN) {
instance = new UnsafeFixLengthColumnPage(
new ColumnPageEncoderMeta(columnSpec, BYTE, compressorName), pageSize);
@@ -219,21 +212,23 @@ public abstract class ColumnPage {
}
} else {
if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
- instance = newBytePage(columnSpec, new byte[pageSize], compressorName);
+ instance = newBytePage(columnPageEncoderMeta, new byte[pageSize]);
} else if (dataType == DataTypes.SHORT) {
- instance = newShortPage(columnSpec, new short[pageSize], compressorName);
+ instance = newShortPage(columnPageEncoderMeta, new short[pageSize]);
} else if (dataType == DataTypes.SHORT_INT) {
- instance = newShortIntPage(columnSpec, new byte[pageSize * 3], compressorName);
+ instance = newShortIntPage(columnPageEncoderMeta, new byte[pageSize * 3]);
} else if (dataType == DataTypes.INT) {
- instance = newIntPage(columnSpec, new int[pageSize], compressorName);
+ instance = newIntPage(columnPageEncoderMeta, new int[pageSize]);
} else if (dataType == DataTypes.LONG || dataType == DataTypes.TIMESTAMP) {
- instance = newLongPage(columnSpec, new long[pageSize], compressorName);
+ instance = newLongPage(
+ new ColumnPageEncoderMeta(columnPageEncoderMeta.getColumnSpec(), LONG,
+ columnPageEncoderMeta.getCompressorName()), new long[pageSize]);
} else if (dataType == DataTypes.FLOAT) {
- instance = newFloatPage(columnSpec, new float[pageSize], compressorName);
+ instance = newFloatPage(columnPageEncoderMeta, new float[pageSize]);
} else if (dataType == DataTypes.DOUBLE) {
- instance = newDoublePage(columnSpec, new double[pageSize], compressorName);
+ instance = newDoublePage(columnPageEncoderMeta, new double[pageSize]);
} else if (DataTypes.isDecimal(dataType)) {
- instance = newDecimalPage(columnSpec, new byte[pageSize][], compressorName);
+ instance = newDecimalPage(columnPageEncoderMeta, new byte[pageSize][]);
} else if (dataType == DataTypes.STRING
|| dataType == DataTypes.BYTE_ARRAY
|| dataType == DataTypes.VARCHAR) {
@@ -253,75 +248,67 @@ public abstract class ColumnPage {
return columnPage;
}
- private static ColumnPage newBytePage(TableSpec.ColumnSpec columnSpec, byte[] byteData,
- String compressorName) {
+ private static ColumnPage newBytePage(ColumnPageEncoderMeta meta, byte[] byteData) {
+ ColumnPageEncoderMeta encoderMeta =
+ new ColumnPageEncoderMeta(meta.getColumnSpec(), BYTE, meta.getCompressorName());
+ encoderMeta.setFillCompleteVector(meta.isFillCompleteVector());
ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, BYTE, compressorName), byteData.length);
+ encoderMeta, byteData.length);
columnPage.setBytePage(byteData);
return columnPage;
}
- private static ColumnPage newShortPage(TableSpec.ColumnSpec columnSpec, short[] shortData,
- String compressorName) {
- ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, SHORT, compressorName), shortData.length);
+ private static ColumnPage newShortPage(ColumnPageEncoderMeta meta, short[] shortData) {
+ ColumnPage columnPage = createPage(meta, shortData.length);
columnPage.setShortPage(shortData);
return columnPage;
}
- private static ColumnPage newShortIntPage(TableSpec.ColumnSpec columnSpec, byte[] shortIntData,
- String compressorName) {
- ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, SHORT_INT, compressorName), shortIntData.length / 3);
+ private static ColumnPage newShortIntPage(ColumnPageEncoderMeta meta, byte[] shortIntData) {
+ ColumnPage columnPage = createPage(meta, shortIntData.length / 3);
columnPage.setShortIntPage(shortIntData);
return columnPage;
}
- private static ColumnPage newIntPage(TableSpec.ColumnSpec columnSpec, int[] intData,
- String compressorName) {
- ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, INT, compressorName), intData.length);
+ private static ColumnPage newIntPage(ColumnPageEncoderMeta meta, int[] intData) {
+ ColumnPage columnPage = createPage(meta, intData.length);
columnPage.setIntPage(intData);
return columnPage;
}
- private static ColumnPage newLongPage(TableSpec.ColumnSpec columnSpec, long[] longData,
- String compressorName) {
- ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, LONG, compressorName), longData.length);
+ private static ColumnPage newLongPage(ColumnPageEncoderMeta meta, long[] longData) {
+ ColumnPage columnPage = createPage(meta, longData.length);
columnPage.setLongPage(longData);
return columnPage;
}
- private static ColumnPage newFloatPage(TableSpec.ColumnSpec columnSpec, float[] floatData,
- String compressorName) {
- ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, FLOAT, compressorName), floatData.length);
+ private static ColumnPage newFloatPage(ColumnPageEncoderMeta meta, float[] floatData) {
+ ColumnPage columnPage = createPage(meta, floatData.length);
columnPage.setFloatPage(floatData);
return columnPage;
}
- private static ColumnPage newDoublePage(TableSpec.ColumnSpec columnSpec, double[] doubleData,
- String compressorName) {
- ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, DOUBLE, compressorName), doubleData.length);
+ private static ColumnPage newDoublePage(ColumnPageEncoderMeta meta, double[] doubleData) {
+ ColumnPage columnPage = createPage(meta, doubleData.length);
columnPage.setDoublePage(doubleData);
return columnPage;
}
- private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec, byte[][] byteArray,
- String compressorName) {
+ private static ColumnPage newDecimalPage(ColumnPageEncoderMeta meta, byte[][] byteArray) {
+ ColumnPageEncoderMeta encoderMeta =
+ new ColumnPageEncoderMeta(meta.getColumnSpec(), meta.getColumnSpec().getSchemaDataType(),
+ meta.getCompressorName());
+ encoderMeta.setFillCompleteVector(meta.isFillCompleteVector());
ColumnPage columnPage = createPage(
- new ColumnPageEncoderMeta(columnSpec, columnSpec.getSchemaDataType(), compressorName),
+ encoderMeta,
byteArray.length);
columnPage.setByteArrayPage(byteArray);
return columnPage;
}
- private static ColumnPage newDecimalPage(TableSpec.ColumnSpec columnSpec,
- byte[] lvEncodedByteArray, String compressorName) throws MemoryException {
- return VarLengthColumnPageBase.newDecimalColumnPage(
- columnSpec, lvEncodedByteArray, compressorName);
+ private static ColumnPage newDecimalPage(ColumnPageEncoderMeta meta,
+ byte[] lvEncodedByteArray) throws MemoryException {
+ return VarLengthColumnPageBase.newDecimalColumnPage(meta, lvEncodedByteArray);
}
private static ColumnPage newLVBytesPage(TableSpec.ColumnSpec columnSpec,
@@ -813,25 +800,25 @@ public abstract class ColumnPage {
DataType storeDataType = meta.getStoreDataType();
if (storeDataType == DataTypes.BOOLEAN || storeDataType == DataTypes.BYTE) {
byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
- return newBytePage(columnSpec, byteData, meta.getCompressorName());
+ return newBytePage(meta, byteData);
} else if (storeDataType == DataTypes.SHORT) {
short[] shortData = compressor.unCompressShort(compressedData, offset, length);
- return newShortPage(columnSpec, shortData, meta.getCompressorName());
+ return newShortPage(meta, shortData);
} else if (storeDataType == DataTypes.SHORT_INT) {
byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
- return newShortIntPage(columnSpec, shortIntData, meta.getCompressorName());
+ return newShortIntPage(meta, shortIntData);
} else if (storeDataType == DataTypes.INT) {
int[] intData = compressor.unCompressInt(compressedData, offset, length);
- return newIntPage(columnSpec, intData, meta.getCompressorName());
+ return newIntPage(meta, intData);
} else if (storeDataType == DataTypes.LONG) {
long[] longData = compressor.unCompressLong(compressedData, offset, length);
- return newLongPage(columnSpec, longData, meta.getCompressorName());
+ return newLongPage(meta, longData);
} else if (storeDataType == DataTypes.FLOAT) {
float[] floatData = compressor.unCompressFloat(compressedData, offset, length);
- return newFloatPage(columnSpec, floatData, meta.getCompressorName());
+ return newFloatPage(meta, floatData);
} else if (storeDataType == DataTypes.DOUBLE) {
double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
- return newDoublePage(columnSpec, doubleData, meta.getCompressorName());
+ return newDoublePage(meta, doubleData);
} else if (!isLVEncoded && storeDataType == DataTypes.BYTE_ARRAY && (
columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE
|| columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) {
@@ -873,8 +860,7 @@ public abstract class ColumnPage {
public static ColumnPage decompressDecimalPage(ColumnPageEncoderMeta meta, byte[] compressedData,
int offset, int length) throws MemoryException {
Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName());
- TableSpec.ColumnSpec columnSpec = meta.getColumnSpec();
- ColumnPage decimalPage = null;
+ ColumnPage decimalPage;
DataType storeDataType = meta.getStoreDataType();
if (storeDataType == DataTypes.BYTE) {
byte[] byteData = compressor.unCompressByte(compressedData, offset, length);
@@ -888,7 +874,7 @@ public abstract class ColumnPage {
return decimalPage;
} else if (storeDataType == DataTypes.SHORT_INT) {
byte[] shortIntData = compressor.unCompressByte(compressedData, offset, length);
- decimalPage = createDecimalPage(meta, shortIntData.length);
+ decimalPage = createDecimalPage(meta, shortIntData.length / 3);
decimalPage.setShortIntPage(shortIntData);
return decimalPage;
} else if (storeDataType == DataTypes.INT) {
@@ -903,10 +889,20 @@ public abstract class ColumnPage {
return decimalPage;
} else {
byte[] lvEncodedBytes = compressor.unCompressByte(compressedData, offset, length);
- return newDecimalPage(columnSpec, lvEncodedBytes, meta.getCompressorName());
+ return newDecimalPage(meta, lvEncodedBytes);
}
}
+ /**
+ * Whether unsafe enabled or not. In case of filling complete vector flow there is no need to use
+ * unsafe flow as we don't store the data in memory for long time.
+ * @param meta ColumnPageEncoderMeta
+ * @return boolean Whether unsafe enabled or not
+ */
+ protected static boolean isUnsafeEnabled(ColumnPageEncoderMeta meta) {
+ return unsafe && !meta.isFillCompleteVector();
+ }
+
public BitSet getNullBits() {
return nullBitSet;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
index 53ad956..82ccd22 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPageValueConverter.java
@@ -17,6 +17,8 @@
package org.apache.carbondata.core.datastore.page;
+import org.apache.carbondata.core.scan.result.vector.ColumnVectorInfo;
+
// Transformation type that can be applied to ColumnPage
public interface ColumnPageValueConverter {
void encode(int rowId, byte value);
@@ -35,4 +37,5 @@ public interface ColumnPageValueConverter {
double decodeDouble(long value);
double decodeDouble(float value);
double decodeDouble(double value);
+ void decodeAndFillVector(ColumnPage columnPage, ColumnVectorInfo vectorInfo);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d3b6ff1/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
index d3e945d..1867354 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeDecimalColumnPage.java
@@ -194,6 +194,31 @@ public class SafeDecimalColumnPage extends DecimalColumnPage {
}
@Override
+ public byte[] getBytePage() {
+ return byteData;
+ }
+
+ @Override public short[] getShortPage() {
+ return shortData;
+ }
+
+ @Override public byte[] getShortIntPage() {
+ return shortIntData;
+ }
+
+ @Override public int[] getIntPage() {
+ return intData;
+ }
+
+ @Override public long[] getLongPage() {
+ return longData;
+ }
+
+ @Override public byte[][] getByteArrayPage() {
+ return byteArrayData;
+ }
+
+ @Override
public void freeMemory() {
byteArrayData = null;
super.freeMemory();