Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/26 14:54:29 UTC
[1/4] incubator-carbondata git commit: Added V3 Format Writer and Reader Code
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 922683eb8 -> 740358c13
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index eafcd9f..289c156 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -59,10 +59,12 @@ import org.apache.carbondata.core.util.CarbonMergerUtil;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.BlockletInfo3;
import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.processing.mdkeygen.file.FileData;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
@@ -137,8 +139,20 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* size reserved in one file for writing block metadata, expressed as a percentage
*/
private int spaceReservedForBlockMetaSize;
- private FileOutputStream fileOutputStream;
- private List<BlockIndexInfo> blockIndexInfoList;
+
+ protected FileOutputStream fileOutputStream;
+
+ protected List<BlockIndexInfo> blockIndexInfoList;
+
+ /**
+ * list of metadata for V3 format
+ */
+ protected List<BlockletInfo3> blockletMetadata;
+
+ /**
+ * list of blocklet index
+ */
+ protected List<org.apache.carbondata.format.BlockletIndex> blockletIndex;
public AbstractFactDataWriter(CarbonDataWriterVo dataWriterVo) {
this.dataWriterVo = dataWriterVo;
@@ -187,6 +201,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)));
this.dataChunksOffsets = new ArrayList<>();
this.dataChunksLength = new ArrayList<>();
+ blockletMetadata = new ArrayList<BlockletInfo3>();
+ blockletIndex = new ArrayList<>();
}
/**
@@ -242,12 +258,14 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
LOGGER.info("Writing data to file as max file size reached for file: " + fileName
+ " .Data block size: " + currentFileSize);
// write meta data to end of the existing file
- writeBlockletInfoToFile(blockletInfoList, fileChannel, fileName);
+ writeBlockletInfoToFile(fileChannel, fileName);
this.currentFileSize = 0;
blockletInfoList =
new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
this.dataChunksOffsets = new ArrayList<>();
this.dataChunksLength = new ArrayList<>();
+ this.blockletMetadata = new ArrayList<>();
+ this.blockletIndex = new ArrayList<>();
CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
// rename carbon data file from in progress status to actual
renameCarbonDataFile();
@@ -316,7 +334,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
/**
* This method will write metadata at the end of the file in thrift format
*/
- protected abstract void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList,
+ protected abstract void writeBlockletInfoToFile(
FileChannel channel, String filePath) throws CarbonDataWriterException;
/**
@@ -327,19 +345,19 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* @param filePath file path
* @param currentPosition current offset
*/
- protected void fillBlockIndexInfoDetails(List<BlockletInfoColumnar> infoList, long numberOfRows,
+ protected void fillBlockIndexInfoDetails(long numberOfRows,
String filePath, long currentPosition) {
// as min-max will change for each blocklet and the second blocklet's min-max can be less than
// the first blocklet's, we need to calculate the complete block-level min-max by taking
// the min value of each column and the max value of each column
- byte[][] currentMinValue = infoList.get(0).getColumnMinData().clone();
- byte[][] currentMaxValue = infoList.get(0).getColumnMaxData().clone();
+ byte[][] currentMinValue = blockletInfoList.get(0).getColumnMinData().clone();
+ byte[][] currentMaxValue = blockletInfoList.get(0).getColumnMaxData().clone();
byte[][] minValue = null;
byte[][] maxValue = null;
- for (int i = 1; i < infoList.size(); i++) {
- minValue = infoList.get(i).getColumnMinData();
- maxValue = infoList.get(i).getColumnMaxData();
+ for (int i = 1; i < blockletInfoList.size(); i++) {
+ minValue = blockletInfoList.get(i).getColumnMinData();
+ maxValue = blockletInfoList.get(i).getColumnMaxData();
for (int j = 0; j < maxValue.length; j++) {
if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue[j]) > 0) {
currentMinValue[j] = minValue[j].clone();
@@ -352,8 +370,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
// start and end key we can take based on first blocklet
// start key will be the block start key as
// it is the least key and end blocklet end key will be the block end key as it is the max key
- BlockletBTreeIndex btree = new BlockletBTreeIndex(infoList.get(0).getStartKey(),
- infoList.get(infoList.size() - 1).getEndKey());
+ BlockletBTreeIndex btree = new BlockletBTreeIndex(blockletInfoList.get(0).getStartKey(),
+ blockletInfoList.get(blockletInfoList.size() - 1).getEndKey());
BlockletMinMaxIndex minmax = new BlockletMinMaxIndex();
minmax.setMinValues(currentMinValue);
minmax.setMaxValues(currentMaxValue);
@@ -414,7 +432,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* @throws IOException throws io exception if any problem while writing
* @throws CarbonDataWriterException data writing
*/
- private void writeIndexFile() throws IOException, CarbonDataWriterException {
+ protected void writeIndexFile() throws IOException, CarbonDataWriterException {
// get the header
IndexHeader indexHeader = CarbonMetadataUtil
.getIndexHeader(localCardinality, thriftColumnSchemaList, dataWriterVo.getBucketNumber());
@@ -444,7 +462,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*
* @throws CarbonDataWriterException
*/
- private void closeExecutorService() throws CarbonDataWriterException {
+ protected void closeExecutorService() throws CarbonDataWriterException {
executorService.shutdown();
try {
executorService.awaitTermination(2, TimeUnit.HOURS);
@@ -467,7 +485,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*
* @throws CarbonDataWriterException
*/
- private void renameCarbonDataFile() throws CarbonDataWriterException {
+ protected void renameCarbonDataFile() throws CarbonDataWriterException {
File origFile = new File(this.fileName.substring(0, this.fileName.lastIndexOf('.')));
File curFile = new File(this.fileName);
if (!curFile.renameTo(origFile)) {
@@ -481,7 +499,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* @param localFileName local file name with full path
* @throws CarbonDataWriterException
*/
- private void copyCarbonDataFileToCarbonStorePath(String localFileName)
+ protected void copyCarbonDataFileToCarbonStorePath(String localFileName)
throws CarbonDataWriterException {
long copyStartTime = System.currentTimeMillis();
LOGGER.info("Copying " + localFileName + " --> " + dataWriterVo.getCarbonDataDirectoryPath());
@@ -537,7 +555,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*/
@Override public void writeBlockletInfoToFile() throws CarbonDataWriterException {
if (this.blockletInfoList.size() > 0) {
- writeBlockletInfoToFile(this.blockletInfoList, fileChannel, fileName);
+ writeBlockletInfoToFile(fileChannel, fileName);
}
}
@@ -551,7 +569,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*/
public abstract void writeBlockletData(NodeHolder nodeHolder) throws CarbonDataWriterException;
- protected byte[][] fillAndCompressedKeyBlockData(IndexStorage<int[]>[] keyStorageArray,
+ protected byte[][] fillAndCompressedKeyBlockData(IndexStorage[] keyStorageArray,
int entryCount) {
byte[][] keyBlockData = new byte[keyStorageArray.length][];
int destPos = 0;
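Note on the refactor above: fillBlockIndexInfoDetails now reads the shared blockletInfoList field instead of taking the list as a parameter, but the min/max reduction itself is unchanged. A minimal standalone sketch of that reduction follows, with hypothetical names; only the array shapes and comparator style are taken from the patch.

    import java.util.Comparator;
    import java.util.List;

    // Hypothetical restatement of the block-level min merge: element-wise
    // min of the per-blocklet mins (the max merge is symmetric).
    final class BlockMinMaxSketch {
      static byte[][] mergeMin(List<byte[][]> blockletMins, Comparator<byte[]> cmp) {
        byte[][] blockMin = blockletMins.get(0).clone();
        for (int i = 1; i < blockletMins.size(); i++) {
          for (int j = 0; j < blockMin.length; j++) {
            if (cmp.compare(blockMin[j], blockletMins.get(i)[j]) > 0) {
              blockMin[j] = blockletMins.get(i)[j].clone();
            }
          }
        }
        return blockMin;
      }
    }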
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index ddcdfcd..94d2727 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.processing.store.writer;
import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
public interface CarbonFactDataWriter<T> {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
deleted file mode 100644
index 3b90869..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.writer;
-
-import java.util.BitSet;
-
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-
-public class NodeHolder {
- /**
- * keyArray
- */
- private byte[][] keyArray;
-
- /**
- * dataArray
- */
- private byte[][] dataArray;
-
- /**
- * measureLenght
- */
- private int[] measureLenght;
-
- /**
- * startKey
- */
- private byte[] startKey;
-
- /**
- * endKey
- */
- private byte[] endKey;
-
- /**
- * entryCount
- */
- private int entryCount;
- /**
- * keyLenghts
- */
- private int[] keyLengths;
-
- /**
- * dataAfterCompression
- */
- private short[][] dataAfterCompression;
-
- /**
- * indexMap
- */
- private short[][] indexMap;
-
- /**
- * keyIndexBlockLenght
- */
- private int[] keyBlockIndexLength;
-
- /**
- * isSortedKeyBlock
- */
- private boolean[] isSortedKeyBlock;
-
- private byte[][] compressedIndex;
-
- private byte[][] compressedIndexMap;
-
- /**
- * dataIndexMap
- */
- private int[] dataIndexMapLength;
-
- /**
- * dataIndexMap
- */
- private int[] dataIndexMapOffsets;
-
- /**
- * compressedDataIndex
- */
- private byte[][] compressedDataIndex;
-
- /**
- * column max data
- */
- private byte[][] columnMaxData;
-
- /**
- * column min data
- */
- private byte[][] columnMinData;
-
- /**
- * compression model for numbers data block.
- */
- private WriterCompressModel compressionModel;
-
- /**
- * array of aggBlocks flag to identify the aggBlocks
- */
- private boolean[] aggBlocks;
-
- /**
- * all columns max value
- */
- private byte[][] allMaxValue;
-
- /**
- * all column max value
- */
- private byte[][] allMinValue;
-
- /**
- * true if given index is colgroup block
- */
- private boolean[] colGrpBlock;
-
- /**
- * bit set which will holds the measure
- * indexes which are null
- */
- private BitSet[] measureNullValueIndex;
-
- /**
- * total length of dimension values
- */
- private int totalDimensionArrayLength;
-
- /**
- * total length of all measure values
- */
- private int totalMeasureArrayLength;
-
- /**
- * @return the keyArray
- */
- public byte[][] getKeyArray() {
- return keyArray;
- }
-
- /**
- * @param keyArray the keyArray to set
- */
- public void setKeyArray(byte[][] keyArray) {
- this.keyArray = keyArray;
- }
-
- /**
- * @return the dataArray
- */
- public byte[][] getDataArray() {
- return dataArray;
- }
-
- /**
- * @param dataArray the dataArray to set
- */
- public void setDataArray(byte[][] dataArray) {
- this.dataArray = dataArray;
- }
-
- /**
- * @return the measureLenght
- */
- public int[] getMeasureLenght() {
- return measureLenght;
- }
-
- /**
- * @param measureLenght the measureLenght to set
- */
- public void setMeasureLenght(int[] measureLenght) {
- this.measureLenght = measureLenght;
- }
-
- /**
- * @return the startKey
- */
- public byte[] getStartKey() {
- return startKey;
- }
-
- /**
- * @param startKey the startKey to set
- */
- public void setStartKey(byte[] startKey) {
- this.startKey = startKey;
- }
-
- /**
- * @return the endKey
- */
- public byte[] getEndKey() {
- return endKey;
- }
-
- /**
- * @param endKey the endKey to set
- */
- public void setEndKey(byte[] endKey) {
- this.endKey = endKey;
- }
-
- /**
- * @return the entryCount
- */
- public int getEntryCount() {
- return entryCount;
- }
-
- /**
- * @param entryCount the entryCount to set
- */
- public void setEntryCount(int entryCount) {
- this.entryCount = entryCount;
- }
-
- /**
- * @return the keyLenghts
- */
- public int[] getKeyLengths() {
- return keyLengths;
- }
-
- public void setKeyLengths(int[] keyLengths) {
- this.keyLengths = keyLengths;
- }
-
- /**
- * @return the keyBlockIndexLength
- */
- public int[] getKeyBlockIndexLength() {
- return keyBlockIndexLength;
- }
-
- /**
- * @param keyBlockIndexLength the keyBlockIndexLength to set
- */
- public void setKeyBlockIndexLength(int[] keyBlockIndexLength) {
- this.keyBlockIndexLength = keyBlockIndexLength;
- }
-
- /**
- * @return the isSortedKeyBlock
- */
- public boolean[] getIsSortedKeyBlock() {
- return isSortedKeyBlock;
- }
-
- /**
- * @param isSortedKeyBlock the isSortedKeyBlock to set
- */
- public void setIsSortedKeyBlock(boolean[] isSortedKeyBlock) {
- this.isSortedKeyBlock = isSortedKeyBlock;
- }
-
- /**
- * @return the compressedIndexex
- */
- public byte[][] getCompressedIndex() {
- return compressedIndex;
- }
-
- public void setCompressedIndex(byte[][] compressedIndex) {
- this.compressedIndex = compressedIndex;
- }
-
- /**
- * @return the compressedIndexMap
- */
- public byte[][] getCompressedIndexMap() {
- return compressedIndexMap;
- }
-
- /**
- * @param compressedIndexMap the compressedIndexMap to set
- */
- public void setCompressedIndexMap(byte[][] compressedIndexMap) {
- this.compressedIndexMap = compressedIndexMap;
- }
-
- /**
- * @return the compressedDataIndex
- */
- public byte[][] getCompressedDataIndex() {
- return compressedDataIndex;
- }
-
- /**
- * @param compressedDataIndex the compressedDataIndex to set
- */
- public void setCompressedDataIndex(byte[][] compressedDataIndex) {
- this.compressedDataIndex = compressedDataIndex;
- }
-
- /**
- * @return the dataIndexMapLength
- */
- public int[] getDataIndexMapLength() {
- return dataIndexMapLength;
- }
-
- /**
- * @param dataIndexMapLength the dataIndexMapLength to set
- */
- public void setDataIndexMapLength(int[] dataIndexMapLength) {
- this.dataIndexMapLength = dataIndexMapLength;
- }
-
- public byte[][] getColumnMaxData() {
- return this.columnMaxData;
- }
-
- public void setColumnMaxData(byte[][] columnMaxData) {
- this.columnMaxData = columnMaxData;
- }
-
- public byte[][] getColumnMinData() {
- return this.columnMinData;
- }
-
- public void setColumnMinData(byte[][] columnMinData) {
- this.columnMinData = columnMinData;
- }
-
- public WriterCompressModel getCompressionModel() {
- return compressionModel;
- }
-
- public void setCompressionModel(WriterCompressModel compressionModel) {
- this.compressionModel = compressionModel;
- }
-
- /**
- * returns array of aggBlocks flag to identify the aag blocks
- *
- * @return
- */
- public boolean[] getAggBlocks() {
- return aggBlocks;
- }
-
- /**
- * set array of aggBlocks flag to identify the aggBlocks
- *
- * @param aggBlocks
- */
- public void setAggBlocks(boolean[] aggBlocks) {
- this.aggBlocks = aggBlocks;
- }
-
- /**
- * @return
- */
- public boolean[] getColGrpBlocks() {
- return this.colGrpBlock;
- }
-
- /**
- * @param colGrpBlock true if block is column group
- */
- public void setColGrpBlocks(boolean[] colGrpBlock) {
- this.colGrpBlock = colGrpBlock;
- }
-
- /**
- * @return the measureNullValueIndex
- */
- public BitSet[] getMeasureNullValueIndex() {
- return measureNullValueIndex;
- }
-
- /**
- * @param measureNullValueIndex the measureNullValueIndex to set
- */
- public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) {
- this.measureNullValueIndex = measureNullValueIndex;
- }
-
- public int getTotalDimensionArrayLength() {
- return totalDimensionArrayLength;
- }
-
- public void setTotalDimensionArrayLength(int totalDimensionArrayLength) {
- this.totalDimensionArrayLength = totalDimensionArrayLength;
- }
-
- public int getTotalMeasureArrayLength() {
- return totalMeasureArrayLength;
- }
-
- public void setTotalMeasureArrayLength(int totalMeasureArrayLength) {
- this.totalMeasureArrayLength = totalMeasureArrayLength;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
index b7f8b4f..3218a51 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.store.writer.v1;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.List;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -29,12 +28,12 @@ import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.core.writer.CarbonFooterWriter;
import org.apache.carbondata.format.FileFooter;
import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
-import org.apache.carbondata.processing.store.writer.NodeHolder;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
@@ -363,15 +362,15 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
/**
* This method will write metadata at the end of the file in thrift format
*/
- protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+ protected void writeBlockletInfoToFile(FileChannel channel,
String filePath) throws CarbonDataWriterException {
try {
long currentPosition = channel.size();
CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
FileFooter convertFileMeta = CarbonMetadataUtil
- .convertFileFooter(infoList, localCardinality.length, localCardinality,
+ .convertFileFooter(blockletInfoList, localCardinality.length, localCardinality,
thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
- fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+ fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), filePath, currentPosition);
writer.writeFooter(convertFileMeta, currentPosition);
} catch (IOException e) {
throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
index 185d98d..a9c4ce0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -30,11 +30,11 @@ import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.core.writer.CarbonFooterWriter;
import org.apache.carbondata.format.DataChunk2;
import org.apache.carbondata.format.FileFooter;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
-import org.apache.carbondata.processing.store.writer.NodeHolder;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
@@ -265,7 +265,7 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
/**
* This method will write metadata at the end of the file in thrift format
*/
- protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+ protected void writeBlockletInfoToFile(FileChannel channel,
String filePath) throws CarbonDataWriterException {
try {
// get the current file position
@@ -273,10 +273,10 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
// get thrift file footer instance
FileFooter convertFileMeta = CarbonMetadataUtil
- .convertFilterFooter2(infoList, localCardinality, thriftColumnSchemaList,
+ .convertFilterFooter2(blockletInfoList, localCardinality, thriftColumnSchemaList,
dataChunksOffsets, dataChunksLength);
// fill the carbon index details
- fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+ fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), filePath, currentPosition);
// write the footer
writer.writeFooter(convertFileMeta, currentPosition);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
new file mode 100644
index 0000000..fa7bf27
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NodeHolder;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.BlockletInfo3;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
+import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
+import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+/**
+ * Below class will be used to write the data in V3 format
+ */
+public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> {
+
+ /**
+ * number of column pages to accumulate in one blocklet
+ */
+ private int numberOfChunksInBlocklet;
+
+ /**
+ * persist the page data to be written in the file
+ */
+ private DataWriterHolder dataWriterHolder;
+
+ public CarbonFactDataWriterImplV3(CarbonDataWriterVo dataWriterVo) {
+ super(dataWriterVo);
+ this.numberOfChunksInBlocklet = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+ CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE));
+ dataWriterHolder = new DataWriterHolder();
+ }
+
+ /**
+ * Below method will be used to build the node holder object.
+ * This node holder object will be used to persist the data which will
+ * be written to the carbon data file
+ */
+ @Override public NodeHolder buildDataNodeHolder(IndexStorage<short[]>[] keyStorageArray,
+ byte[][] dataArray, int entryCount, byte[] startKey, byte[] endKey,
+ WriterCompressModel compressionModel, byte[] noDictionaryStartKey, byte[] noDictionaryEndKey)
+ throws CarbonDataWriterException {
+ // if there are no NO-Dictionary column present in the table then
+ // set the empty byte array
+ if (null == noDictionaryEndKey) {
+ noDictionaryEndKey = new byte[0];
+ }
+ if (null == noDictionaryStartKey) {
+ noDictionaryStartKey = new byte[0];
+ }
+ // total measure length
+ int totalMsrArrySize = 0;
+ // current measure length
+ int currentMsrLenght = 0;
+ int totalKeySize = 0;
+ int keyBlockSize = 0;
+
+ boolean[] isSortedData = new boolean[keyStorageArray.length];
+ int[] keyLengths = new int[keyStorageArray.length];
+
+ // below will calculate the min and max value for each column;
+ // in the 2d arrays below, the first index is the column and the
+ // second holds that column's min/max value
+
+ byte[][] dimensionMinValue = new byte[keyStorageArray.length][];
+ byte[][] dimensionMaxValue = new byte[keyStorageArray.length][];
+
+ byte[][] measureMinValue = new byte[dataArray.length][];
+ byte[][] measureMaxValue = new byte[dataArray.length][];
+
+ byte[][] keyBlockData = fillAndCompressedKeyBlockData(keyStorageArray, entryCount);
+ boolean[] colGrpBlock = new boolean[keyStorageArray.length];
+
+ for (int i = 0; i < keyLengths.length; i++) {
+ keyLengths[i] = keyBlockData[i].length;
+ isSortedData[i] = keyStorageArray[i].isAlreadySorted();
+ keyBlockSize++;
+ totalKeySize += keyLengths[i];
+ if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) {
+ dimensionMinValue[i] = keyStorageArray[i].getMin();
+ dimensionMaxValue[i] = keyStorageArray[i].getMax();
+ } else {
+ dimensionMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
+ dimensionMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
+ }
+ // if keyStorageArray is an instance of ColGroupBlockStorage then it is a
+ // colGroup chunk
+ if (keyStorageArray[i] instanceof ColGroupBlockStorage) {
+ colGrpBlock[i] = true;
+ }
+ }
+ for (int i = 0; i < dataArray.length; i++) {
+ measureMaxValue[i] = CarbonMetadataUtil
+ .getByteValueForMeasure(compressionModel.getMaxValue()[i],
+ dataWriterVo.getSegmentProperties().getMeasures().get(i).getDataType());
+ measureMinValue[i] = CarbonMetadataUtil
+ .getByteValueForMeasure(compressionModel.getMinValue()[i],
+ dataWriterVo.getSegmentProperties().getMeasures().get(i).getDataType());
+ }
+ int[] keyBlockIdxLengths = new int[keyBlockSize];
+ byte[][] dataAfterCompression = new byte[keyBlockSize][];
+ byte[][] indexMap = new byte[keyBlockSize][];
+ for (int i = 0; i < isSortedData.length; i++) {
+ if (!isSortedData[i]) {
+ dataAfterCompression[i] = getByteArray(keyStorageArray[i].getDataAfterComp());
+ if (null != keyStorageArray[i].getIndexMap()
+ && keyStorageArray[i].getIndexMap().length > 0) {
+ indexMap[i] = getByteArray(keyStorageArray[i].getIndexMap());
+ } else {
+ indexMap[i] = new byte[0];
+ }
+ keyBlockIdxLengths[i] = (dataAfterCompression[i].length + indexMap[i].length)
+ + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+ }
+ }
+ byte[][] compressedDataIndex = new byte[keyBlockSize][];
+ int[] dataIndexMapLength = new int[keyBlockSize];
+ for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+ if (dataWriterVo.getAggBlocks()[i]) {
+ try {
+ compressedDataIndex[i] = getByteArray(keyStorageArray[i].getDataIndexMap());
+ dataIndexMapLength[i] = compressedDataIndex[i].length;
+ } catch (Exception e) {
+ throw new CarbonDataWriterException(e.getMessage(), e);
+ }
+ }
+ }
+ int[] msrLength = new int[dataWriterVo.getMeasureCount()];
+ // calculate the total size required for all the measure and get the
+ // each measure size
+ for (int i = 0; i < dataArray.length; i++) {
+ currentMsrLenght = dataArray[i].length;
+ totalMsrArrySize += currentMsrLenght;
+ msrLength[i] = currentMsrLenght;
+ }
+ NodeHolder holder = new NodeHolder();
+ holder.setDataArray(dataArray);
+ holder.setKeyArray(keyBlockData);
+ // end key format will be <length of dictionary key><length of no
+ // dictionary key><DictionaryKey><No Dictionary key>
+ byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
+ ByteBuffer buffer = ByteBuffer.allocate(
+ CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+ + endKey.length + updatedNoDictionaryEndKey.length);
+ buffer.putInt(endKey.length);
+ buffer.putInt(updatedNoDictionaryEndKey.length);
+ buffer.put(endKey);
+ buffer.put(updatedNoDictionaryEndKey);
+ buffer.rewind();
+ holder.setEndKey(buffer.array());
+ holder.setMeasureLenght(msrLength);
+ byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey);
+ // start key format will be <length of dictionary key><length of no
+ // dictionary key><DictionaryKey><No Dictionary key>
+ buffer = ByteBuffer.allocate(
+ CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+ + startKey.length + updatedNoDictionaryStartKey.length);
+ buffer.putInt(startKey.length);
+ buffer.putInt(updatedNoDictionaryStartKey.length);
+ buffer.put(startKey);
+ buffer.put(updatedNoDictionaryStartKey);
+ buffer.rewind();
+ holder.setStartKey(buffer.array());
+ holder.setEntryCount(entryCount);
+ holder.setKeyLengths(keyLengths);
+ holder.setKeyBlockIndexLength(keyBlockIdxLengths);
+ holder.setIsSortedKeyBlock(isSortedData);
+ holder.setCompressedIndex(dataAfterCompression);
+ holder.setCompressedIndexMap(indexMap);
+ holder.setDataIndexMapLength(dataIndexMapLength);
+ holder.setCompressedDataIndex(compressedDataIndex);
+ holder.setCompressionModel(compressionModel);
+ holder.setTotalDimensionArrayLength(totalKeySize);
+ holder.setTotalMeasureArrayLength(totalMsrArrySize);
+ holder.setMeasureColumnMaxData(measureMaxValue);
+ holder.setMeasureColumnMinData(measureMinValue);
+ // setting column min max value
+ holder.setColumnMaxData(dimensionMaxValue);
+ holder.setColumnMinData(dimensionMinValue);
+ holder.setAggBlocks(dataWriterVo.getAggBlocks());
+ holder.setColGrpBlocks(colGrpBlock);
+ return holder;
+ }
+
+ /**
+ * Below method will be used to convert short array to byte array
+ *
+ * @param data short array to be converted
+ * @return byte array
+ */
+ private byte[] getByteArray(short[] data) {
+ ByteBuffer buffer = ByteBuffer.allocate(data.length * 2);
+ for (int i = 0; i < data.length; i++) {
+ buffer.putShort(data[i]);
+ }
+ buffer.flip();
+ return buffer.array();
+ }
+
+ @Override protected void writeBlockletInfoToFile(FileChannel channel, String filePath)
+ throws CarbonDataWriterException {
+ try {
+ // get the current file position
+ long currentPosition = channel.size();
+ CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
+ // get thrift file footer instance
+ FileFooter convertFileMeta = CarbonMetadataUtil
+ .convertFileFooter3(blockletMetadata, blockletIndex, localCardinality,
+ thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+ // fill the carbon index details
+ fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), filePath, currentPosition);
+ // write the footer
+ writer.writeFooter(convertFileMeta, currentPosition);
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
+ }
+ }
+
+ /**
+ * Below method will be used to write blocklet data to file
+ */
+ @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+ // check the number of pages present in the data holder; if the page count has
+ // reached the threshold, write the accumulated pages to file
+ if (dataWriterHolder.getNumberOfPagesAdded() == numberOfChunksInBlocklet) {
+ writeDataToFile(fileChannel);
+ }
+ dataWriterHolder.addNodeHolder(holder);
+ }
+
+ private void writeDataToFile(FileChannel channel) {
+ // get the list of node holders
+ List<NodeHolder> nodeHolderList = dataWriterHolder.getNodeHolder();
+ long blockletDataSize = 0;
+ // get data chunks for all the columns
+ byte[][] dataChunkBytes =
+ new byte[nodeHolderList.get(0).getKeyArray().length + nodeHolderList.get(0)
+ .getDataArray().length][];
+ int measureStartIndex = nodeHolderList.get(0).getKeyArray().length;
+ // calculate the size of data chunks
+ try {
+ for (int i = 0; i < nodeHolderList.get(0).getKeyArray().length; i++) {
+ dataChunkBytes[i] = CarbonUtil.getByteArray(CarbonMetadataUtil
+ .getDataChunk3(nodeHolderList, thriftColumnSchemaList,
+ dataWriterVo.getSegmentProperties(), i, true));
+ blockletDataSize += dataChunkBytes[i].length;
+ }
+ for (int i = 0; i < nodeHolderList.get(0).getDataArray().length; i++) {
+ dataChunkBytes[measureStartIndex] = CarbonUtil.getByteArray(CarbonMetadataUtil
+ .getDataChunk3(nodeHolderList, thriftColumnSchemaList,
+ dataWriterVo.getSegmentProperties(), i, false));
+ blockletDataSize += dataChunkBytes[measureStartIndex].length;
+ measureStartIndex++;
+ }
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while getting the data chunks", e);
+ }
+ // calculate the total size of data to be written
+ blockletDataSize += dataWriterHolder.getSize();
+ // if the data size would exceed the block size, roll over to a new file
+ updateBlockletFileChannel(blockletDataSize);
+ // write data to file
+ writeDataToFile(fileChannel, dataChunkBytes);
+ // clear the data holder
+ dataWriterHolder.clear();
+ }
+
+ /**
+ * Below method will be used to write data in carbon data file
+ * Data Format
+ * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
+ * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
+ * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
+ * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
+ * Each page will contain column data, inverted index and RLE index
+ *
+ * @param channel
+ * @param dataChunkBytes
+ */
+ private void writeDataToFile(FileChannel channel, byte[][] dataChunkBytes) {
+ long offset = 0;
+ // write the header
+ try {
+ if (fileChannel.size() == 0) {
+ ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+ byte[] header = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes();
+ ByteBuffer buffer = ByteBuffer.allocate(header.length);
+ buffer.put(header);
+ buffer.rewind();
+ fileChannel.write(buffer);
+ }
+ offset = channel.size();
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while getting the file channel size");
+ }
+ // to maintain the offset of each data chunk in blocklet
+ List<Long> currentDataChunksOffset = new ArrayList<>();
+ // to maintain the length of each data chunk in blocklet
+ List<Integer> currentDataChunksLength = new ArrayList<>();
+ // get the node holder list
+ List<NodeHolder> nodeHolderList = dataWriterHolder.getNodeHolder();
+ int numberOfDimension = nodeHolderList.get(0).getKeyArray().length;
+ int numberOfMeasures = nodeHolderList.get(0).getDataArray().length;
+ NodeHolder nodeHolder = null;
+ ByteBuffer buffer = null;
+ int bufferSize = 0;
+ long dimensionOffset = 0;
+ long measureOffset = 0;
+ int numberOfRows = 0;
+ // calculate the total number of rows in this blocklet
+ for (int j = 0; j < nodeHolderList.size(); j++) {
+ numberOfRows += nodeHolderList.get(j).getEntryCount();
+ }
+ try {
+ for (int i = 0; i < numberOfDimension; i++) {
+ currentDataChunksOffset.add(offset);
+ currentDataChunksLength.add(dataChunkBytes[i].length);
+ buffer = ByteBuffer.allocate(dataChunkBytes[i].length);
+ buffer.put(dataChunkBytes[i]);
+ buffer.flip();
+ fileChannel.write(buffer);
+ offset += dataChunkBytes[i].length;
+ for (int j = 0; j < nodeHolderList.size(); j++) {
+ nodeHolder = nodeHolderList.get(j);
+ bufferSize = nodeHolder.getKeyLengths()[i] + (!nodeHolder.getIsSortedKeyBlock()[i] ?
+ nodeHolder.getKeyBlockIndexLength()[i] :
+ 0) + (dataWriterVo.getAggBlocks()[i] ?
+ nodeHolder.getCompressedDataIndex()[i].length :
+ 0);
+ buffer = ByteBuffer.allocate(bufferSize);
+ buffer.put(nodeHolder.getKeyArray()[i]);
+ if (!nodeHolder.getIsSortedKeyBlock()[i]) {
+ buffer.putInt(nodeHolder.getCompressedIndex()[i].length);
+ buffer.put(nodeHolder.getCompressedIndex()[i]);
+ if (nodeHolder.getCompressedIndexMap()[i].length > 0) {
+ buffer.put(nodeHolder.getCompressedIndexMap()[i]);
+ }
+ }
+ if (nodeHolder.getAggBlocks()[i]) {
+ buffer.put(nodeHolder.getCompressedDataIndex()[i]);
+ }
+ buffer.flip();
+ fileChannel.write(buffer);
+ offset += bufferSize;
+ }
+ }
+ dimensionOffset = offset;
+ int dataChunkStartIndex = nodeHolderList.get(0).getKeyArray().length;
+ for (int i = 0; i < numberOfMeasures; i++) {
+ nodeHolderList = dataWriterHolder.getNodeHolder();
+ currentDataChunksOffset.add(offset);
+ currentDataChunksLength.add(dataChunkBytes[dataChunkStartIndex].length);
+ buffer = ByteBuffer.allocate(dataChunkBytes[dataChunkStartIndex].length);
+ buffer.put(dataChunkBytes[dataChunkStartIndex]);
+ buffer.flip();
+ fileChannel.write(buffer);
+ offset += dataChunkBytes[dataChunkStartIndex].length;
+ dataChunkStartIndex++;
+ for (int j = 0; j < nodeHolderList.size(); j++) {
+ nodeHolder = nodeHolderList.get(j);
+ bufferSize = nodeHolder.getDataArray()[i].length;
+ buffer = ByteBuffer.allocate(bufferSize);
+ buffer.put(nodeHolder.getDataArray()[i]);
+ buffer.flip();
+ fileChannel.write(buffer);
+ offset += bufferSize;
+ }
+ }
+ measureOffset = offset;
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while writing the data", e);
+ }
+ blockletIndex.add(CarbonMetadataUtil
+ .getBlockletIndex(nodeHolderList, dataWriterVo.getSegmentProperties().getMeasures()));
+ BlockletInfo3 blockletInfo3 =
+ new BlockletInfo3(numberOfRows, currentDataChunksOffset, currentDataChunksLength,
+ dimensionOffset, measureOffset);
+ blockletMetadata.add(blockletInfo3);
+ }
+
+ /**
+ * Below method will be used to fill the block info details
+ *
+ * @param numberOfRows number of rows in file
+ * @param filePath file path
+ * @param currentPosition current offset
+ */
+ protected void fillBlockIndexInfoDetails(long numberOfRows, String filePath,
+ long currentPosition) {
+ byte[][] currentMinValue = new byte[blockletIndex.get(0).min_max_index.max_values.size()][];
+ byte[][] currentMaxValue = new byte[blockletIndex.get(0).min_max_index.max_values.size()][];
+ for (int i = 0; i < currentMaxValue.length; i++) {
+ currentMinValue[i] = blockletIndex.get(0).min_max_index.getMin_values().get(i).array();
+ currentMaxValue[i] = blockletIndex.get(0).min_max_index.getMax_values().get(i).array();
+ }
+ byte[] minValue = null;
+ byte[] maxValue = null;
+ int measureStartIndex = currentMinValue.length - dataWriterVo.getMeasureCount();
+ for (int i = 1; i < blockletIndex.size(); i++) {
+ for (int j = 0; j < measureStartIndex; j++) {
+ minValue = blockletIndex.get(i).min_max_index.getMin_values().get(j).array();
+ maxValue = blockletIndex.get(i).min_max_index.getMax_values().get(j).array();
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue) > 0) {
+ currentMinValue[j] = minValue.clone();
+ }
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMaxValue[j], maxValue) < 0) {
+ currentMaxValue[j] = maxValue.clone();
+ }
+ }
+ int measureIndex = 0;
+ for (int j = measureStartIndex; j < currentMinValue.length; j++) {
+ minValue = blockletIndex.get(i).min_max_index.getMin_values().get(j).array();
+ maxValue = blockletIndex.get(i).min_max_index.getMax_values().get(j).array();
+
+ if (CarbonMetadataUtil.compareMeasureData(currentMinValue[j], minValue,
+ dataWriterVo.getSegmentProperties().getMeasures().get(measureIndex).getDataType())
+ > 0) {
+ currentMinValue[j] = minValue.clone();
+ }
+ if (CarbonMetadataUtil.compareMeasureData(currentMaxValue[j], maxValue,
+ dataWriterVo.getSegmentProperties().getMeasures().get(measureIndex).getDataType())
+ < 0) {
+ currentMaxValue[j] = maxValue.clone();
+ }
+ }
+ }
+ BlockletBTreeIndex btree =
+ new BlockletBTreeIndex(blockletIndex.get(0).b_tree_index.getStart_key(),
+ blockletIndex.get(blockletIndex.size() - 1).b_tree_index.getEnd_key());
+ BlockletMinMaxIndex minmax = new BlockletMinMaxIndex();
+ minmax.setMinValues(currentMinValue);
+ minmax.setMaxValues(currentMaxValue);
+ org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex blockletIndex =
+ new org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex(btree, minmax);
+ BlockIndexInfo blockIndexInfo =
+ new BlockIndexInfo(numberOfRows, filePath.substring(0, filePath.lastIndexOf('.')),
+ currentPosition, blockletIndex);
+ blockIndexInfoList.add(blockIndexInfo);
+ }
+
+ /**
+ * Method will be used to close the open file channel
+ *
+ * @throws CarbonDataWriterException
+ */
+ public void closeWriter() throws CarbonDataWriterException {
+ if (dataWriterHolder.getNodeHolder().size() > 0) {
+ writeDataToFile(fileChannel);
+ writeBlockletInfoToFile(fileChannel, fileName);
+ CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
+ renameCarbonDataFile();
+ copyCarbonDataFileToCarbonStorePath(
+ this.fileName.substring(0, this.fileName.lastIndexOf('.')));
+ try {
+ writeIndexFile();
+ } catch (IOException e) {
+ throw new CarbonDataWriterException("Problem while writing the index file", e);
+ }
+ }
+ closeExecutorService();
+ }
+}
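The V3 writer above accumulates column pages and flushes a complete blocklet once the configured page count is reached (see writeBlockletData). A hedged standalone sketch of that accumulate-then-flush flow, with hypothetical names, independent of the CarbonData classes:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    // Pages collect until the configured count is reached; then the whole
    // blocklet is written out before the next page is accepted, matching
    // the flush-before-add order in writeBlockletData.
    final class BlockletBatcherSketch<P> {
      private final int pagesPerBlocklet;
      private final Consumer<List<P>> flush;
      private final List<P> pages = new ArrayList<>();

      BlockletBatcherSketch(int pagesPerBlocklet, Consumer<List<P>> flush) {
        this.pagesPerBlocklet = pagesPerBlocklet;
        this.flush = flush;
      }

      void addPage(P page) {
        if (pages.size() == pagesPerBlocklet) { // threshold reached: flush first
          flush.accept(new ArrayList<>(pages));
          pages.clear();
        }
        pages.add(page);
      }
    }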
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
new file mode 100644
index 0000000..0827bd0
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v3;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.util.NodeHolder;
+
+public class DataWriterHolder {
+ private List<NodeHolder> nodeHolder;
+ private long currentSize;
+
+ public DataWriterHolder() {
+ this.nodeHolder = new ArrayList<NodeHolder>();
+ }
+
+ public void clear() {
+ nodeHolder.clear();
+ currentSize = 0;
+ }
+
+ public void addNodeHolder(NodeHolder holder) {
+ this.nodeHolder.add(holder);
+
+ int size = 0;
+ // add row id index length
+ for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
+ if (!holder.getIsSortedKeyBlock()[i]) {
+ size += holder.getKeyBlockIndexLength()[i];
+ }
+ }
+ // add rle index length
+ for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
+ if (holder.getAggBlocks()[i]) {
+ size += holder.getDataIndexMapLength()[i];
+ }
+ }
+ currentSize +=
+ holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size;
+ }
+
+ public long getSize() {
+ return currentSize;
+ }
+
+ public int getNumberOfPagesAdded() {
+ return nodeHolder.size();
+ }
+
+ public List<NodeHolder> getNodeHolder() {
+ return nodeHolder;
+ }
+}
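DataWriterHolder's size accounting sums dimension bytes and measure bytes, plus row-id index bytes for unsorted key blocks and RLE index bytes for agg blocks. The same arithmetic, restated as a hypothetical standalone method:

    // Hypothetical restatement of addNodeHolder's accounting as a pure
    // function over the per-column lengths and flags of one page.
    final class BlockletSizeSketch {
      static long pageSize(long dimensionBytes, long measureBytes,
          int[] rowIdIndexLength, boolean[] isSortedKeyBlock,
          int[] rleIndexLength, boolean[] isAggBlock) {
        long size = dimensionBytes + measureBytes;
        for (int i = 0; i < rowIdIndexLength.length; i++) {
          if (!isSortedKeyBlock[i]) {
            size += rowIdIndexLength[i]; // row-id index only for unsorted blocks
          }
        }
        for (int i = 0; i < rleIndexLength.length; i++) {
          if (isAggBlock[i]) {
            size += rleIndexLength[i]; // RLE index only for agg blocks
          }
        }
        return size;
      }
    }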
[4/4] incubator-carbondata git commit: [CARBONDATA-726] Added code for V3 format Writer and Reader This closes #609
Posted by ja...@apache.org.
[CARBONDATA-726] Added code for V3 format Writer and Reader This closes #609
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/740358c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/740358c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/740358c1
Branch: refs/heads/master
Commit: 740358c1344a03a2795c38f1c49548b6413a60e5
Parents: 922683e 2cf1104
Author: jackylk <ja...@huawei.com>
Authored: Sun Feb 26 22:52:27 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun Feb 26 22:52:27 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 38 +-
.../constants/CarbonV3DataFormatConstants.java | 84 ++++
.../datastore/chunk/AbstractRawColumnChunk.java | 11 +
.../chunk/reader/CarbonDataReaderFactory.java | 14 +-
.../AbstractChunkReaderV2V3Format.java | 126 +++++
...mpressedDimensionChunkFileBasedReaderV2.java | 127 ++---
...mpressedDimensionChunkFileBasedReaderV3.java | 268 ++++++++++
.../AbstractMeasureChunkReaderV2V3Format.java | 124 +++++
...CompressedMeasureChunkFileBasedReaderV2.java | 106 +---
...CompressedMeasureChunkFileBasedReaderV3.java | 239 +++++++++
.../SafeFixedLengthDimensionDataChunkStore.java | 9 +-
.../columnar/BlockIndexerStorageForShort.java | 228 +++++++++
.../columnar/ColumnWithShortIndex.java | 76 +++
.../ColumnWithShortIndexForNoDictionay.java | 46 ++
.../core/metadata/ColumnarFormatVersion.java | 9 +-
.../executor/impl/AbstractQueryExecutor.java | 10 +-
.../IncludeColGroupFilterExecuterImpl.java | 24 +
...velRangeLessThanEqualFilterExecuterImpl.java | 14 +-
.../RowLevelRangeLessThanFiterExecuterImpl.java | 14 +-
.../carbondata/core/util/BitSetGroup.java | 6 +-
.../core/util/CarbonMetadataUtil.java | 347 ++++++++++++-
.../carbondata/core/util/CarbonProperties.java | 110 +++-
.../apache/carbondata/core/util/CarbonUtil.java | 110 ++++
.../util/DataFileFooterConverterFactory.java | 5 +-
.../core/util/DataFileFooterConverterV3.java | 141 ++++++
.../apache/carbondata/core/util/NodeHolder.java | 430 ++++++++++++++++
.../core/util/CarbonMetadataUtilTest.java | 2 +-
format/src/main/thrift/carbondata.thrift | 27 +-
.../store/CarbonDataWriterFactory.java | 7 +-
.../store/CarbonFactDataHandlerColumnar.java | 56 ++-
.../store/writer/AbstractFactDataWriter.java | 54 +-
.../store/writer/CarbonFactDataWriter.java | 1 +
.../processing/store/writer/NodeHolder.java | 410 ---------------
.../writer/v1/CarbonFactDataWriterImplV1.java | 9 +-
.../writer/v2/CarbonFactDataWriterImplV2.java | 8 +-
.../writer/v3/CarbonFactDataWriterImplV3.java | 499 +++++++++++++++++++
.../store/writer/v3/DataWriterHolder.java | 68 +++
37 files changed, 3114 insertions(+), 743 deletions(-)
----------------------------------------------------------------------
[2/4] incubator-carbondata git commit: Added V3 Format Writer and Reader Code
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 0c2e8ab..1e8207c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.impl.StandardLogService;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.BlockIndexStore;
import org.apache.carbondata.core.datastore.IndexKey;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
@@ -60,6 +61,7 @@ import org.apache.carbondata.core.scan.model.QueryMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
@@ -336,6 +338,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
int[] dimensionsBlockIndexes = QueryUtil.getDimensionsBlockIndexes(updatedQueryDimension,
segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions,
queryProperties.complexFilterDimension, allProjectionListDimensionIdexes);
+ int numberOfColumnToBeReadInOneIO = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+ CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE));
+
if (dimensionsBlockIndexes.length > 0) {
numberOfElementToConsider = dimensionsBlockIndexes[dimensionsBlockIndexes.length - 1]
== segmentProperties.getBlockTodimensionOrdinalMapping().size() - 1 ?
@@ -343,7 +349,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
dimensionsBlockIndexes.length;
blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(CarbonUtil
.getRangeIndex(dimensionsBlockIndexes, numberOfElementToConsider,
- CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO));
+ numberOfColumnToBeReadInOneIO));
} else {
blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[0][0]);
}
@@ -362,7 +368,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// setting all the measure chunk indexes to be read from file
blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(CarbonUtil
.getRangeIndex(measureBlockIndexes, numberOfElementToConsider,
- CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO));
+ numberOfColumnToBeReadInOneIO));
} else {
blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(new int[0][0]);
}
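The change above replaces the hard-coded CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO with a property lookup so the IO batch size is configurable. The lookup pattern, as a generic sketch with hypothetical names (not the CarbonProperties API itself):

    import java.util.Properties;

    // Read an integer setting with a string default, as AbstractQueryExecutor
    // now does for the number of columns to read in one IO.
    final class IntPropertySketch {
      static int intProperty(Properties props, String key, String defaultValue) {
        try {
          return Integer.parseInt(props.getProperty(key, defaultValue));
        } catch (NumberFormatException e) {
          return Integer.parseInt(defaultValue); // fall back on malformed values
        }
      }
    }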
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
index d2523a1..c64f498 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.core.scan.filter.executer;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
@@ -24,11 +25,14 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
import org.apache.carbondata.core.util.ByteUtil;
/**
@@ -80,6 +84,26 @@ public class IncludeColGroupFilterExecuterImpl extends IncludeFilterExecuterImpl
return bitSet;
}
+ @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
+ int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+ .get(dimColumnEvaluatorInfo.getColumnIndex());
+ if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+ blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+ .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+ }
+ DimensionRawColumnChunk dimensionRawColumnChunk =
+ blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+ BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
+ for (int i = 0; i < dimensionRawColumnChunk.getPagesCount(); i++) {
+ if (dimensionRawColumnChunk.getMaxValues() != null) {
+ BitSet bitSet = getFilteredIndexes(dimensionRawColumnChunk.convertToDimColDataChunk(i),
+ dimensionRawColumnChunk.getRowCount()[i]);
+ bitSetGroup.setBitSet(bitSet, i);
+ }
+ }
+ return bitSetGroup;
+ }
+
/**
* It is required for extracting column data from columngroup chunk
*
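
Note: the new applyFilter follows the V3 page-wise pattern used across the
filter executers in this patch: load the raw chunk lazily, then decode and
filter one page at a time into a BitSetGroup. The core loop, restated as a
sketch (names as in the diff above):

    BitSetGroup result = new BitSetGroup(rawChunk.getPagesCount());
    for (int page = 0; page < rawChunk.getPagesCount(); page++) {
      // decode a single page and record the matching row ids for that page
      BitSet matched = getFilteredIndexes(rawChunk.convertToDimColDataChunk(page),
          rawChunk.getRowCount()[page]);
      result.setBitSet(matched, page);
    }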
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 2bdce8d..c7e2acc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -96,17 +96,9 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
if (rawColumnChunk.getMinValues() != null) {
if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
- int compare = ByteUtil.UnsafeComparer.INSTANCE
- .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]);
- if (compare >= 0) {
- BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
- bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
- bitSetGroup.setBitSet(bitSet, i);
- } else {
- BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
- rawColumnChunk.getRowCount()[i]);
- bitSetGroup.setBitSet(bitSet, i);
- }
+ BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+ rawColumnChunk.getRowCount()[i]);
+ bitSetGroup.setBitSet(bitSet, i);
}
} else {
BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index ae9ba8a..d9795eb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -96,17 +96,9 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
if (rawColumnChunk.getMinValues() != null) {
if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
- int compare = ByteUtil.UnsafeComparer.INSTANCE
- .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]);
- if (compare > 0) {
- BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
- bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
- bitSetGroup.setBitSet(bitSet, i);
- } else {
- BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
- rawColumnChunk.getRowCount()[i]);
- bitSetGroup.setBitSet(bitSet, i);
- }
+ BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+ rawColumnChunk.getRowCount()[i]);
+ bitSetGroup.setBitSet(bitSet, i);
}
} else {
BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
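
Note: both less-than executers drop the page-level shortcut that marked every
row of a page as matched, without decoding it, whenever the filter bound
covered the page's max value; a qualifying page is now always decoded and
filtered row by row. For reference, the semantics of the removed shortcut
(names illustrative):

    // Removed shortcut: if the filter bound covers the page max, all rows match.
    if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(filterValue, pageMax) >= 0) {
      BitSet all = new BitSet(rowCount);
      all.flip(0, rowCount); // set bits 0 .. rowCount-1
      bitSetGroup.setBitSet(all, pageIndex);
    }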
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
index 07e3487..323042a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
@@ -52,9 +52,11 @@ public class BitSetGroup {
public void and(BitSetGroup group) {
int i = 0;
for (BitSet bitSet : bitSets) {
- BitSet otherSet = group.getBitSet(i);
+ BitSet otherSet = group.getBitSet(i);
if (bitSet != null && otherSet != null) {
bitSet.and(otherSet);
+ } else {
+ bitSets[i] = null;
}
i++;
}
@@ -63,7 +65,7 @@ public class BitSetGroup {
public void or(BitSetGroup group) {
int i = 0;
for (BitSet bitSet : bitSets) {
- BitSet otherSet = group.getBitSet(i);
+ BitSet otherSet = group.getBitSet(i);
if (bitSet != null && otherSet != null) {
bitSet.or(otherSet);
}
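
Note: the and() fix matters for filter conjunction. A null BitSet on either
side means that page has no matches, so the result page must become null
instead of silently keeping the left-hand bits. A small usage sketch, where
bitSetOf(..) is a hypothetical helper that builds a BitSet with the given
bits set:

    BitSetGroup left = new BitSetGroup(2);
    left.setBitSet(bitSetOf(0, 1), 0); // page 0: rows {0, 1} matched
    left.setBitSet(bitSetOf(3), 1);    // page 1: row {3} matched
    BitSetGroup right = new BitSetGroup(2);
    right.setBitSet(bitSetOf(1), 0);   // page 0: row {1} matched
                                       // page 1 stays null: nothing matched
    left.and(right);
    // page 0 -> {1}; page 1 -> null. Before the fix page 1 wrongly kept {3}.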
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 25e7cfe..55c0302 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -35,18 +36,22 @@ import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.BlockletBTreeIndex;
import org.apache.carbondata.format.BlockletIndex;
import org.apache.carbondata.format.BlockletInfo;
import org.apache.carbondata.format.BlockletInfo2;
+import org.apache.carbondata.format.BlockletInfo3;
import org.apache.carbondata.format.BlockletMinMaxIndex;
import org.apache.carbondata.format.ChunkCompressionMeta;
import org.apache.carbondata.format.ColumnSchema;
import org.apache.carbondata.format.CompressionCodec;
import org.apache.carbondata.format.DataChunk;
import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
import org.apache.carbondata.format.Encoding;
import org.apache.carbondata.format.FileFooter;
import org.apache.carbondata.format.IndexHeader;
@@ -109,6 +114,50 @@ public class CarbonMetadataUtil {
}
/**
+ * It converts a list of BlockletInfo3 objects to the FileFooter thrift object
+ *
+ * @param infoList
+ * @param blockletIndexs
+ * @param cardinalities
+ * @return FileFooter
+ */
+ public static FileFooter convertFileFooter3(List<BlockletInfo3> infoList,
+ List<BlockletIndex> blockletIndexs, int[] cardinalities, List<ColumnSchema> columnSchemaList,
+ SegmentProperties segmentProperties) throws IOException {
+ FileFooter footer = getFileFooter3(infoList, blockletIndexs, cardinalities, columnSchemaList);
+ for (BlockletInfo3 info : infoList) {
+ footer.addToBlocklet_info_list3(info);
+ }
+ return footer;
+ }
+
+ /**
+ * Below method will be used to get the file footer object
+ *
+ * @param infoList blocklet info
+ * @param cardinalities cardinality of dimension columns
+ * @param columnSchemaList column schema list
+ * @return file footer
+ */
+ private static FileFooter getFileFooter3(List<BlockletInfo3> infoList,
+ List<BlockletIndex> blockletIndexs, int[] cardinalities,
+ List<ColumnSchema> columnSchemaList) {
+ SegmentInfo segmentInfo = new SegmentInfo();
+ segmentInfo.setNum_cols(columnSchemaList.size());
+ segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(cardinalities));
+ ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+ FileFooter footer = new FileFooter();
+ footer.setVersion(version.number());
+ footer.setNum_rows(getNumberOfRowForFooter(infoList));
+ footer.setSegment_info(segmentInfo);
+ footer.setTable_columns(columnSchemaList);
+ for (BlockletIndex info : blockletIndexs) {
+ footer.addToBlocklet_index_list(info);
+ }
+ return footer;
+ }
+
+ /**
* Below method will be used to get the file footer object for
*
* @param infoList blocklet info
@@ -162,6 +211,20 @@ public class CarbonMetadataUtil {
return numberOfRows;
}
+ /**
+ * Get total number of rows for the file.
+ *
+ * @param infoList
+ * @return
+ */
+ private static long getNumberOfRowForFooter(List<BlockletInfo3> infoList) {
+ long numberOfRows = 0;
+ for (BlockletInfo3 info : infoList) {
+ numberOfRows += info.num_rows;
+ }
+ return numberOfRows;
+ }
+
private static BlockletIndex getBlockletIndex(BlockletInfoColumnar info) {
BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
@@ -181,9 +244,52 @@ public class CarbonMetadataUtil {
return blockletIndex;
}
+ public static BlockletIndex getBlockletIndex(List<NodeHolder> nodeHolderList,
+ List<CarbonMeasure> carbonMeasureList) {
+ BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
+ for (byte[] max : nodeHolderList.get(nodeHolderList.size() - 1).getColumnMaxData()) {
+ blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(max));
+ }
+ for (byte[] min : nodeHolderList.get(0).getColumnMinData()) {
+ blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
+ }
+ byte[][] measureMaxValue = nodeHolderList.get(0).getMeasureColumnMaxData().clone();
+ byte[][] measureMinValue = nodeHolderList.get(0).getMeasureColumnMinData().clone();
+ byte[] minVal = null;
+ byte[] maxVal = null;
+ for (int i = 1; i < nodeHolderList.size(); i++) {
+ for (int j = 0; j < measureMinValue.length; j++) {
+ minVal = nodeHolderList.get(i).getMeasureColumnMinData()[j];
+ maxVal = nodeHolderList.get(i).getMeasureColumnMaxData()[j];
+ if (compareMeasureData(measureMaxValue[j], maxVal, carbonMeasureList.get(j).getDataType())
+ < 0) {
+ measureMaxValue[j] = maxVal.clone();
+ }
+ if (compareMeasureData(measureMinValue[j], minVal, carbonMeasureList.get(j).getDataType())
+ > 0) {
+ measureMinValue[j] = minVal.clone();
+ }
+ }
+ }
+
+ for (byte[] max : measureMaxValue) {
+ blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(max));
+ }
+ for (byte[] min : measureMinValue) {
+ blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
+ }
+ BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
+ blockletBTreeIndex.setStart_key(nodeHolderList.get(0).getStartKey());
+ blockletBTreeIndex.setEnd_key(nodeHolderList.get(nodeHolderList.size() - 1).getEndKey());
+ BlockletIndex blockletIndex = new BlockletIndex();
+ blockletIndex.setMin_max_index(blockletMinMaxIndex);
+ blockletIndex.setB_tree_index(blockletBTreeIndex);
+ return blockletIndex;
+ }
+
/**
- * Below method will be used to get the blocklet info object for
- * data version 2 file
+ * Below method will be used to get the blocklet info object for data version
+ * 2 file
*
* @param blockletInfoColumnar blocklet info
* @param dataChunkOffsets data chunks offsets
@@ -222,7 +328,8 @@ public class CarbonMetadataUtil {
encodings.add(Encoding.DIRECT_DICTIONARY);
}
dataChunk.setRowMajor(colGrpblock[i]);
- //TODO : Once schema PR is merged and information needs to be passed here.
+ // TODO : Once schema PR is merged and information needs to be passed
+ // here.
dataChunk.setColumn_ids(new ArrayList<Integer>());
dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
dataChunk.setData_page_offset(blockletInfoColumnar.getKeyOffSets()[i]);
@@ -242,7 +349,8 @@ public class CarbonMetadataUtil {
j++;
}
- //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+ // TODO : Right now the encodings are happening at runtime. change as per
+ // this encoders.
dataChunk.setEncoders(encodings);
colDataChunks.add(dataChunk);
@@ -252,24 +360,26 @@ public class CarbonMetadataUtil {
DataChunk dataChunk = new DataChunk();
dataChunk.setChunk_meta(getChunkCompressionMeta());
dataChunk.setRowMajor(false);
- //TODO : Once schema PR is merged and information needs to be passed here.
+ // TODO : Once schema PR is merged and information needs to be passed
+ // here.
dataChunk.setColumn_ids(new ArrayList<Integer>());
dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]);
dataChunk.setData_page_offset(blockletInfoColumnar.getMeasureOffset()[i]);
- //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+ // TODO : Right now the encodings are happening at runtime. change as per
+ // this encoders.
List<Encoding> encodings = new ArrayList<Encoding>();
encodings.add(Encoding.DELTA);
dataChunk.setEncoders(encodings);
- //TODO writing dummy presence meta need to set actual presence
- //meta
+ // TODO writing dummy presence meta need to set actual presence
+ // meta
PresenceMeta presenceMeta = new PresenceMeta();
presenceMeta.setPresent_bit_streamIsSet(true);
presenceMeta
.setPresent_bit_stream(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray());
dataChunk.setPresence(presenceMeta);
- //TODO : PresenceMeta needs to be implemented and set here
+ // TODO : PresenceMeta needs to be implemented and set here
// dataChunk.setPresence(new PresenceMeta());
- //TODO : Need to write ValueCompression meta here.
+ // TODO : Need to write ValueCompression meta here.
List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i))));
@@ -291,7 +401,7 @@ public class CarbonMetadataUtil {
private static boolean containsEncoding(int blockIndex, Encoding encoding,
List<ColumnSchema> columnSchemas, SegmentProperties segmentProperties) {
Set<Integer> dimOrdinals = segmentProperties.getDimensionOrdinalForBlock(blockIndex);
- //column groups will always have dictionary encoding
+ // column groups will always have dictionary encoding
if (dimOrdinals.size() > 1 && Encoding.DICTIONARY == encoding) {
return true;
}
@@ -336,7 +446,8 @@ public class CarbonMetadataUtil {
}
/**
- * It converts FileFooter thrift object to list of BlockletInfoColumnar objects
+ * It converts FileFooter thrift object to list of BlockletInfoColumnar
+ * objects
*
* @param footer
* @return
@@ -486,8 +597,8 @@ public class CarbonMetadataUtil {
}
/**
- * Below method will be used to get the block index info thrift object for each block
- * present in the segment
+ * Below method will be used to get the block index info thrift object for
+ * each block present in the segment
*
* @param blockIndexInfoList block index info list
* @return list of block index
@@ -508,8 +619,7 @@ public class CarbonMetadataUtil {
}
/**
- * Below method will be used to get the data chunk object for all the
- * columns
+ * Below method will be used to get the data chunk object for all the columns
*
* @param blockletInfoColumnar blocklet info
* @param columnSchenma list of columns
@@ -536,7 +646,8 @@ public class CarbonMetadataUtil {
encodings.add(Encoding.DIRECT_DICTIONARY);
}
dataChunk.setRowMajor(colGrpblock[i]);
- //TODO : Once schema PR is merged and information needs to be passed here.
+ // TODO : Once schema PR is merged and information needs to be passed
+ // here.
dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
if (aggKeyBlock[i]) {
dataChunk.setRle_page_length(blockletInfoColumnar.getDataIndexMapLength()[aggregateIndex]);
@@ -552,7 +663,8 @@ public class CarbonMetadataUtil {
rowIdIndex++;
}
- //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+ // TODO : Right now the encodings are happening at runtime. change as per
+ // this encoders.
dataChunk.setEncoders(encodings);
colDataChunks.add(dataChunk);
@@ -562,22 +674,24 @@ public class CarbonMetadataUtil {
DataChunk2 dataChunk = new DataChunk2();
dataChunk.setChunk_meta(getChunkCompressionMeta());
dataChunk.setRowMajor(false);
- //TODO : Once schema PR is merged and information needs to be passed here.
+ // TODO : Once schema PR is merged and information needs to be passed
+ // here.
dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]);
- //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+ // TODO : Right now the encodings are happening at runtime. change as per
+ // this encoders.
List<Encoding> encodings = new ArrayList<Encoding>();
encodings.add(Encoding.DELTA);
dataChunk.setEncoders(encodings);
- //TODO writing dummy presence meta need to set actual presence
- //meta
+ // TODO writing dummy presence meta need to set actual presence
+ // meta
PresenceMeta presenceMeta = new PresenceMeta();
presenceMeta.setPresent_bit_streamIsSet(true);
presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor()
.compressByte(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray()));
dataChunk.setPresence(presenceMeta);
- //TODO : PresenceMeta needs to be implemented and set here
+ // TODO : PresenceMeta needs to be implemented and set here
// dataChunk.setPresence(new PresenceMeta());
- //TODO : Need to write ValueCompression meta here.
+ // TODO : Need to write ValueCompression meta here.
List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i))));
@@ -586,4 +700,189 @@ public class CarbonMetadataUtil {
}
return colDataChunks;
}
+
+ /**
+ * Below method will be used to get the data chunk2 object for every page of a column
+ *
+ * @param nodeHolderList node holder list, one entry per page
+ * @param columnSchenma list of columns
+ * @param segmentProperties segment properties
+ * @return list of data chunks
+ * @throws IOException
+ */
+ private static List<DataChunk2> getDatachunk2(List<NodeHolder> nodeHolderList,
+ List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties, int index,
+ boolean isDimensionColumn) throws IOException {
+ List<DataChunk2> colDataChunks = new ArrayList<DataChunk2>();
+ DataChunk2 dataChunk = null;
+ NodeHolder nodeHolder = null;
+ for (int i = 0; i < nodeHolderList.size(); i++) {
+ nodeHolder = nodeHolderList.get(i);
+ dataChunk = new DataChunk2();
+ dataChunk.min_max = new BlockletMinMaxIndex();
+ dataChunk.setChunk_meta(getChunkCompressionMeta());
+ dataChunk.setNumberOfRowsInpage(nodeHolder.getEntryCount());
+ List<Encoding> encodings = new ArrayList<Encoding>();
+ if (isDimensionColumn) {
+ dataChunk.setData_page_length(nodeHolder.getKeyLengths()[index]);
+ if (containsEncoding(index, Encoding.DICTIONARY, columnSchenma, segmentProperties)) {
+ encodings.add(Encoding.DICTIONARY);
+ }
+ if (containsEncoding(index, Encoding.DIRECT_DICTIONARY, columnSchenma, segmentProperties)) {
+ encodings.add(Encoding.DIRECT_DICTIONARY);
+ }
+ dataChunk.setRowMajor(nodeHolder.getColGrpBlocks()[index]);
+ // TODO : Once schema PR is merged and information needs to be passed
+ // here.
+ if (nodeHolder.getAggBlocks()[index]) {
+ dataChunk.setRle_page_length(nodeHolder.getDataIndexMapLength()[index]);
+ encodings.add(Encoding.RLE);
+ }
+ dataChunk.setSort_state(nodeHolder.getIsSortedKeyBlock()[index] ?
+ SortState.SORT_EXPLICIT :
+ SortState.SORT_NATIVE);
+
+ if (!nodeHolder.getIsSortedKeyBlock()[index]) {
+ dataChunk.setRowid_page_length(nodeHolder.getKeyBlockIndexLength()[index]);
+ encodings.add(Encoding.INVERTED_INDEX);
+ }
+ dataChunk.min_max.addToMax_values(ByteBuffer.wrap(nodeHolder.getColumnMaxData()[index]));
+ dataChunk.min_max.addToMin_values(ByteBuffer.wrap(nodeHolder.getColumnMinData()[index]));
+ } else {
+ dataChunk.setData_page_length(nodeHolder.getDataArray()[index].length);
+ dataChunk.setRowMajor(false);
+ // TODO : Right now the encodings are happening at runtime. change as
+ // per this encoders.
+ encodings.add(Encoding.DELTA);
+ // TODO writing dummy presence meta need to set actual presence
+ // meta
+ PresenceMeta presenceMeta = new PresenceMeta();
+ presenceMeta.setPresent_bit_streamIsSet(true);
+ presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor()
+ .compressByte(nodeHolder.getMeasureNullValueIndex()[index].toByteArray()));
+ dataChunk.setPresence(presenceMeta);
+ List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
+ encoderMetaList.add(ByteBuffer.wrap(serializeEncodeMetaUsingByteBuffer(
+ createValueEncoderMeta(nodeHolder.getCompressionModel(), index))));
+ dataChunk.setEncoder_meta(encoderMetaList);
+ dataChunk.min_max
+ .addToMax_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMaxData()[index]));
+ dataChunk.min_max
+ .addToMin_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMinData()[index]));
+ }
+ dataChunk.setEncoders(encodings);
+ colDataChunks.add(dataChunk);
+ }
+ return colDataChunks;
+ }
+
+ public static DataChunk3 getDataChunk3(List<NodeHolder> nodeHolderList,
+ List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties, int index,
+ boolean isDimensionColumn) throws IOException {
+ List<DataChunk2> dataChunksList =
+ getDatachunk2(nodeHolderList, columnSchenma, segmentProperties, index, isDimensionColumn);
+ int offset = 0;
+ DataChunk3 dataChunk = new DataChunk3();
+ List<Integer> pageOffsets = new ArrayList<>();
+ List<Integer> pageLengths = new ArrayList<>();
+ int length = 0;
+ for (int i = 0; i < dataChunksList.size(); i++) {
+ pageOffsets.add(offset);
+ length =
+ dataChunksList.get(i).getData_page_length() + dataChunksList.get(i).getRle_page_length()
+ + dataChunksList.get(i).getRowid_page_length();
+ pageLengths.add(length);
+ offset += length;
+ }
+ dataChunk.setData_chunk_list(dataChunksList);
+ dataChunk.setPage_length(pageLengths);
+ dataChunk.setPage_offset(pageOffsets);
+ return dataChunk;
+ }
+
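
Note: getDataChunk3 lays a column's pages out back to back, so each page
offset is the running sum of the previous pages' total lengths
(data + rle + rowid). A worked example, assuming three pages with
(data, rle, rowid) lengths of (100, 10, 0), (80, 0, 12) and (90, 0, 0):

    int[][] pages = { { 100, 10, 0 }, { 80, 0, 12 }, { 90, 0, 0 } };
    int offset = 0;
    for (int[] p : pages) {
      int length = p[0] + p[1] + p[2];
      System.out.println("offset=" + offset + " length=" + length);
      offset += length;
    }
    // prints: offset=0 length=110, offset=110 length=92, offset=202 length=90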
+ public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) {
+ ByteBuffer buffer = null;
+ if (valueEncoderMeta.getType() == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ buffer = ByteBuffer.allocate(
+ (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+ + 3);
+ buffer.putChar(valueEncoderMeta.getType());
+ buffer.putDouble((Double) valueEncoderMeta.getMaxValue());
+ buffer.putDouble((Double) valueEncoderMeta.getMinValue());
+ buffer.putDouble((Double) valueEncoderMeta.getUniqueValue());
+ } else if (valueEncoderMeta.getType() == CarbonCommonConstants.BIG_INT_MEASURE) {
+ buffer = ByteBuffer.allocate(
+ (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+ + 3);
+ buffer.putChar(valueEncoderMeta.getType());
+ buffer.putLong((Long) valueEncoderMeta.getMaxValue());
+ buffer.putLong((Long) valueEncoderMeta.getMinValue());
+ buffer.putLong((Long) valueEncoderMeta.getUniqueValue());
+ } else {
+ buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3);
+ buffer.putChar(valueEncoderMeta.getType());
+ }
+ buffer.putInt(valueEncoderMeta.getDecimal());
+ buffer.put(valueEncoderMeta.getDataTypeSelected());
+ buffer.flip();
+ return buffer.array();
+ }
+
+ public static byte[] getByteValueForMeasure(Object data, DataType dataType) {
+ ByteBuffer b = null;
+ switch (dataType) {
+ case DOUBLE:
+ b = ByteBuffer.allocate(8);
+ b.putDouble((Double) data);
+ b.flip();
+ return b.array();
+ case LONG:
+ case INT:
+ case SHORT:
+ b = ByteBuffer.allocate(8);
+ b.putLong((Long) data);
+ b.flip();
+ return b.array();
+ case DECIMAL:
+ return DataTypeUtil.bigDecimalToByte((BigDecimal)data);
+ default:
+ throw new IllegalArgumentException("Invalid data type");
+ }
+ }
+
+ public static int compareMeasureData(byte[] first, byte[] second, DataType dataType) {
+ ByteBuffer firstBuffer = null;
+ ByteBuffer secondBuffer = null;
+ switch (dataType) {
+ case DOUBLE:
+ firstBuffer = ByteBuffer.allocate(8);
+ firstBuffer.put(first);
+ secondBuffer = ByteBuffer.allocate(8);
+ secondBuffer.put(second);
+ firstBuffer.flip();
+ secondBuffer.flip();
+ return Double.compare(firstBuffer.getDouble(), secondBuffer.getDouble());
+ case LONG:
+ case INT:
+ case SHORT:
+ firstBuffer = ByteBuffer.allocate(8);
+ firstBuffer.put(first);
+ secondBuffer = ByteBuffer.allocate(8);
+ secondBuffer.put(second);
+ firstBuffer.flip();
+ secondBuffer.flip();
+ return Long.compare(firstBuffer.getLong(), secondBuffer.getLong());
+ case DECIMAL:
+ return DataTypeUtil.byteToBigDecimal(first)
+ .compareTo(DataTypeUtil.byteToBigDecimal(second));
+ default:
+ throw new IllegalArgumentException("Invalid data type");
+ }
+ }
+
}
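
Note: getBlockletIndex relies on compareMeasureData to widen the measure
min/max across all pages of a blocklet. A hedged usage sketch of the
serialize-and-compare pair (values and types are illustrative):

    byte[] a = CarbonMetadataUtil.getByteValueForMeasure(12.5d, DataType.DOUBLE);
    byte[] b = CarbonMetadataUtil.getByteValueForMeasure(40.0d, DataType.DOUBLE);
    // pick the larger serialized value, as the per-page merge loop above does
    byte[] max = CarbonMetadataUtil.compareMeasureData(a, b, DataType.DOUBLE) < 0 ? b : a;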
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 962d352..39c36ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -26,6 +26,7 @@ import java.util.Properties;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
public final class CarbonProperties {
@@ -85,6 +86,9 @@ public final class CarbonProperties {
validateCarbonDataFileVersion();
validateExecutorStartUpTime();
validatePrefetchBufferSize();
+ validateNumberOfPagesPerBlocklet();
+ validateNumberOfColumnPerIORead();
+ validateNumberOfRowsPerBlockletColumnPage();
}
private void validatePrefetchBufferSize() {
@@ -107,6 +111,93 @@ public final class CarbonProperties {
}
}
+ /**
+ * This method validates the number of pages per blocklet column
+ */
+ private void validateNumberOfPagesPerBlocklet() {
+ String numberOfPagePerBlockletColumnString = carbonProperties
+ .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+ CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+ try {
+ short numberOfPagePerBlockletColumn = Short.parseShort(numberOfPagePerBlockletColumnString);
+ if (numberOfPagePerBlockletColumn
+ < CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MIN
+ || numberOfPagePerBlockletColumn
+ > CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MAX) {
+ LOGGER.info(
+ "The number of pages per blocklet column value \"" + numberOfPagePerBlockletColumnString
+ + "\" is invalid. Using the default value \""
+ + CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE + "\"");
+ carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+ CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.info(
+ "The number of pages per blocklet column value \"" + numberOfPagePerBlockletColumnString
+ + "\" is invalid. Using the default value \""
+ + CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE + "\"");
+ carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+ CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+ }
+ }
+
+ /**
+ * This method validates the number of columns read in one IO
+ */
+ private void validateNumberOfColumnPerIORead() {
+ String numberofColumnPerIOString = carbonProperties
+ .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+ CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+ try {
+ short numberofColumnPerIO = Short.parseShort(numberofColumnPerIOString);
+ if (numberofColumnPerIO < CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_MIN
+ || numberofColumnPerIO > CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_MAX) {
+ LOGGER.info("The Number Of pages per blocklet column value \"" + numberofColumnPerIOString
+ + "\" is invalid. Using the default value \""
+ + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+ carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+ CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.info("The Number Of pages per blocklet column value \"" + numberofColumnPerIOString
+ + "\" is invalid. Using the default value \""
+ + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+ carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+ CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+ }
+ }
+
+ /**
+ * This method validates the number of rows per blocklet column page
+ */
+ private void validateNumberOfRowsPerBlockletColumnPage() {
+ String numberOfRowsPerBlockletColumnPageString = carbonProperties
+ .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+ CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+ try {
+ short numberOfRowsPerBlockletColumnPage =
+ Short.parseShort(numberOfRowsPerBlockletColumnPageString);
+ if (numberOfRowsPerBlockletColumnPage
+ < CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MIN
+ || numberOfRowsPerBlockletColumnPage
+ > CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MAX) {
+ LOGGER.info("The Number Of rows per blocklet column pages value \""
+ + numberOfRowsPerBlockletColumnPageString + "\" is invalid. Using the default value \""
+ + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+ carbonProperties
+ .setProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+ CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.info("The Number Of rows per blocklet column pages value \""
+ + numberOfRowsPerBlockletColumnPageString + "\" is invalid. Using the default value \""
+ + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+ carbonProperties
+ .setProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+ CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+ }
+ }
+
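
Note: the three validators share one parse-then-clamp pattern: read the
property, parse it as a short, and fall back to the default when it is out of
range or unparsable. A sketch of the shared shape (validateShortRange is a
hypothetical helper, not part of this patch):

    private void validateShortRange(String key, String defaultValue, short min,
        short max, String description) {
      String value = carbonProperties.getProperty(key, defaultValue);
      try {
        short parsed = Short.parseShort(value);
        if (parsed < min || parsed > max) {
          throw new NumberFormatException("out of range");
        }
      } catch (NumberFormatException e) {
        LOGGER.info("The " + description + " value \"" + value
            + "\" is invalid. Using the default value \"" + defaultValue + "\"");
        carbonProperties.setProperty(key, defaultValue);
      }
    }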
private void validateBadRecordsLocation() {
String badRecordsLocation =
carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
@@ -288,17 +379,16 @@ public final class CarbonProperties {
carbonProperties.getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
if (carbondataFileVersionString == null) {
// use default property if user does not specify version property
- carbonProperties
- .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
- CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+ carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+ CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
} else {
try {
ColumnarFormatVersion.valueOf(carbondataFileVersionString);
} catch (IllegalArgumentException e) {
// use default property if user specifies an invalid version property
- LOGGER.warn("Specified file version property is invalid: " +
- carbondataFileVersionString + ". Using " +
- CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + " as default file version");
+ LOGGER.warn("Specified file version property is invalid: " + carbondataFileVersionString
+ + ". Using " + CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION
+ + " as default file version");
carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
}
@@ -569,6 +659,7 @@ public final class CarbonProperties {
/**
* Returns configured update deleta files value for IUD compaction
+ *
* @return numberOfDeltaFilesThreshold
*/
public int getNoUpdateDeltaFilesThresholdForIUDCompaction() {
@@ -588,8 +679,7 @@ public final class CarbonProperties {
}
} catch (NumberFormatException e) {
LOGGER.error("The specified value for property "
- + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
- + "is incorrect."
+ + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + " is incorrect."
+ " Correct value should be in range of 0 -10000. Taking the default value.");
numberOfDeltaFilesThreshold = Integer
.parseInt(CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
@@ -599,6 +689,7 @@ public final class CarbonProperties {
/**
* Returns configured delete deleta files value for IUD compaction
+ *
* @return numberOfDeltaFilesThreshold
*/
public int getNoDeleteDeltaFilesThresholdForIUDCompaction() {
@@ -618,8 +709,7 @@ public final class CarbonProperties {
}
} catch (NumberFormatException e) {
LOGGER.error("The specified value for property "
- + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
- + "is incorrect."
+ + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + " is incorrect."
+ " Correct value should be in range of 0 -10000. Taking the default value.");
numberOfDeltaFilesThreshold = Integer
.parseInt(CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b9a96d2..5a656e0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -73,6 +73,7 @@ import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
@@ -471,6 +472,28 @@ public final class CarbonUtil {
numberCompressor.unCompress(indexMap, 0, indexMap.length));
}
+ public static int[] getUnCompressColumnIndex(int totalLength, ByteBuffer buffer, int offset) {
+ buffer.position(offset);
+ int indexDataLength = buffer.getInt();
+ int indexMapLength = totalLength - indexDataLength - CarbonCommonConstants.INT_SIZE_IN_BYTE;
+ int[] indexData = getIntArray(buffer, buffer.position(), indexDataLength);
+ int[] indexMap = getIntArray(buffer, buffer.position(), indexMapLength);
+ return UnBlockIndexer.uncompressIndex(indexData, indexMap);
+ }
+
+ public static int[] getIntArray(ByteBuffer data, int offset, int length) {
+ if (length == 0) {
+ return new int[0];
+ }
+ data.position(offset);
+ int[] intArray = new int[length / 2];
+ int index = 0;
+ while (index < intArray.length) {
+ intArray[index++] = data.getShort();
+ }
+ return intArray;
+ }
+
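
Note: getUnCompressColumnIndex expects the rowid page laid out as
[i32 indexDataLength][indexData as i16 values][indexMap as i16 values], and
getIntArray widens the 2-byte values back to ints (hence length / 2). A layout
sketch with illustrative numbers (the indexMap values only demonstrate the
byte layout, not meaningful RLE content):

    // total = 4 (length field) + 6 (3 shorts) + 4 (2 shorts) = 14 bytes
    ByteBuffer page = ByteBuffer.allocate(14);
    page.putInt(6); // indexDataLength in bytes
    page.putShort((short) 2).putShort((short) 0).putShort((short) 1); // indexData
    page.putShort((short) 0).putShort((short) 3);                     // indexMap
    page.flip();
    int[] rowIds = CarbonUtil.getUnCompressColumnIndex(14, page, 0);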
/**
* Convert int array to Integer list
*
@@ -1233,6 +1256,18 @@ public final class CarbonUtil {
}, offset, length);
}
+ public static DataChunk3 readDataChunk3(ByteBuffer dataChunkBuffer, int offset, int length)
+ throws IOException {
+ byte[] data = new byte[length];
+ dataChunkBuffer.position(offset);
+ dataChunkBuffer.get(data);
+ return (DataChunk3) read(data, new ThriftReader.TBaseCreator() {
+ @Override public TBase create() {
+ return new DataChunk3();
+ }
+ }, 0, length);
+ }
+
public static DataChunk2 readDataChunk(ByteBuffer dataChunkBuffer, int offset, int length)
throws IOException {
byte[] data = new byte[length];
@@ -1293,6 +1328,35 @@ public final class CarbonUtil {
return meta;
}
+ public static ValueEncoderMeta deserializeEncoderMetaNew(byte[] encodeMeta) {
+ ByteBuffer buffer = ByteBuffer.wrap(encodeMeta);
+ char measureType = buffer.getChar();
+ ValueEncoderMeta valueEncoderMeta = new ValueEncoderMeta();
+ valueEncoderMeta.setType(measureType);
+ switch (measureType) {
+ case CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE:
+ valueEncoderMeta.setMaxValue(buffer.getDouble());
+ valueEncoderMeta.setMinValue(buffer.getDouble());
+ valueEncoderMeta.setUniqueValue(buffer.getDouble());
+ break;
+ case CarbonCommonConstants.BIG_DECIMAL_MEASURE:
+ valueEncoderMeta.setMaxValue(0.0);
+ valueEncoderMeta.setMinValue(0.0);
+ valueEncoderMeta.setUniqueValue(0.0);
+ break;
+ case CarbonCommonConstants.BIG_INT_MEASURE:
+ valueEncoderMeta.setMaxValue(buffer.getLong());
+ valueEncoderMeta.setMinValue(buffer.getLong());
+ valueEncoderMeta.setUniqueValue(buffer.getLong());
+ break;
+ default:
+ throw new IllegalArgumentException("invalid measure type");
+ }
+ valueEncoderMeta.setDecimal(buffer.getInt());
+ valueEncoderMeta.setDataTypeSelected(buffer.get());
+ return valueEncoderMeta;
+ }
+
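
Note: deserializeEncoderMetaNew is the read-side counterpart of
CarbonMetadataUtil.serializeEncodeMetaUsingByteBuffer; both agree on the
layout [char type][max/min/unique payload][i32 decimal][byte dataTypeSelected].
A round-trip sketch for a big-int measure (setter usage as in the code above):

    ValueEncoderMeta meta = new ValueEncoderMeta();
    meta.setType(CarbonCommonConstants.BIG_INT_MEASURE);
    meta.setMaxValue(100L);
    meta.setMinValue(1L);
    meta.setUniqueValue(0L);
    meta.setDecimal(0);
    meta.setDataTypeSelected((byte) 0);
    byte[] bytes = CarbonMetadataUtil.serializeEncodeMetaUsingByteBuffer(meta);
    ValueEncoderMeta back = CarbonUtil.deserializeEncoderMetaNew(bytes);
    // back carries the same type, max/min/unique, decimal and dataTypeSelected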
/**
* Below method will be used to convert indexes in range
* Indexes=[0,1,2,3,4,5,6,7,8,9]
@@ -1454,5 +1518,51 @@ public final class CarbonUtil {
return null;
}
}
+
+ /**
+ * Below method will be used to convert byte data to surrogate key based
+ * column value size
+ *
+ * @param data data
+ * @param startOffsetOfData start offset of data
+ * @param eachColumnValueSize size of each column value
+ * @return surrogate key
+ */
+ public static int getSurrogateInternal(byte[] data, int startOffsetOfData,
+ int eachColumnValueSize) {
+ int surrogate = 0;
+ switch (eachColumnValueSize) {
+ case 1:
+ surrogate <<= 8;
+ surrogate ^= data[startOffsetOfData] & 0xFF;
+ return surrogate;
+ case 2:
+ surrogate <<= 8;
+ surrogate ^= data[startOffsetOfData] & 0xFF;
+ surrogate <<= 8;
+ surrogate ^= data[startOffsetOfData + 1] & 0xFF;
+ return surrogate;
+ case 3:
+ surrogate <<= 8;
+ surrogate ^= data[startOffsetOfData] & 0xFF;
+ surrogate <<= 8;
+ surrogate ^= data[startOffsetOfData + 1] & 0xFF;
+ surrogate <<= 8;
+ surrogate ^= data[startOffsetOfData + 2] & 0xFF;
+ return surrogate;
+ case 4:
+ surrogate <<= 8;
+ surrogate ^= data[startOffsetOfData] & 0xFF;
+ surrogate <<= 8;
+ surrogate ^= data[startOffsetOfData + 1] & 0xFF;
+ surrogate <<= 8;
+ surrogate ^= data[startOffsetOfData + 2] & 0xFF;
+ surrogate <<= 8;
+ surrogate ^= data[startOffsetOfData + 3] & 0xFF;
+ return surrogate;
+ default:
+ throw new IllegalArgumentException("Int cannot be more than 4 bytes");
+ }
+ }
}
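
Note: getSurrogateInternal reassembles a surrogate key from 1 to 4 big-endian
bytes; the shift-and-xor per byte is equivalent to (b0 << 8 | b1) for a 2-byte
key, and so on up to 4 bytes. For example:

    byte[] data = { 0x01, 0x02 };
    int key = CarbonUtil.getSurrogateInternal(data, 0, 2);
    // key == (0x01 << 8) | 0x02 == 258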
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
index 08bfd6d..153fcb9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
@@ -56,8 +56,11 @@ public class DataFileFooterConverterFactory {
switch (version) {
case V1:
return new DataFileFooterConverter();
- default:
+ case V2:
return new DataFileFooterConverter2();
+ case V3:
+ default:
+ return new DataFileFooterConverterV3();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
new file mode 100644
index 0000000..1ab3133
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonFooterReader;
+import org.apache.carbondata.format.FileFooter;
+
+public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
+
+ /**
+ * Below method will be used to convert thrift file meta to wrapper file meta
+ * This method will read the footer from footer offset present in the data file
+ * 1. It will set the stream offset
+ * 2. It will read the footer data from file
+ * 3. parse the footer to thrift object
+ * 4. convert to wrapper object
+ *
+ * @param tableBlockInfo
+ * table block info
+ * @return data file footer
+ */
+ @Override public DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
+ throws IOException {
+ DataFileFooter dataFileFooter = new DataFileFooter();
+ CarbonFooterReader reader =
+ new CarbonFooterReader(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset());
+ FileFooter footer = reader.readFooter();
+ dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) footer.getVersion()));
+ dataFileFooter.setNumberOfRows(footer.getNum_rows());
+ dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
+ List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+ List<org.apache.carbondata.format.ColumnSchema> table_columns = footer.getTable_columns();
+ for (int i = 0; i < table_columns.size(); i++) {
+ columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+ }
+ dataFileFooter.setColumnInTable(columnSchemaList);
+
+ List<org.apache.carbondata.format.BlockletIndex> leaf_node_indices_Thrift =
+ footer.getBlocklet_index_list();
+ List<BlockletIndex> blockletIndexList = new ArrayList<BlockletIndex>();
+ for (int i = 0; i < leaf_node_indices_Thrift.size(); i++) {
+ BlockletIndex blockletIndex = getBlockletIndex(leaf_node_indices_Thrift.get(i));
+ blockletIndexList.add(blockletIndex);
+ }
+ List<org.apache.carbondata.format.BlockletInfo3> leaf_node_infos_Thrift =
+ footer.getBlocklet_info_list3();
+ List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
+ for (int i = 0; i < leaf_node_infos_Thrift.size(); i++) {
+ BlockletInfo blockletInfo = getBlockletInfo(leaf_node_infos_Thrift.get(i),
+ getNumberOfDimensionColumns(columnSchemaList));
+ blockletInfo.setBlockletIndex(blockletIndexList.get(i));
+ blockletInfoList.add(blockletInfo);
+ }
+ dataFileFooter.setBlockletList(blockletInfoList);
+ dataFileFooter.setBlockletIndex(getBlockletIndexForDataFileFooter(blockletIndexList));
+ return dataFileFooter;
+ }
+
+ /**
+ * Below method is to convert the blocklet info of the thrift to wrapper
+ * blocklet info
+ *
+ * @param blockletInfoThrift blocklet info of the thrift
+ * @return blocklet info wrapper
+ */
+ private BlockletInfo getBlockletInfo(
+ org.apache.carbondata.format.BlockletInfo3 blockletInfoThrift, int numberOfDimensionColumns) {
+ BlockletInfo blockletInfo = new BlockletInfo();
+ List<Long> dimensionColumnChunkOffsets =
+ blockletInfoThrift.getColumn_data_chunks_offsets().subList(0, numberOfDimensionColumns);
+ List<Long> measureColumnChunksOffsets = blockletInfoThrift.getColumn_data_chunks_offsets()
+ .subList(numberOfDimensionColumns,
+ blockletInfoThrift.getColumn_data_chunks_offsets().size());
+ List<Integer> dimensionColumnChunkLength =
+ blockletInfoThrift.getColumn_data_chunks_length().subList(0, numberOfDimensionColumns);
+ List<Integer> measureColumnChunksLength = blockletInfoThrift.getColumn_data_chunks_length()
+ .subList(numberOfDimensionColumns,
+ blockletInfoThrift.getColumn_data_chunks_offsets().size());
+ blockletInfo.setDimensionChunkOffsets(dimensionColumnChunkOffsets);
+ blockletInfo.setMeasureChunkOffsets(measureColumnChunksOffsets);
+ blockletInfo.setDimensionChunksLength(dimensionColumnChunkLength);
+ blockletInfo.setMeasureChunksLength(measureColumnChunksLength);
+ blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
+ blockletInfo.setDimensionOffset(blockletInfoThrift.getDimension_offsets());
+ blockletInfo.setMeasureOffsets(blockletInfoThrift.getMeasure_offsets());
+ return blockletInfo;
+ }
+
+ /**
+ * Below method will be used to get the number of dimension column
+ * in carbon column schema
+ *
+ * @param columnSchemaList column schema list
+ * @return number of dimension column
+ */
+ private int getNumberOfDimensionColumns(List<ColumnSchema> columnSchemaList) {
+ int numberOfDimensionColumns = 0;
+ int previousColumnGroupId = -1;
+ ColumnSchema columnSchema = null;
+ for (int i = 0; i < columnSchemaList.size(); i++) {
+ columnSchema = columnSchemaList.get(i);
+ if (columnSchema.isDimensionColumn() && columnSchema.isColumnar()) {
+ numberOfDimensionColumns++;
+ } else if (columnSchema.isDimensionColumn()) {
+ if (previousColumnGroupId != columnSchema.getColumnGroupId()) {
+ previousColumnGroupId = columnSchema.getColumnGroupId();
+ numberOfDimensionColumns++;
+ }
+ } else {
+ break;
+ }
+ }
+ return numberOfDimensionColumns;
+ }
+
+}
+
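
Note: getBlockletInfo splits the single column_data_chunks_offsets list into
dimension and measure halves purely by the dimension column count, so the V3
writer must emit all dimension chunks before all measure chunks. For example,
with 2 dimension columns and 1 measure column:

    List<Long> offsets = Arrays.asList(0L, 120L, 260L);
    int dims = 2;
    List<Long> dimOffsets = offsets.subList(0, dims);              // [0, 120]
    List<Long> msrOffsets = offsets.subList(dims, offsets.size()); // [260]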
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
new file mode 100644
index 0000000..d46b806
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+
+public class NodeHolder {
+ /**
+ * keyArray
+ */
+ private byte[][] keyArray;
+
+ /**
+ * dataArray
+ */
+ private byte[][] dataArray;
+
+ /**
+ * measureLenght
+ */
+ private int[] measureLenght;
+
+ /**
+ * startKey
+ */
+ private byte[] startKey;
+
+ /**
+ * endKey
+ */
+ private byte[] endKey;
+
+ /**
+ * entryCount
+ */
+ private int entryCount;
+ /**
+ * keyLenghts
+ */
+ private int[] keyLengths;
+
+ /**
+ * dataAfterCompression
+ */
+ private short[][] dataAfterCompression;
+
+ /**
+ * indexMap
+ */
+ private short[][] indexMap;
+
+ /**
+ * keyBlockIndexLength
+ */
+ private int[] keyBlockIndexLength;
+
+ /**
+ * isSortedKeyBlock
+ */
+ private boolean[] isSortedKeyBlock;
+
+ private byte[][] compressedIndex;
+
+ private byte[][] compressedIndexMap;
+
+ /**
+ * dataIndexMap
+ */
+ private int[] dataIndexMapLength;
+
+ /**
+ * dataIndexMap
+ */
+ private int[] dataIndexMapOffsets;
+
+ /**
+ * compressedDataIndex
+ */
+ private byte[][] compressedDataIndex;
+
+ /**
+ * column max data
+ */
+ private byte[][] columnMaxData;
+
+ /**
+ * column min data
+ */
+ private byte[][] columnMinData;
+
+ private byte[][] measureColumnMaxData;
+
+ private byte[][] measureColumnMinData;
+
+ /**
+ * compression model for numbers data block.
+ */
+ private WriterCompressModel compressionModel;
+
+ /**
+ * array of aggBlocks flag to identify the aggBlocks
+ */
+ private boolean[] aggBlocks;
+
+ /**
+ * all columns max value
+ */
+ private byte[][] allMaxValue;
+
+ /**
+ * all column max value
+ */
+ private byte[][] allMinValue;
+
+ /**
+ * true if given index is colgroup block
+ */
+ private boolean[] colGrpBlock;
+
+ /**
+ * bit set which will holds the measure
+ * indexes which are null
+ */
+ private BitSet[] measureNullValueIndex;
+
+ /**
+ * total length of dimension values
+ */
+ private int totalDimensionArrayLength;
+
+ /**
+ * total length of all measure values
+ */
+ private int totalMeasureArrayLength;
+
+ /**
+ * @return the keyArray
+ */
+ public byte[][] getKeyArray() {
+ return keyArray;
+ }
+
+ /**
+ * @param keyArray the keyArray to set
+ */
+ public void setKeyArray(byte[][] keyArray) {
+ this.keyArray = keyArray;
+ }
+
+ /**
+ * @return the dataArray
+ */
+ public byte[][] getDataArray() {
+ return dataArray;
+ }
+
+ /**
+ * @param dataArray the dataArray to set
+ */
+ public void setDataArray(byte[][] dataArray) {
+ this.dataArray = dataArray;
+ }
+
+ /**
+ * @return the measureLenght
+ */
+ public int[] getMeasureLenght() {
+ return measureLenght;
+ }
+
+ /**
+ * @param measureLenght the measureLenght to set
+ */
+ public void setMeasureLenght(int[] measureLenght) {
+ this.measureLenght = measureLenght;
+ }
+
+ /**
+ * @return the startKey
+ */
+ public byte[] getStartKey() {
+ return startKey;
+ }
+
+ /**
+ * @param startKey the startKey to set
+ */
+ public void setStartKey(byte[] startKey) {
+ this.startKey = startKey;
+ }
+
+ /**
+ * @return the endKey
+ */
+ public byte[] getEndKey() {
+ return endKey;
+ }
+
+ /**
+ * @param endKey the endKey to set
+ */
+ public void setEndKey(byte[] endKey) {
+ this.endKey = endKey;
+ }
+
+ /**
+ * @return the entryCount
+ */
+ public int getEntryCount() {
+ return entryCount;
+ }
+
+ /**
+ * @param entryCount the entryCount to set
+ */
+ public void setEntryCount(int entryCount) {
+ this.entryCount = entryCount;
+ }
+
+ /**
+ * @return the keyLenghts
+ */
+ public int[] getKeyLengths() {
+ return keyLengths;
+ }
+
+ public void setKeyLengths(int[] keyLengths) {
+ this.keyLengths = keyLengths;
+ }
+
+ /**
+ * @return the keyBlockIndexLength
+ */
+ public int[] getKeyBlockIndexLength() {
+ return keyBlockIndexLength;
+ }
+
+ /**
+ * @param keyBlockIndexLength the keyBlockIndexLength to set
+ */
+ public void setKeyBlockIndexLength(int[] keyBlockIndexLength) {
+ this.keyBlockIndexLength = keyBlockIndexLength;
+ }
+
+ /**
+ * @return the isSortedKeyBlock
+ */
+ public boolean[] getIsSortedKeyBlock() {
+ return isSortedKeyBlock;
+ }
+
+ /**
+ * @param isSortedKeyBlock the isSortedKeyBlock to set
+ */
+ public void setIsSortedKeyBlock(boolean[] isSortedKeyBlock) {
+ this.isSortedKeyBlock = isSortedKeyBlock;
+ }
+
+ /**
+ * @return the compressedIndex
+ */
+ public byte[][] getCompressedIndex() {
+ return compressedIndex;
+ }
+
+ public void setCompressedIndex(byte[][] compressedIndex) {
+ this.compressedIndex = compressedIndex;
+ }
+
+ /**
+ * @return the compressedIndexMap
+ */
+ public byte[][] getCompressedIndexMap() {
+ return compressedIndexMap;
+ }
+
+ /**
+ * @param compressedIndexMap the compressedIndexMap to set
+ */
+ public void setCompressedIndexMap(byte[][] compressedIndexMap) {
+ this.compressedIndexMap = compressedIndexMap;
+ }
+
+ /**
+ * @return the compressedDataIndex
+ */
+ public byte[][] getCompressedDataIndex() {
+ return compressedDataIndex;
+ }
+
+ /**
+ * @param compressedDataIndex the compressedDataIndex to set
+ */
+ public void setCompressedDataIndex(byte[][] compressedDataIndex) {
+ this.compressedDataIndex = compressedDataIndex;
+ }
+
+ /**
+ * @return the dataIndexMapLength
+ */
+ public int[] getDataIndexMapLength() {
+ return dataIndexMapLength;
+ }
+
+ /**
+ * @param dataIndexMapLength the dataIndexMapLength to set
+ */
+ public void setDataIndexMapLength(int[] dataIndexMapLength) {
+ this.dataIndexMapLength = dataIndexMapLength;
+ }
+
+ public byte[][] getColumnMaxData() {
+ return this.columnMaxData;
+ }
+
+ public void setColumnMaxData(byte[][] columnMaxData) {
+ this.columnMaxData = columnMaxData;
+ }
+
+ public byte[][] getColumnMinData() {
+ return this.columnMinData;
+ }
+
+ public void setColumnMinData(byte[][] columnMinData) {
+ this.columnMinData = columnMinData;
+ }
+
+ public WriterCompressModel getCompressionModel() {
+ return compressionModel;
+ }
+
+ public void setCompressionModel(WriterCompressModel compressionModel) {
+ this.compressionModel = compressionModel;
+ }
+
+ /**
+ * returns array of aggBlocks flag to identify the agg blocks
+ *
+ * @return
+ */
+ public boolean[] getAggBlocks() {
+ return aggBlocks;
+ }
+
+ /**
+ * set array of aggBlocks flag to identify the aggBlocks
+ *
+ * @param aggBlocks
+ */
+ public void setAggBlocks(boolean[] aggBlocks) {
+ this.aggBlocks = aggBlocks;
+ }
+
+ /**
+ * @return
+ */
+ public boolean[] getColGrpBlocks() {
+ return this.colGrpBlock;
+ }
+
+ /**
+ * @param colGrpBlock true if block is column group
+ */
+ public void setColGrpBlocks(boolean[] colGrpBlock) {
+ this.colGrpBlock = colGrpBlock;
+ }
+
+ /**
+ * @return the measureNullValueIndex
+ */
+ public BitSet[] getMeasureNullValueIndex() {
+ return measureNullValueIndex;
+ }
+
+ /**
+ * @param measureNullValueIndex the measureNullValueIndex to set
+ */
+ public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) {
+ this.measureNullValueIndex = measureNullValueIndex;
+ }
+
+ public int getTotalDimensionArrayLength() {
+ return totalDimensionArrayLength;
+ }
+
+ public void setTotalDimensionArrayLength(int totalDimensionArrayLength) {
+ this.totalDimensionArrayLength = totalDimensionArrayLength;
+ }
+
+ public int getTotalMeasureArrayLength() {
+ return totalMeasureArrayLength;
+ }
+
+ public void setTotalMeasureArrayLength(int totalMeasureArrayLength) {
+ this.totalMeasureArrayLength = totalMeasureArrayLength;
+ }
+
+ public byte[][] getMeasureColumnMaxData() {
+ return measureColumnMaxData;
+ }
+
+ public void setMeasureColumnMaxData(byte[][] measureColumnMaxData) {
+ this.measureColumnMaxData = measureColumnMaxData;
+ }
+
+ public byte[][] getMeasureColumnMinData() {
+ return measureColumnMinData;
+ }
+
+ public void setMeasureColumnMinData(byte[][] measureColumnMinData) {
+ this.measureColumnMinData = measureColumnMinData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index 3935bdc..2c6c890 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -155,7 +155,7 @@ public class CarbonMetadataUtilTest {
segmentInfo.setNum_cols(0);
segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
IndexHeader indexHeader = new IndexHeader();
- indexHeader.setVersion(2);
+ indexHeader.setVersion(3);
indexHeader.setSegment_info(segmentInfo);
indexHeader.setTable_columns(columnSchemaList);
indexHeader.setBucket_id(0);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index 759fbf7..3114ee1 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -127,10 +127,21 @@ struct DataChunk2{
7: optional SortState sort_state;
8: optional list<schema.Encoding> encoders; // The List of encoders overriden at node level
9: optional list<binary> encoder_meta; // extra information required by encoders
-}
+ 10: optional BlockletMinMaxIndex min_max;
+ 11: optional i32 numberOfRowsInpage;
+ }
/**
+* Represents a chunk of data. The chunk can be a single column stored in Column Major format or a group of columns stored in Row Major Format.
+**/
+struct DataChunk3{
+ 1: required list<DataChunk2> data_chunk_list; // list of data chunks, one per page
+ 2: optional list<i32> page_offset; // offset of each page
+ 3: optional list<i32> page_length; // length of each page
+
+ }
+/**
* Information about a blocklet
*/
struct BlockletInfo{
@@ -146,7 +157,16 @@ struct BlockletInfo2{
2: required list<i64> column_data_chunks_offsets; // Information about offsets all column chunks in this blocklet
3: required list<i16> column_data_chunks_length; // Information about length all column chunks in this blocklet
}
-
+/**
+* Information about a blocklet
+*/
+struct BlockletInfo3{
+ 1: required i32 num_rows; // Number of rows in this blocklet
+ 2: required list<i64> column_data_chunks_offsets; // Information about offsets of all column chunks in this blocklet
+ 3: required list<i32> column_data_chunks_length; // Information about lengths of all column chunks in this blocklet
+ 4: required i64 dimension_offsets;
+ 5: required i64 measure_offsets;
+ }
/**
* Footer for indexed carbon file
*/
@@ -158,7 +178,8 @@ struct FileFooter{
5: required list<BlockletIndex> blocklet_index_list; // blocklet index of all blocklets in this file
6: optional list<BlockletInfo> blocklet_info_list; // Information about blocklets of all columns in this file
7: optional list<BlockletInfo2> blocklet_info_list2; // Information about blocklets of all columns in this file
- 8: optional dictionary.ColumnDictionaryChunk dictionary; // blocklet local dictionary
+ 8: optional list<BlockletInfo3> blocklet_info_list3; // Information about blocklets of all columns in this file
+ 9: optional dictionary.ColumnDictionaryChunk dictionary; // blocklet local dictionary
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
index 828ece8..3f75cd1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
import org.apache.carbondata.processing.store.writer.v2.CarbonFactDataWriterImplV2;
+import org.apache.carbondata.processing.store.writer.v3.CarbonFactDataWriterImplV3;
/**
* Factory class to get the writer instance
@@ -62,8 +63,12 @@ public class CarbonDataWriterFactory {
switch (version) {
case V1:
return new CarbonFactDataWriterImplV1(carbonDataWriterVo);
- default:
+ case V2:
return new CarbonFactDataWriterImplV2(carbonDataWriterVo);
+ case V3:
+ return new CarbonFactDataWriterImplV3(carbonDataWriterVo);
+ default:
+ return new CarbonFactDataWriterImplV3(carbonDataWriterVo);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index bf66700..0699167 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -40,9 +40,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
@@ -59,6 +61,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.core.util.ValueCompressionUtil;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.mdkeygen.file.FileManager;
@@ -70,7 +73,6 @@ import org.apache.carbondata.processing.store.colgroup.ColumnDataHolder;
import org.apache.carbondata.processing.store.colgroup.DataHolder;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.store.writer.NodeHolder;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
@@ -257,6 +259,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private int bucketNumber;
/**
+ * current data format version
+ */
+ private ColumnarFormatVersion version;
+
+ /**
* CarbonFactDataHandler constructor
*/
public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
@@ -326,6 +333,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
aggKeyBlock = arrangeUniqueBlockType(aggKeyBlock);
}
+ version = CarbonProperties.getInstance().getFormatVersion();
}
private void initParameters(CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
@@ -476,7 +484,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
} else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
max[i] = -Double.MAX_VALUE;
} else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- max[i] = new BigDecimal(0.0);
+ max[i] = new BigDecimal(-Double.MAX_VALUE);
} else {
max[i] = 0.0;
}
@@ -748,9 +756,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
// TODO remove after kettle flow is removed
private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal, byte[][] byteArrayValues,
int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
- WriterCompressModel compressionModel, byte[][] noDictionaryData,
- byte[] noDictionaryStartKey, byte[] noDictionaryEndKey)
- throws CarbonDataWriterException {
+ WriterCompressModel compressionModel, byte[][] noDictionaryData, byte[] noDictionaryStartKey,
+ byte[] noDictionaryEndKey) throws CarbonDataWriterException {
byte[][][] noDictionaryColumnsData = null;
List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
int complexColCount = getComplexColsCount();
@@ -836,9 +843,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
if (dimensionType[i]) {
dictionaryColumnCount++;
if (colGrpModel.isColumnar(dictionaryColumnCount)) {
- submit.add(executorService
- .submit(new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(),
- true, isUseInvertedIndex[i])));
+ submit.add(executorService.submit(
+ new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true,
+ isUseInvertedIndex[i])));
} else {
submit.add(
executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
@@ -876,8 +883,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private NodeHolder getNodeHolderObjectWithOutKettle(byte[][] dataHolderLocal,
byte[][] byteArrayValues, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
WriterCompressModel compressionModel, byte[][][] noDictionaryData,
- byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey)
- throws CarbonDataWriterException {
+ byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey) throws CarbonDataWriterException {
byte[][][] noDictionaryColumnsData = null;
List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
int complexColCount = getComplexColsCount();
@@ -907,7 +913,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
int keyLength = splitKey[j].length;
byte[] newKey = new byte[keyLength + 2];
ByteBuffer buffer = ByteBuffer.wrap(newKey);
- buffer.putShort((short)keyLength);
+ buffer.putShort((short) keyLength);
System.arraycopy(splitKey[j], 0, newKey, 2, keyLength);
noDictionaryColumnsData[j][i] = newKey;
}
@@ -963,9 +969,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
if (dimensionType[i]) {
dictionaryColumnCount++;
if (colGrpModel.isColumnar(dictionaryColumnCount)) {
- submit.add(executorService
- .submit(new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(),
- true, isUseInvertedIndex[i])));
+ submit.add(executorService.submit(
+ new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true,
+ isUseInvertedIndex[i])));
} else {
submit.add(
executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
@@ -1008,7 +1014,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
endKeyLocal, compressionModel, composedNonDictStartKey, composedNonDictEndKey);
}
-
/**
* DataHolder will have all row mdkey data
*
@@ -1150,6 +1155,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
return decimalPlaces;
}
+
/**
* This method will be used to update the max value for each measure
*/
@@ -1220,7 +1226,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
- LOGGER.info("Blocklet Size: " + blockletSize);
+ if (version == ColumnarFormatVersion.V3) {
+ this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+ CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT));
+ }
+ LOGGER.info("Number of rows per column blocklet " + blockletSize);
dataRows = new ArrayList<>(this.blockletSize);
int dimSet =
Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE);
@@ -1280,8 +1291,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
.getBlockKeySize());
System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore,
blockKeySize.length - noOfColStore);
- this.dataWriter =
- getFactDataWriter(keyBlockSize);
+ this.dataWriter = getFactDataWriter(keyBlockSize);
this.dataWriter.setIsNoDictionary(isNoDictionary);
// initialize the channel;
this.dataWriter.initializeWriter();
@@ -1377,7 +1387,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* @return data writer instance
*/
private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) {
- ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
return CarbonDataWriterFactory.getInstance()
.getFactDataWriter(version, getDataWriterVo(keyBlockSize));
}
@@ -1620,8 +1629,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
@Override public IndexStorage call() throws Exception {
if (isUseInvertedIndex) {
- return new BlockIndexerStorageForInt(this.data, isCompressionReq, isNoDictionary,
- isSortRequired);
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(this.data, isCompressionReq, isNoDictionary,
+ isSortRequired);
+ } else {
+ return new BlockIndexerStorageForInt(this.data, isCompressionReq, isNoDictionary,
+ isSortRequired);
+ }
} else {
return new BlockIndexerStorageForNoInvertedIndex(this.data, isCompressionReq,
isNoDictionary);
[3/4] incubator-carbondata git commit: Added V3 Format Writer and
Reader Code
Posted by ja...@apache.org.
Added V3 Format Writer and Reader Code
Added code to support V3 Writer + Reader
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2cf1104d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2cf1104d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2cf1104d
Branch: refs/heads/master
Commit: 2cf1104db43a5591fe2bcabb97ba02202428132a
Parents: 922683e
Author: kumarvishal <ku...@gmail.com>
Authored: Thu Feb 23 16:44:41 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun Feb 26 22:42:32 2017 +0800
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 38 +-
.../constants/CarbonV3DataFormatConstants.java | 84 ++++
.../datastore/chunk/AbstractRawColumnChunk.java | 11 +
.../chunk/reader/CarbonDataReaderFactory.java | 14 +-
.../AbstractChunkReaderV2V3Format.java | 126 +++++
...mpressedDimensionChunkFileBasedReaderV2.java | 127 ++---
...mpressedDimensionChunkFileBasedReaderV3.java | 268 ++++++++++
.../AbstractMeasureChunkReaderV2V3Format.java | 124 +++++
...CompressedMeasureChunkFileBasedReaderV2.java | 106 +---
...CompressedMeasureChunkFileBasedReaderV3.java | 239 +++++++++
.../SafeFixedLengthDimensionDataChunkStore.java | 9 +-
.../columnar/BlockIndexerStorageForShort.java | 228 +++++++++
.../columnar/ColumnWithShortIndex.java | 76 +++
.../ColumnWithShortIndexForNoDictionay.java | 46 ++
.../core/metadata/ColumnarFormatVersion.java | 9 +-
.../executor/impl/AbstractQueryExecutor.java | 10 +-
.../IncludeColGroupFilterExecuterImpl.java | 24 +
...velRangeLessThanEqualFilterExecuterImpl.java | 14 +-
.../RowLevelRangeLessThanFiterExecuterImpl.java | 14 +-
.../carbondata/core/util/BitSetGroup.java | 6 +-
.../core/util/CarbonMetadataUtil.java | 347 ++++++++++++-
.../carbondata/core/util/CarbonProperties.java | 110 +++-
.../apache/carbondata/core/util/CarbonUtil.java | 110 ++++
.../util/DataFileFooterConverterFactory.java | 5 +-
.../core/util/DataFileFooterConverterV3.java | 141 ++++++
.../apache/carbondata/core/util/NodeHolder.java | 430 ++++++++++++++++
.../core/util/CarbonMetadataUtilTest.java | 2 +-
format/src/main/thrift/carbondata.thrift | 27 +-
.../store/CarbonDataWriterFactory.java | 7 +-
.../store/CarbonFactDataHandlerColumnar.java | 56 ++-
.../store/writer/AbstractFactDataWriter.java | 54 +-
.../store/writer/CarbonFactDataWriter.java | 1 +
.../processing/store/writer/NodeHolder.java | 410 ---------------
.../writer/v1/CarbonFactDataWriterImplV1.java | 9 +-
.../writer/v2/CarbonFactDataWriterImplV2.java | 8 +-
.../writer/v3/CarbonFactDataWriterImplV3.java | 499 +++++++++++++++++++
.../store/writer/v3/DataWriterHolder.java | 68 +++
37 files changed, 3114 insertions(+), 743 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1142c4e..146b78e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -83,7 +83,7 @@ public final class CarbonCommonConstants {
/**
* min blocklet size
*/
- public static final int BLOCKLET_SIZE_MIN_VAL = 50;
+ public static final int BLOCKLET_SIZE_MIN_VAL = 2000;
/**
* max blocklet size
*/
@@ -791,7 +791,7 @@ public final class CarbonCommonConstants {
public static final String CARBON_MERGE_SORT_PREFETCH_DEFAULT = "true";
/**
- * default name of data base
+ * default name of database
*/
public static final String DATABASE_DEFAULT_NAME = "default";
@@ -808,8 +808,7 @@ public final class CarbonCommonConstants {
/**
* this variable is to enable/disable identify high cardinality during first data loading
*/
- public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE =
- "high.cardinality.identify.enable";
+ public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE = "high.cardinality.identify.enable";
public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT = "true";
/**
@@ -843,26 +842,23 @@ public final class CarbonCommonConstants {
/**
* ZOOKEEPERLOCK TYPE
*/
- public static final String CARBON_LOCK_TYPE_ZOOKEEPER =
- "ZOOKEEPERLOCK";
+ public static final String CARBON_LOCK_TYPE_ZOOKEEPER = "ZOOKEEPERLOCK";
/**
* LOCALLOCK TYPE
*/
- public static final String CARBON_LOCK_TYPE_LOCAL =
- "LOCALLOCK";
+ public static final String CARBON_LOCK_TYPE_LOCAL = "LOCALLOCK";
/**
* HDFSLOCK TYPE
*/
- public static final String CARBON_LOCK_TYPE_HDFS =
- "HDFSLOCK";
+ public static final String CARBON_LOCK_TYPE_HDFS = "HDFSLOCK";
/**
* Invalid filter member log string
*/
- public static final String FILTER_INVALID_MEMBER = " Invalid Record(s) are present "
- + "while filter evaluation. ";
+ public static final String FILTER_INVALID_MEMBER =
+ " Invalid Record(s) are present while filter evaluation. ";
/**
* Number of unmerged segments to be merged.
@@ -880,25 +876,23 @@ public final class CarbonCommonConstants {
* Only accepted Range is 0 - 10000. Outside this range system will pick default value.
*/
public static final String UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
- "carbon.horizontal.update.compaction.threshold";
+ "carbon.horizontal.update.compaction.threshold";
/**
* Default count of segments which act as a threshold for IUD compaction merge.
*/
public static final String DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = "1";
-
/**
* Number of Delete Delta files which is the Threshold for IUD compaction.
* Only accepted Range is 0 - 10000. Outside this range system will pick default value.
*/
- public static final String DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
+ public static final String DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
"carbon.horizontal.delete.compaction.threshold";
/**
* Default count of segments which act as a threshold for IUD compaction merge.
*/
public static final String DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = "1";
-
/**
* default location of the carbon metastore db
*/
@@ -943,8 +937,7 @@ public final class CarbonCommonConstants {
* @Deprecated : This property has been deprecated.
* Property for enabling system level compaction lock.1 compaction can run at once.
*/
- public static String ENABLE_CONCURRENT_COMPACTION =
- "carbon.concurrent.compaction";
+ public static String ENABLE_CONCURRENT_COMPACTION = "carbon.concurrent.compaction";
/**
* Default value of Property for enabling system level compaction lock.1 compaction can run
@@ -1024,12 +1017,8 @@ public final class CarbonCommonConstants {
/**
* current data file version
*/
- public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V2";
- /**
- * number of column data will read in IO operation
- * during query execution
- */
- public static final short NUMBER_OF_COLUMN_READ_IN_IO = 10;
+ public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V3";
+
/**
* data file version header
*/
@@ -1105,7 +1094,6 @@ public final class CarbonCommonConstants {
/**
* Default carbon dictionary server port
-
*/
public static final String DICTIONARY_SERVER_PORT_DEFAULT = "2030";
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
new file mode 100644
index 0000000..060b55c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.constants;
+
+/**
+ * Constants for V3 data format
+ */
+public interface CarbonV3DataFormatConstants {
+
+ /**
+ * number of page per blocklet column
+ */
+ String NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN = "carbon.number.of.page.in.blocklet.column";
+
+ /**
+ * number of page per blocklet column default value
+ */
+ String NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE = "10";
+
+ /**
+ * number of page per blocklet column max value
+ */
+ short NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MAX = 20;
+
+ /**
+ * number of page per blocklet column min value
+ */
+ short NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MIN = 1;
+
+ /**
+ * number of column to be read in one IO in query
+ */
+ String NUMBER_OF_COLUMN_TO_READ_IN_IO = "number.of.column.to.read.in.io";
+
+ /**
+ * number of column to be read in one IO in query default value
+ */
+ String NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE = "10";
+
+ /**
+ * number of column to be read in one IO in query max value
+ */
+ short NUMBER_OF_COLUMN_TO_READ_IN_IO_MAX = 20;
+
+ /**
+ * number of column to be read in one IO in query min value
+ */
+ short NUMBER_OF_COLUMN_TO_READ_IN_IO_MIN = 1;
+
+ /**
+ * number of rows per blocklet column page
+ */
+ String NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE = "number.of.rows.per.blocklet.column.page";
+
+ /**
+ * number of rows per blocklet column page default value
+ */
+ String NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT = "32000";
+
+ /**
+ * number of rows per blocklet column page max value
+ */
+ short NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MAX = 32000;
+
+ /**
+ * number of rows per blocklet column page min value
+ */
+ short NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MIN = 8000;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
index d04077c..eebb382 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datastore.chunk;
import java.nio.ByteBuffer;
+import org.apache.carbondata.format.DataChunk3;
/**
* It contains group of uncompressed blocklets on one column.
@@ -44,6 +45,8 @@ public abstract class AbstractRawColumnChunk {
protected int length;
+ protected DataChunk3 dataChunkV3;
+
public AbstractRawColumnChunk(int blockletId, ByteBuffer rawData, int offSet, int length) {
this.blockletId = blockletId;
this.rawData = rawData;
@@ -121,4 +124,12 @@ public abstract class AbstractRawColumnChunk {
return length;
}
+ public DataChunk3 getDataChunkV3() {
+ return dataChunkV3;
+ }
+
+ public void setDataChunkV3(DataChunk3 dataChunkV3) {
+ this.dataChunkV3 = dataChunkV3;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
index e20fcbe..8fee760 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
@@ -18,8 +18,10 @@ package org.apache.carbondata.core.datastore.chunk.reader;
import org.apache.carbondata.core.datastore.chunk.reader.dimension.v1.CompressedDimensionChunkFileBasedReaderV1;
import org.apache.carbondata.core.datastore.chunk.reader.dimension.v2.CompressedDimensionChunkFileBasedReaderV2;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.v3.CompressedDimensionChunkFileBasedReaderV3;
import org.apache.carbondata.core.datastore.chunk.reader.measure.v1.CompressedMeasureChunkFileBasedReaderV1;
import org.apache.carbondata.core.datastore.chunk.reader.measure.v2.CompressedMeasureChunkFileBasedReaderV2;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.v3.CompressedMeasureChunkFileBasedReaderV3;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
@@ -65,9 +67,13 @@ public class CarbonDataReaderFactory {
case V1:
return new CompressedDimensionChunkFileBasedReaderV1(blockletInfo, eachColumnValueSize,
filePath);
- default:
+ case V2:
return new CompressedDimensionChunkFileBasedReaderV2(blockletInfo, eachColumnValueSize,
filePath);
+ case V3:
+ default:
+ return new CompressedDimensionChunkFileBasedReaderV3(blockletInfo, eachColumnValueSize,
+ filePath);
}
}
@@ -84,8 +90,12 @@ public class CarbonDataReaderFactory {
switch (version) {
case V1:
return new CompressedMeasureChunkFileBasedReaderV1(blockletInfo, filePath);
- default:
+ case V2:
return new CompressedMeasureChunkFileBasedReaderV2(blockletInfo, filePath);
+ case V3:
+ default:
+ return new CompressedMeasureChunkFileBasedReaderV3(blockletInfo, filePath);
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java
new file mode 100644
index 0000000..f083612
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.dimension;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Abstract class for V2, V3 format dimension column reader
+ */
+public abstract class AbstractChunkReaderV2V3Format extends AbstractChunkReader {
+
+ /**
+ * dimension chunks offset
+ */
+ protected List<Long> dimensionChunksOffset;
+
+ /**
+ * dimension chunks length
+ */
+ protected List<Integer> dimensionChunksLength;
+
+ public AbstractChunkReaderV2V3Format(final BlockletInfo blockletInfo,
+ final int[] eachColumnValueSize, final String filePath) {
+ super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
+ dimensionChunksOffset = blockletInfo.getDimensionChunkOffsets();
+ dimensionChunksLength = blockletInfo.getDimensionChunksLength();
+ }
+
+ /**
+ * Below method will be used to read the chunks based on block indexes.
+ * Reading logic of this method:
+ * Except for the last column, all the column chunks can be read in a group:
+ * if it is not the last column, read the data of all the columns present in the
+ * block index together and then process it.
+ * The last column is read and processed separately.
+ *
+ * @param fileReader file reader to read the blocks from file
+ * @param blockletIndexes blocks range to be read
+ * @return dimension column chunks
+ */
+ @Override public DimensionRawColumnChunk[] readRawDimensionChunks(final FileHolder fileReader,
+ final int[][] blockletIndexes) throws IOException {
+ // read the column chunk based on block index and add
+ DimensionRawColumnChunk[] dataChunks =
+ new DimensionRawColumnChunk[dimensionChunksOffset.size()];
+ // if blocklet index is empty then return empty data chunk
+ if (blockletIndexes.length == 0) {
+ return dataChunks;
+ }
+ DimensionRawColumnChunk[] groupChunk = null;
+ int index = 0;
+ // iterate till blockletIndexes.length - 1 as block indexes are in sorted order,
+ // to avoid reading the last column in a group
+ for (int i = 0; i < blockletIndexes.length - 1; i++) {
+ index = 0;
+ groupChunk =
+ readRawDimensionChunksInGroup(fileReader, blockletIndexes[i][0], blockletIndexes[i][1]);
+ for (int j = blockletIndexes[i][0]; j <= blockletIndexes[i][1]; j++) {
+ dataChunks[j] = groupChunk[index++];
+ }
+ }
+ // check whether the last column index is present in the block indexes; if so, read it separately
+ if (blockletIndexes[blockletIndexes.length - 1][0] == dimensionChunksOffset.size() - 1) {
+ dataChunks[blockletIndexes[blockletIndexes.length - 1][0]] =
+ readRawDimensionChunk(fileReader, blockletIndexes[blockletIndexes.length - 1][0]);
+ }
+ // otherwise read the data in group
+ else {
+ groupChunk =
+ readRawDimensionChunksInGroup(fileReader, blockletIndexes[blockletIndexes.length - 1][0],
+ blockletIndexes[blockletIndexes.length - 1][1]);
+ index = 0;
+ for (int j = blockletIndexes[blockletIndexes.length - 1][0];
+ j <= blockletIndexes[blockletIndexes.length - 1][1]; j++) {
+ dataChunks[j] = groupChunk[index++];
+ }
+ }
+ return dataChunks;
+ }
+
+ /**
+ * Below method will be used to read dimension chunk data in group.
+ * This method will be useful to avoid multiple IO while reading the
+ * data from file.
+ *
+ * @param fileReader file reader to read the data
+ * @param startColumnBlockletIndex first column blocklet index to be read
+ * @param endColumnBlockletIndex end column blocklet index to be read
+ * @return dimension raw chunk array
+ * @throws IOException
+ */
+ protected abstract DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+ int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException;
+
+ /**
+ * Below method will be used to check whether particular encoding is present
+ * in the dimension or not
+ *
+ * @param encodings list of encodings applied to the dimension
+ * @param encoding encoding to search
+ * @return true if the encoding is present in the dimension
+ */
+ protected boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
+ return encodings.contains(encoding);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
index 9d5849f..b2201cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.core.datastore.chunk.reader.dimension.v2;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.List;
import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
@@ -26,7 +25,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataC
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
-import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -36,17 +35,7 @@ import org.apache.carbondata.format.Encoding;
/**
* Compressed dimension chunk reader class for version 2
*/
-public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkReader {
-
- /**
- * dimension chunks offset
- */
- private List<Long> dimensionChunksOffset;
-
- /**
- * dimension chunks length
- */
- private List<Integer> dimensionChunksLength;
+public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkReaderV2V3Format {
/**
* Constructor to get minimum parameter to create instance of this class
@@ -57,73 +46,18 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
*/
public CompressedDimensionChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
final int[] eachColumnValueSize, final String filePath) {
- super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
- this.dimensionChunksOffset = blockletInfo.getDimensionChunkOffsets();
- this.dimensionChunksLength = blockletInfo.getDimensionChunksLength();
-
- }
-
- /**
- * Below method will be used to read the chunk based on block indexes
- * Reading logic of below method is:
- * Except last column all the column chunk can be read in group
- * if not last column then read data of all the column present in block index
- * together then process it.
- * For last column read is separately and process
- *
- * @param fileReader file reader to read the blocks from file
- * @param blockletIndexes blocks range to be read
- * @return dimension column chunks
- */
- @Override public DimensionRawColumnChunk[] readRawDimensionChunks(final FileHolder fileReader,
- final int[][] blockletIndexes) throws IOException {
- // read the column chunk based on block index and add
- DimensionRawColumnChunk[] dataChunks =
- new DimensionRawColumnChunk[dimensionChunksOffset.size()];
- // if blocklet index is empty then return empry data chunk
- if (blockletIndexes.length == 0) {
- return dataChunks;
- }
- DimensionRawColumnChunk[] groupChunk = null;
- int index = 0;
- // iterate till block indexes -1 as block index will be in sorted order, so to avoid
- // the last column reading in group
- for (int i = 0; i < blockletIndexes.length - 1; i++) {
- index = 0;
- groupChunk =
- readRawDimensionChunksInGroup(fileReader, blockletIndexes[i][0], blockletIndexes[i][1]);
- for (int j = blockletIndexes[i][0]; j <= blockletIndexes[i][1]; j++) {
- dataChunks[j] = groupChunk[index++];
- }
- }
- // check last index is present in block index, if it is present then read separately
- if (blockletIndexes[blockletIndexes.length - 1][0] == dimensionChunksOffset.size() - 1) {
- dataChunks[blockletIndexes[blockletIndexes.length - 1][0]] =
- readRawDimensionChunk(fileReader, blockletIndexes[blockletIndexes.length - 1][0]);
- }
- // otherwise read the data in group
- else {
- groupChunk =
- readRawDimensionChunksInGroup(fileReader, blockletIndexes[blockletIndexes.length - 1][0],
- blockletIndexes[blockletIndexes.length - 1][1]);
- index = 0;
- for (int j = blockletIndexes[blockletIndexes.length - 1][0];
- j <= blockletIndexes[blockletIndexes.length - 1][1]; j++) {
- dataChunks[j] = groupChunk[index++];
- }
- }
- return dataChunks;
+ super(blockletInfo, eachColumnValueSize, filePath);
}
/**
* Below method will be used to read the chunk based on block index
*
- * @param fileReader file reader to read the blocks from file
+ * @param fileReader file reader to read the blocks from file
* @param blockletIndex block to be read
* @return dimension column chunk
*/
- public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
- int blockletIndex) throws IOException {
+ public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader, int blockletIndex)
+ throws IOException {
int length = 0;
if (dimensionChunksOffset.size() - 1 == blockletIndex) {
// Incase of last block read only for datachunk and read remaining while converting it.
@@ -140,24 +74,35 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
new DimensionRawColumnChunk(blockletIndex, buffer, 0, length, this);
rawColumnChunk.setFileHolder(fileReader);
rawColumnChunk.setPagesCount(1);
- rawColumnChunk.setRowCount(new int[]{numberOfRows});
+ rawColumnChunk.setRowCount(new int[] { numberOfRows });
return rawColumnChunk;
}
- private DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
- int startBlockIndex, int endBlockIndex) throws IOException {
- long currentDimensionOffset = dimensionChunksOffset.get(startBlockIndex);
+ /**
+ * Below method will be used to read dimension chunk data in group.
+ * This method will be useful to avoid multiple IO while reading the
+ * data from file.
+ *
+ * @param fileReader file reader to read the data
+ * @param startColumnBlockletIndex first column blocklet index to be read
+ * @param endColumnBlockletIndex end column blocklet index to be read
+ * @return dimension raw chunk array
+ * @throws IOException
+ */
+ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+ int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
+ long currentDimensionOffset = dimensionChunksOffset.get(startColumnBlockletIndex);
ByteBuffer buffer = ByteBuffer.allocateDirect(
- (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
+ (int) (dimensionChunksOffset.get(endColumnBlockletIndex + 1) - currentDimensionOffset));
synchronized (fileReader) {
fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
- (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
+ (int) (dimensionChunksOffset.get(endColumnBlockletIndex + 1) - currentDimensionOffset));
}
DimensionRawColumnChunk[] dataChunks =
- new DimensionRawColumnChunk[endBlockIndex - startBlockIndex + 1];
+ new DimensionRawColumnChunk[endColumnBlockletIndex - startColumnBlockletIndex + 1];
int index = 0;
int runningLength = 0;
- for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+ for (int i = startColumnBlockletIndex; i <= endColumnBlockletIndex; i++) {
int currentLength = (int) (dimensionChunksOffset.get(i + 1) - dimensionChunksOffset.get(i));
dataChunks[index] =
new DimensionRawColumnChunk(i, buffer, runningLength, currentLength, this);
@@ -181,8 +126,8 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
int blockIndex = dimensionRawColumnChunk.getBlockletId();
ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
if (dimensionChunksOffset.size() - 1 == blockIndex) {
- dimensionColumnChunk = CarbonUtil
- .readDataChunk(rawData, copySourcePoint, dimensionRawColumnChunk.getLength());
+ dimensionColumnChunk =
+ CarbonUtil.readDataChunk(rawData, copySourcePoint, dimensionRawColumnChunk.getLength());
int totalDimensionDataLength =
dimensionColumnChunk.data_page_length + dimensionColumnChunk.rle_page_length
+ dimensionColumnChunk.rowid_page_length;
@@ -202,8 +147,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
rawData.position(copySourcePoint);
rawData.get(data);
// first read the data and uncompressed it
- dataPage =
- COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
+ dataPage = COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
copySourcePoint += dimensionColumnChunk.data_page_length;
// if row id block is present then read the row id chunk and uncompress it
if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
@@ -223,8 +167,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
byte[] dataRle = new byte[dimensionColumnChunk.rle_page_length];
rawData.position(copySourcePoint);
rawData.get(dataRle);
- rlePage =
- numberComressor.unCompress(dataRle, 0, dimensionColumnChunk.rle_page_length);
+ rlePage = numberComressor.unCompress(dataRle, 0, dimensionColumnChunk.rle_page_length);
// uncompress the data with rle indexes
dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[blockIndex]);
}
@@ -250,16 +193,4 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
}
return columnDataChunk;
}
-
- /**
- * Below method will be used to check whether particular encoding is present
- * in the dimension or not
- *
- * @param encoding encoding to search
- * @return if encoding is present in dimension
- */
- private boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
- return encodings.contains(encoding);
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
new file mode 100644
index 0000000..acaa2fa
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.dimension.v3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
+import org.apache.carbondata.format.Encoding;
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * Dimension column V3 Reader class which will be used to read and uncompress
+ * V3 format data.
+ * Data format:
+ * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
+ * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
+ * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
+ * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
+ */
+public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkReaderV2V3Format {
+
+ /**
+ * end position of last dimension in carbon data file
+ */
+ private long lastDimensionOffsets;
+
+ public CompressedDimensionChunkFileBasedReaderV3(BlockletInfo blockletInfo,
+ int[] eachColumnValueSize, String filePath) {
+ super(blockletInfo, eachColumnValueSize, filePath);
+ lastDimensionOffsets = blockletInfo.getDimensionOffset();
+ }
+
+ /**
+ * Below method will be used to read the dimension column data form carbon data file
+ * Steps for reading
+ * 1. Get the length of the data to be read
+ * 2. Allocate the direct buffer
+ * 3. read the data from file
+ * 4. Get the data chunk object from data read
+ * 5. Create the raw chunk object and fill the details
+ *
+ * @param fileReader reader for reading the column from carbon data file
+ * @param blockletColumnIndex blocklet index of the column in carbon data file
+ * @return dimension raw chunk
+ */
+ public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
+ int blockletColumnIndex) throws IOException {
+ // get the current dimension offset
+ long currentDimensionOffset = dimensionChunksOffset.get(blockletColumnIndex);
+ int length = 0;
+ // to calculate the length of the data to be read
+ // for any column other than the last, subtract the offset of the current column
+ // from the offset of the next column to get the total length.
+ // for the last column use lastDimensionOffsets, the end position of the last
+ // dimension, and subtract the current dimension offset from it
+ if (dimensionChunksOffset.size() - 1 == blockletColumnIndex) {
+ length = (int) (lastDimensionOffsets - currentDimensionOffset);
+ } else {
+ length = (int) (dimensionChunksOffset.get(blockletColumnIndex + 1) - currentDimensionOffset);
+ }
+ // allocate the buffer
+ ByteBuffer buffer = ByteBuffer.allocateDirect(length);
+ // read the data from carbon data file
+ synchronized (fileReader) {
+ fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset, length);
+ }
+ // get the data chunk which will have all the details about the data pages
+ DataChunk3 dataChunk = CarbonUtil.readDataChunk3(buffer, 0, length);
+ // creating a raw chunks instance and filling all the details
+ DimensionRawColumnChunk rawColumnChunk =
+ new DimensionRawColumnChunk(blockletColumnIndex, buffer, 0, length, this);
+ int numberOfPages = dataChunk.getPage_length().size();
+ byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+ byte[][] minValueOfEachPage = new byte[numberOfPages][];
+ int[] eachPageLength = new int[numberOfPages];
+ for (int i = 0; i < minValueOfEachPage.length; i++) {
+ maxValueOfEachPage[i] =
+ dataChunk.getData_chunk_list().get(i).getMin_max().getMax_values().get(0).array();
+ minValueOfEachPage[i] =
+ dataChunk.getData_chunk_list().get(i).getMin_max().getMin_values().get(0).array();
+ eachPageLength[i] = dataChunk.getData_chunk_list().get(i).getNumberOfRowsInpage();
+ }
+ rawColumnChunk.setDataChunkV3(dataChunk);
+ rawColumnChunk.setFileHolder(fileReader);
+ rawColumnChunk.setPagesCount(dataChunk.getPage_length().size());
+ rawColumnChunk.setMaxValues(maxValueOfEachPage);
+ rawColumnChunk.setMinValues(minValueOfEachPage);
+ rawColumnChunk.setRowCount(eachPageLength);
+ rawColumnChunk.setLengths(ArrayUtils
+ .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+ rawColumnChunk.setOffsets(ArrayUtils
+ .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+ return rawColumnChunk;
+ }
+
+ /**
+ * Below method will be used to read the multiple dimension column data in group
+ * and divide into dimension raw chunk object
+ * Steps for reading
+ * 1. Get the length of the data to be read
+ * 2. Allocate the direct buffer
+ * 3. read the data from file
+ * 4. Get the data chunk object from file for each column
+ * 5. Create the raw chunk object and fill the details for each column
+ * 6. increment the offset of the data
+ *
+ * @param fileReader
+ * reader which will be used to read the dimension columns data from file
+ * @param startBlockletColumnIndex
+ * blocklet index of the first dimension column
+ * @param endBlockletColumnIndex
+ * blocklet index of the last dimension column
+ * @return DimensionRawColumnChunk array
+ */
+ protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+ int startBlockletColumnIndex, int endBlockletColumnIndex) throws IOException {
+ // to calculate the length of the data to be read
+ // subtract the offset of the start column from the offset of the
+ // (end column + 1) to get the total length.
+ long currentDimensionOffset = dimensionChunksOffset.get(startBlockletColumnIndex);
+ ByteBuffer buffer = ByteBuffer.allocateDirect(
+ (int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
+ // read the data from carbon data file
+ synchronized (fileReader) {
+ fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
+ (int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
+ }
+ // create raw chunk for each dimension column
+ DimensionRawColumnChunk[] dimensionDataChunks =
+ new DimensionRawColumnChunk[endBlockletColumnIndex - startBlockletColumnIndex + 1];
+ int index = 0;
+ int runningLength = 0;
+ for (int i = startBlockletColumnIndex; i <= endBlockletColumnIndex; i++) {
+ int currentLength = (int) (dimensionChunksOffset.get(i + 1) - dimensionChunksOffset.get(i));
+ dimensionDataChunks[index] =
+ new DimensionRawColumnChunk(i, buffer, runningLength, currentLength, this);
+ DataChunk3 dataChunk =
+ CarbonUtil.readDataChunk3(buffer, runningLength, dimensionChunksLength.get(i));
+ int numberOfPages = dataChunk.getPage_length().size();
+ byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+ byte[][] minValueOfEachPage = new byte[numberOfPages][];
+ int[] eachPageLength = new int[numberOfPages];
+ for (int j = 0; j < minValueOfEachPage.length; j++) {
+ maxValueOfEachPage[j] =
+ dataChunk.getData_chunk_list().get(j).getMin_max().getMax_values().get(0).array();
+ minValueOfEachPage[j] =
+ dataChunk.getData_chunk_list().get(j).getMin_max().getMin_values().get(0).array();
+ eachPageLength[j] = dataChunk.getData_chunk_list().get(j).getNumberOfRowsInpage();
+ }
+ dimensionDataChunks[index].setDataChunkV3(dataChunk);
+ dimensionDataChunks[index].setFileHolder(fileReader);
+ dimensionDataChunks[index].setPagesCount(dataChunk.getPage_length().size());
+ dimensionDataChunks[index].setMaxValues(maxValueOfEachPage);
+ dimensionDataChunks[index].setMinValues(minValueOfEachPage);
+ dimensionDataChunks[index].setRowCount(eachPageLength);
+ dimensionDataChunks[index].setLengths(ArrayUtils
+ .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+ dimensionDataChunks[index].setOffsets(ArrayUtils
+ .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+ runningLength += currentLength;
+ index++;
+ }
+ return dimensionDataChunks;
+ }
+
+ /**
+ * Below method will be used to convert the compressed dimension chunk raw data to actual data
+ *
+ * @param dimensionRawColumnChunk dimension raw chunk
+ * @param pageNumber number of the page to be converted
+ * @return DimensionColumnDataChunk
+ */
+ @Override public DimensionColumnDataChunk convertToDimensionChunk(
+ DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException {
+ byte[] dataPage = null;
+ int[] invertedIndexes = null;
+ int[] invertedIndexesReverse = null;
+ int[] rlePage = null;
+ // data chunk of page
+ DataChunk2 dimensionColumnChunk = null;
+ // data chunk of blocklet column
+ DataChunk3 dataChunk3 = dimensionRawColumnChunk.getDataChunkV3();
+ // get the data buffer
+ ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
+ dimensionColumnChunk = dataChunk3.getData_chunk_list().get(pageNumber);
+ // calculating the start point of data
+ // as the buffer can contain data of multiple columns, the start point will be
+ // data chunk offset + data chunk length + page offset
+ int copySourcePoint = dimensionRawColumnChunk.getOffSet() + dimensionChunksLength
+ .get(dimensionRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
+ byte[] data = new byte[dimensionColumnChunk.data_page_length];
+ rawData.position(copySourcePoint);
+ rawData.get(data);
+ // first read the data and uncompressed it
+ dataPage = COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
+ copySourcePoint += dimensionColumnChunk.data_page_length;
+ // if row id block is present then read the row id chunk and uncompress it
+ if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
+ invertedIndexes = CarbonUtil
+ .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, rawData,
+ copySourcePoint);
+ copySourcePoint += dimensionColumnChunk.rowid_page_length;
+ // get the reverse index
+ invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
+ }
+ // if rle is applied then read the rle block chunk and then uncompress
+ // the actual data based on the rle block
+ if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
+ rlePage =
+ CarbonUtil.getIntArray(rawData, copySourcePoint, dimensionColumnChunk.rle_page_length);
+ // uncompress the data with rle indexes
+ dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage,
+ eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()]);
+ rlePage = null;
+ }
+ // fill chunk attributes
+ DimensionColumnDataChunk columnDataChunk = null;
+
+ if (dimensionColumnChunk.isRowMajor()) {
+ // to store fixed length column chunk values
+ columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage,
+ eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()],
+ dimensionRawColumnChunk.getRowCount()[pageNumber]);
+ }
+ // if no dictionary column then first create a no dictionary column chunk
+ // and set to data chunk instance
+ else if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
+ columnDataChunk =
+ new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
+ dimensionRawColumnChunk.getRowCount()[pageNumber]);
+ } else {
+ // to store fixed length column chunk values
+ columnDataChunk =
+ new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
+ dimensionRawColumnChunk.getRowCount()[pageNumber],
+ eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()]);
+ }
+ return columnDataChunk;
+ }
+}
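
The inverted index handling above relies on getInvertedReverseIndex to map row ids back to positions in the sorted page. Below is a minimal, self-contained sketch of the assumed mapping semantics; the class and its main method are illustrative only and not part of this patch.

import java.util.Arrays;

// Sketch: given an inverted index mapping sorted position -> original row id,
// build the reverse mapping original row id -> sorted position.
public final class InvertedIndexSketch {

  // assumed behaviour of CarbonUtil.getInvertedReverseIndex
  static int[] getInvertedReverseIndex(int[] invertedIndex) {
    int[] reverse = new int[invertedIndex.length];
    for (int i = 0; i < invertedIndex.length; i++) {
      reverse[invertedIndex[i]] = i;
    }
    return reverse;
  }

  public static void main(String[] args) {
    // sorted page positions 0,1,2 hold original rows 2,0,1
    int[] invertedIndexes = { 2, 0, 1 };
    // row 0 sits at sorted position 1, row 1 at 2, row 2 at 0
    System.out.println(Arrays.toString(getInvertedReverseIndex(invertedIndexes))); // [1, 2, 0]
  }
}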
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
new file mode 100644
index 0000000..a94d08b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.measure;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
+
+/**
+ * Abstract class for V2, V3 format measure column reader
+ */
+public abstract class AbstractMeasureChunkReaderV2V3Format extends AbstractMeasureChunkReader {
+
+ /**
+ * measure column chunks offset
+ */
+ protected List<Long> measureColumnChunkOffsets;
+
+ /**
+ * measure column chunks length
+ */
+ protected List<Integer> measureColumnChunkLength;
+
+ public AbstractMeasureChunkReaderV2V3Format(final BlockletInfo blockletInfo,
+ final String filePath) {
+ super(filePath, blockletInfo.getNumberOfRows());
+ this.measureColumnChunkOffsets = blockletInfo.getMeasureChunkOffsets();
+ this.measureColumnChunkLength = blockletInfo.getMeasureChunksLength();
+ }
+
+ /**
+ * Below method will be used to read the chunks based on block indexes.
+ * Reading logic: every column chunk except the last one can be read in a
+ * group, so for each block index range that does not end at the last column
+ * the data of all the columns in the range is read together and then
+ * processed. The last column is read and processed separately.
+ *
+ * @param fileReader file reader to read the blocks from file
+ * @param blockIndexes blocks range to be read
+ * @return measure column chunks
+ * @throws IOException
+ */
+ public MeasureRawColumnChunk[] readRawMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+ throws IOException {
+ // read the column chunk based on block index and add
+ MeasureRawColumnChunk[] dataChunks =
+ new MeasureRawColumnChunk[measureColumnChunkOffsets.size()];
+ if (blockIndexes.length == 0) {
+ return dataChunks;
+ }
+ MeasureRawColumnChunk[] groupChunk = null;
+ int index = 0;
+ for (int i = 0; i < blockIndexes.length - 1; i++) {
+ index = 0;
+ groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
+ for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
+ dataChunks[j] = groupChunk[index++];
+ }
+ }
+ if (blockIndexes[blockIndexes.length - 1][0] == measureColumnChunkOffsets.size() - 1) {
+ dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
+ readRawMeasureChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
+ } else {
+ groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
+ blockIndexes[blockIndexes.length - 1][1]);
+ index = 0;
+ for (int j = blockIndexes[blockIndexes.length - 1][0];
+ j <= blockIndexes[blockIndexes.length - 1][1]; j++) {
+ dataChunks[j] = groupChunk[index++];
+ }
+ }
+ return dataChunks;
+ }
+
+ /**
+ * Below method will be used to convert the thrift presence meta to wrapper
+ * presence meta
+ *
+ * @param presentMetadataThrift
+ * @return wrapper presence meta
+ */
+ protected PresenceMeta getPresenceMeta(
+ org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
+ PresenceMeta presenceMeta = new PresenceMeta();
+ presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
+ presenceMeta.setBitSet(BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
+ .unCompressByte(presentMetadataThrift.getPresent_bit_stream())));
+ return presenceMeta;
+ }
+
+ /**
+ * Below method will be used to read measure chunk data in group.
+ * This method will be useful to avoid multiple IO while reading the
+ * data from file
+ *
+ * @param fileReader file reader to read the data
+ * @param startColumnBlockletIndex first column blocklet index to be read
+ * @param endColumnBlockletIndex end column blocklet index to be read
+ * @return measure raw chunkArray
+ * @throws IOException
+ */
+ protected abstract MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+ int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException;
+}
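
The grouped read in readRawMeasureChunksInGroup amounts to a single IO per block index range, with the read length derived from the chunk offset table. A minimal sketch of that offset arithmetic follows; all names and numbers here are made up for illustration.

// Sketch: one grouped read per column range, sized from the offset table.
public final class GroupedReadSketch {

  public static void main(String[] args) {
    // start offset of each of 4 column chunks, plus the end offset of the last
    long[] chunkOffsets = { 0L, 100L, 250L, 400L, 900L };
    // ranges of column indexes to read, as produced by the scanner
    int[][] blockIndexes = { { 0, 2 }, { 3, 3 } };
    for (int[] range : blockIndexes) {
      long start = chunkOffsets[range[0]];
      long length = chunkOffsets[range[1] + 1] - start;
      // a single readByteBuffer call covers all columns in the range
      System.out.println("columns " + range[0] + ".." + range[1]
          + " -> offset=" + start + " length=" + length);
    }
  }
}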
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index 7ac1578..7b6acee 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -19,37 +19,24 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure.v2;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.BitSet;
import java.util.List;
import org.apache.carbondata.core.datastore.FileHolder;
import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
-import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.DataChunk2;
/**
* Class to read the measure column data for version 2
*/
-public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChunkReader {
-
- /**
- * measure column chunks offset
- */
- private List<Long> measureColumnChunkOffsets;
-
- /**
- * measure column chunks length
- */
- private List<Integer> measureColumnChunkLength;
+public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChunkReaderV2V3Format {
/**
* Constructor to get minimum parameter to create instance of this class
@@ -59,69 +46,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
*/
public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
final String filePath) {
- super(filePath, blockletInfo.getNumberOfRows());
- this.measureColumnChunkOffsets = blockletInfo.getMeasureChunkOffsets();
- this.measureColumnChunkLength = blockletInfo.getMeasureChunksLength();
- }
-
- /**
- * Below method will be used to convert the thrift presence meta to wrapper
- * presence meta
- *
- * @param presentMetadataThrift
- * @return wrapper presence meta
- */
- private static PresenceMeta getPresenceMeta(
- org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
- PresenceMeta presenceMeta = new PresenceMeta();
- presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
- presenceMeta.setBitSet(BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
- .unCompressByte(presentMetadataThrift.getPresent_bit_stream())));
- return presenceMeta;
- }
-
- /**
- * Below method will be used to read the chunk based on block indexes
- * Reading logic of below method is: Except last column all the column chunk
- * can be read in group if not last column then read data of all the column
- * present in block index together then process it. For last column read is
- * separately and process
- *
- * @param fileReader file reader to read the blocks from file
- * @param blockIndexes blocks range to be read
- * @return measure column chunks
- * @throws IOException
- */
- public MeasureRawColumnChunk[] readRawMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
- throws IOException {
- // read the column chunk based on block index and add
- MeasureRawColumnChunk[] dataChunks =
- new MeasureRawColumnChunk[measureColumnChunkOffsets.size()];
- if (blockIndexes.length == 0) {
- return dataChunks;
- }
- MeasureRawColumnChunk[] groupChunk = null;
- int index = 0;
- for (int i = 0; i < blockIndexes.length - 1; i++) {
- index = 0;
- groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
- for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
- dataChunks[j] = groupChunk[index++];
- }
- }
- if (blockIndexes[blockIndexes.length - 1][0] == measureColumnChunkOffsets.size() - 1) {
- dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
- readRawMeasureChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
- } else {
- groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
- blockIndexes[blockIndexes.length - 1][1]);
- index = 0;
- for (int j = blockIndexes[blockIndexes.length - 1][0];
- j <= blockIndexes[blockIndexes.length - 1][1]; j++) {
- dataChunks[j] = groupChunk[index++];
- }
- }
- return dataChunks;
+ super(blockletInfo, filePath);
}
@Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
@@ -146,20 +71,31 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
return rawColumnChunk;
}
- private MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
- int startBlockIndex, int endBlockIndex) throws IOException {
- long currentMeasureOffset = measureColumnChunkOffsets.get(startBlockIndex);
+ /**
+ * Below method will be used to read measure chunk data in group.
+ * This method will be useful to avoid multiple IO while reading the
+ * data from file
+ *
+ * @param fileReader file reader to read the data
+ * @param startColumnBlockletIndex first column blocklet index to be read
+ * @param endColumnBlockletIndex end column blocklet index to be read
+ * @return measure raw chunkArray
+ * @throws IOException
+ */
+ protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+ int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
+ long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnBlockletIndex);
ByteBuffer buffer = ByteBuffer.allocateDirect(
- (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
+ (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
synchronized (fileReader) {
fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset,
- (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
+ (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
}
MeasureRawColumnChunk[] dataChunks =
- new MeasureRawColumnChunk[endBlockIndex - startBlockIndex + 1];
+ new MeasureRawColumnChunk[endColumnBlockletIndex - startColumnBlockletIndex + 1];
int runningLength = 0;
int index = 0;
- for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+ for (int i = startColumnBlockletIndex; i <= endColumnBlockletIndex; i++) {
int currentLength =
(int) (measureColumnChunkOffsets.get(i + 1) - measureColumnChunkOffsets.get(i));
MeasureRawColumnChunk measureRawColumnChunk =
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
new file mode 100644
index 0000000..307af41
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * Measure column V3 Reader class which will be used to read and uncompress
+ * V3 format data.
+ * Data Format:
+ * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
+ * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
+ * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
+ * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
+ */
+public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChunkReaderV2V3Format {
+
+ /**
+ * end position of last measure in carbon data file
+ */
+ private long measureOffsets;
+
+ public CompressedMeasureChunkFileBasedReaderV3(BlockletInfo blockletInfo, String filePath) {
+ super(blockletInfo, filePath);
+ measureOffsets = blockletInfo.getMeasureOffsets();
+ }
+
+ /**
+ * Below method will be used to read the measure column data from carbon data file
+ * 1. Get the length of the data to be read
+ * 2. Allocate the direct buffer
+ * 3. read the data from file
+ * 4. Get the data chunk object from data read
+ * 5. Create the raw chunk object and fill the details
+ *
+ * @param fileReader reader for reading the column from carbon data file
+ * @param blockIndex blocklet index of the column in carbon data file
+ * @return measure raw chunk
+ */
+ @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
+ throws IOException {
+ int dataLength = 0;
+ // to calculate the length of the data to be read
+ // for any column other than the last one we can subtract the offset of the
+ // current column from the offset of the next column to get the total length.
+ // but for the last column we need to use measureOffsets, which is the end
+ // position of the last measure, and subtract the current column offset from it
+ if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
+ dataLength = (int) (measureOffsets - measureColumnChunkOffsets.get(blockIndex));
+ } else {
+ dataLength = (int) (measureColumnChunkOffsets.get(blockIndex + 1) - measureColumnChunkOffsets
+ .get(blockIndex));
+ }
+ // allocate the buffer
+ ByteBuffer buffer = ByteBuffer.allocateDirect(dataLength);
+ // read the data from carbon data file
+ synchronized (fileReader) {
+ fileReader
+ .readByteBuffer(filePath, buffer, measureColumnChunkOffsets.get(blockIndex), dataLength);
+ }
+ // get the data chunk which will have all the details about the data pages
+ DataChunk3 dataChunk =
+ CarbonUtil.readDataChunk3(buffer, 0, measureColumnChunkLength.get(blockIndex));
+ // creating a raw chunks instance and filling all the details
+ MeasureRawColumnChunk rawColumnChunk =
+ new MeasureRawColumnChunk(blockIndex, buffer, 0, dataLength, this);
+ int numberOfPages = dataChunk.getPage_length().size();
+ byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+ byte[][] minValueOfEachPage = new byte[numberOfPages][];
+ int[] eachPageLength = new int[numberOfPages];
+ for (int i = 0; i < minValueOfEachPage.length; i++) {
+ maxValueOfEachPage[i] =
+ dataChunk.getData_chunk_list().get(i).getMin_max().getMax_values().get(0).array();
+ minValueOfEachPage[i] =
+ dataChunk.getData_chunk_list().get(i).getMin_max().getMin_values().get(0).array();
+ eachPageLength[i] = dataChunk.getData_chunk_list().get(i).getNumberOfRowsInpage();
+ }
+ rawColumnChunk.setDataChunkV3(dataChunk);
+ rawColumnChunk.setFileReader(fileReader);
+ rawColumnChunk.setPagesCount(dataChunk.getPage_length().size());
+ rawColumnChunk.setMaxValues(maxValueOfEachPage);
+ rawColumnChunk.setMinValues(minValueOfEachPage);
+ rawColumnChunk.setRowCount(eachPageLength);
+ rawColumnChunk.setLengths(ArrayUtils
+ .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+ rawColumnChunk.setOffsets(ArrayUtils
+ .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+ return rawColumnChunk;
+ }
+
+ /**
+ * Below method will be used to read the data of multiple measure columns in a
+ * group and divide it into measure raw chunk objects
+ * Steps for reading
+ * 1. Get the length of the data to be read
+ * 2. Allocate the direct buffer
+ * 3. read the data from file
+ * 4. Get the data chunk object from file for each column
+ * 5. Create the raw chunk object and fill the details for each column
+ * 6. increment the offset of the data
+ *
+ * @param fileReader
+ * reader which will be used to read the measure columns data from file
+ * @param startColumnBlockletIndex
+ * blocklet index of the first measure column
+ * @param endColumnBlockletIndex
+ * blocklet index of the last measure column
+ * @return MeasureRawColumnChunk array
+ */
+ protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+ int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
+ // to calculate the length of the data to be read
+ // we can subtract the offset of the start column from the
+ // offset of the end column + 1 to get the total length.
+ long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnBlockletIndex);
+ ByteBuffer buffer = ByteBuffer.allocateDirect(
+ (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
+ // read the data from carbon data file
+ synchronized (fileReader) {
+ fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset,
+ (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
+ }
+ // create raw chunk for each measure column
+ MeasureRawColumnChunk[] measureDataChunk =
+ new MeasureRawColumnChunk[endColumnBlockletIndex - startColumnBlockletIndex + 1];
+ int runningLength = 0;
+ int index = 0;
+ for (int i = startColumnBlockletIndex; i <= endColumnBlockletIndex; i++) {
+ int currentLength =
+ (int) (measureColumnChunkOffsets.get(i + 1) - measureColumnChunkOffsets.get(i));
+ MeasureRawColumnChunk measureRawColumnChunk =
+ new MeasureRawColumnChunk(i, buffer, runningLength, currentLength, this);
+ DataChunk3 dataChunk =
+ CarbonUtil.readDataChunk3(buffer, runningLength, measureColumnChunkLength.get(i));
+
+ int numberOfPages = dataChunk.getPage_length().size();
+ byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+ byte[][] minValueOfEachPage = new byte[numberOfPages][];
+ int[] eachPageLength = new int[numberOfPages];
+ for (int j = 0; j < minValueOfEachPage.length; j++) {
+ maxValueOfEachPage[j] =
+ dataChunk.getData_chunk_list().get(j).getMin_max().getMax_values().get(0).array();
+ minValueOfEachPage[j] =
+ dataChunk.getData_chunk_list().get(j).getMin_max().getMin_values().get(0).array();
+ eachPageLength[j] = dataChunk.getData_chunk_list().get(j).getNumberOfRowsInpage();
+ }
+ measureRawColumnChunk.setDataChunkV3(dataChunk);
+ measureRawColumnChunk.setFileReader(fileReader);
+ measureRawColumnChunk.setPagesCount(dataChunk.getPage_length().size());
+ measureRawColumnChunk.setMaxValues(maxValueOfEachPage);
+ measureRawColumnChunk.setMinValues(minValueOfEachPage);
+ measureRawColumnChunk.setRowCount(eachPageLength);
+ measureRawColumnChunk.setLengths(ArrayUtils
+ .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+ measureRawColumnChunk.setOffsets(ArrayUtils
+ .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+ measureDataChunk[index] = measureRawColumnChunk;
+ runningLength += currentLength;
+ index++;
+ }
+ return measureDataChunk;
+ }
+
+ /**
+ * Below method will be used to convert the compressed measure chunk raw data to actual data
+ *
+ * @param measureRawColumnChunk measure raw chunk
+ * @param pageNumber page number of the page to be decoded
+ * @return MeasureColumnDataChunk
+ */
+ @Override public MeasureColumnDataChunk convertToMeasureChunk(
+ MeasureRawColumnChunk measureRawColumnChunk, int pageNumber) throws IOException {
+ MeasureColumnDataChunk dataChunk = new MeasureColumnDataChunk();
+ // data chunk of blocklet column
+ DataChunk3 dataChunk3 = measureRawColumnChunk.getDataChunkV3();
+ // data chunk of page
+ DataChunk2 measureColumnChunk = dataChunk3.getData_chunk_list().get(pageNumber);
+ // calculating the start point of data
+ // as buffer can contain multiple column data, start point will be datachunkoffset +
+ // data chunk length + page offset
+ int copyPoint = measureRawColumnChunk.getOffSet() + measureColumnChunkLength
+ .get(measureRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
+ List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
+ for (int i = 0; i < measureColumnChunk.getEncoder_meta().size(); i++) {
+ valueEncodeMeta.add(CarbonUtil
+ .deserializeEncoderMetaNew(measureColumnChunk.getEncoder_meta().get(i).array()));
+ }
+ WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
+ ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
+ // uncompress
+ byte[] data = new byte[measureColumnChunk.data_page_length];
+ ByteBuffer rawData = measureRawColumnChunk.getRawData();
+ rawData.position(copyPoint);
+ rawData.get(data);
+ values.uncompress(compressionModel.getConvertedDataType()[0], data, 0,
+ measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
+ compressionModel.getMaxValue()[0], measureRawColumnChunk.getRowCount()[pageNumber]);
+ CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
+ // set the data chunk
+ dataChunk.setMeasureDataHolder(measureDataHolder);
+ // set the null value indexes
+ dataChunk.setNullValueIndexHolder(getPresenceMeta(measureColumnChunk.presence));
+ return dataChunk;
+ }
+}
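
convertToMeasureChunk locates a page inside the grouped buffer as the chunk's offset in the buffer, plus the serialized DataChunk3 header length, plus the per-page offset stored in page_offset. A minimal sketch of that arithmetic with made-up numbers:

// Sketch of the copy-point arithmetic; values are illustrative only.
public final class PageCopyPointSketch {

  static int copySourcePoint(int chunkOffsetInBuffer, int dataChunk3Length, int pageOffset) {
    // page data starts after the serialized DataChunk3 header
    return chunkOffsetInBuffer + dataChunk3Length + pageOffset;
  }

  public static void main(String[] args) {
    // chunk starts 1000 bytes into the buffer, its header is 64 bytes,
    // and the requested page starts 8192 bytes into the page data
    System.out.println(copySourcePoint(1000, 64, 8192)); // 9256
  }
}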
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
index 14e7938..23af707 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
/**
* Below class will be used to store fixed length dimension data
@@ -66,13 +67,7 @@ public class SafeFixedLengthDimensionDataChunkStore extends SafeAbsractDimension
}
// below part is to convert the byte array to surrogate value
int startOffsetOfData = index * columnValueSize;
- int surrogate = 0;
- for (int i = 0; i < columnValueSize; i++) {
- surrogate <<= 8;
- surrogate ^= data[startOffsetOfData] & 0xFF;
- startOffsetOfData++;
- }
- return surrogate;
+ return CarbonUtil.getSurrogateInternal(data, startOffsetOfData, columnValueSize);
}
/**
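
For reference, the loop removed above (now centralized in CarbonUtil.getSurrogateInternal) decodes columnValueSize bytes as a big-endian unsigned value. A standalone sketch of the same conversion, using the loop from the removed lines:

// Sketch: decode columnValueSize bytes at the given offset as a
// big-endian unsigned integer surrogate key.
public final class SurrogateSketch {

  static int getSurrogate(byte[] data, int startOffsetOfData, int columnValueSize) {
    int surrogate = 0;
    for (int i = 0; i < columnValueSize; i++) {
      surrogate <<= 8;
      surrogate ^= data[startOffsetOfData++] & 0xFF;
    }
    return surrogate;
  }

  public static void main(String[] args) {
    // bytes 0x01 0x02 decode to 258 for a 2-byte dictionary column
    System.out.println(getSurrogate(new byte[] { 1, 2 }, 0, 2)); // 258
  }
}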
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
new file mode 100644
index 0000000..346d8d8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ByteUtil;
+
+public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
+
+ private boolean alreadySorted;
+
+ private short[] dataAfterComp;
+
+ private short[] indexMap;
+
+ private byte[][] keyBlock;
+
+ private short[] dataIndexMap;
+
+ private int totalSize;
+
+ public BlockIndexerStorageForShort(byte[][] keyBlock, boolean compressData,
+ boolean isNoDictionary, boolean isSortRequired) {
+ ColumnWithShortIndex[] columnWithIndexs = createColumnWithIndexArray(keyBlock, isNoDictionary);
+ if (isSortRequired) {
+ Arrays.sort(columnWithIndexs);
+ }
+ compressMyOwnWay(extractDataAndReturnIndexes(columnWithIndexs, keyBlock));
+ if (compressData) {
+ compressDataMyOwnWay(columnWithIndexs);
+ }
+ }
+
+ /**
+ * Create an object wrapping each column array with its respective row index
+ *
+ * @return array of columns with their indexes
+ */
+ private ColumnWithShortIndex[] createColumnWithIndexArray(byte[][] keyBlock,
+ boolean isNoDictionary) {
+ ColumnWithShortIndex[] columnWithIndexs;
+ if (isNoDictionary) {
+ columnWithIndexs = new ColumnWithShortIndex[keyBlock.length];
+ for (short i = 0; i < columnWithIndexs.length; i++) {
+ columnWithIndexs[i] = new ColumnWithShortIndexForNoDictionay(keyBlock[i], i);
+ }
+ } else {
+ columnWithIndexs = new ColumnWithShortIndex[keyBlock.length];
+ for (short i = 0; i < columnWithIndexs.length; i++) {
+ columnWithIndexs[i] = new ColumnWithShortIndex(keyBlock[i], i);
+ }
+ }
+ return columnWithIndexs;
+ }
+
+ private short[] extractDataAndReturnIndexes(ColumnWithShortIndex[] columnWithIndexs,
+ byte[][] keyBlock) {
+ short[] indexes = new short[columnWithIndexs.length];
+ for (int i = 0; i < indexes.length; i++) {
+ indexes[i] = columnWithIndexs[i].getIndex();
+ keyBlock[i] = columnWithIndexs[i].getColumn();
+ }
+ this.keyBlock = keyBlock;
+ return indexes;
+ }
+
+ /**
+ * Compresses the indexes based on runs of sequential numbers.
+ * [1,2,3,4,6,8,10,11,12,13] is translated to [1,4,6,8,10,13] and [0,4]. The
+ * first array holds the start and end of each sequential run (plus any
+ * isolated values) and the second array keeps the positions in the first
+ * array where a run starts. If there are no sequential numbers, the input
+ * array is returned as-is with an empty second array.
+ *
+ * @param indexes
+ */
+ public void compressMyOwnWay(short[] indexes) {
+ List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ int k = 0;
+ int i = 1;
+ for (; i < indexes.length; i++) {
+ if (indexes[i] - indexes[i - 1] == 1) {
+ k++;
+ } else {
+ if (k > 0) {
+ map.add(((short) list.size()));
+ list.add(indexes[i - k - 1]);
+ list.add(indexes[i - 1]);
+ } else {
+ list.add(indexes[i - 1]);
+ }
+ k = 0;
+ }
+ }
+ if (k > 0) {
+ map.add(((short) list.size()));
+ list.add(indexes[i - k - 1]);
+ list.add(indexes[i - 1]);
+ } else {
+ list.add(indexes[i - 1]);
+ }
+ double compressionPercentage = (((list.size() + map.size()) * 100) / indexes.length);
+ if (compressionPercentage > 70) {
+ dataAfterComp = indexes;
+ } else {
+ dataAfterComp = convertToArray(list);
+ }
+ if (indexes.length == dataAfterComp.length) {
+ indexMap = new short[0];
+ } else {
+ indexMap = convertToArray(map);
+ }
+ if (dataAfterComp.length == 2 && indexMap.length == 1) {
+ alreadySorted = true;
+ }
+ }
+
+ private short[] convertToArray(List<Short> list) {
+ short[] shortArray = new short[list.size()];
+ for (int i = 0; i < shortArray.length; i++) {
+ shortArray[i] = list.get(i);
+ }
+ return shortArray;
+ }
+
+ /**
+ * @return the alreadySorted
+ */
+ public boolean isAlreadySorted() {
+ return alreadySorted;
+ }
+
+ /**
+ * @return the dataAfterComp
+ */
+ public short[] getDataAfterComp() {
+ return dataAfterComp;
+ }
+
+ /**
+ * @return the indexMap
+ */
+ public short[] getIndexMap() {
+ return indexMap;
+ }
+
+ /**
+ * @return the keyBlock
+ */
+ public byte[][] getKeyBlock() {
+ return keyBlock;
+ }
+
+ private void compressDataMyOwnWay(ColumnWithShortIndex[] indexes) {
+ byte[] prvKey = indexes[0].getColumn();
+ List<ColumnWithShortIndex> list =
+ new ArrayList<ColumnWithShortIndex>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ list.add(indexes[0]);
+ short counter = 1;
+ short start = 0;
+ List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+ for (int i = 1; i < indexes.length; i++) {
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(prvKey, indexes[i].getColumn()) != 0) {
+ prvKey = indexes[i].getColumn();
+ list.add(indexes[i]);
+ map.add(start);
+ map.add(counter);
+ start += counter;
+ counter = 1;
+ continue;
+ }
+ counter++;
+ }
+ map.add(start);
+ map.add(counter);
+ this.keyBlock = convertToKeyArray(list);
+ if (indexes.length == keyBlock.length) {
+ dataIndexMap = new short[0];
+ } else {
+ dataIndexMap = convertToArray(map);
+ }
+ }
+
+ private byte[][] convertToKeyArray(List<ColumnWithShortIndex> list) {
+ byte[][] shortArray = new byte[list.size()][];
+ for (int i = 0; i < shortArray.length; i++) {
+ shortArray[i] = list.get(i).getColumn();
+ totalSize += shortArray[i].length;
+ }
+ return shortArray;
+ }
+
+ @Override public short[] getDataIndexMap() {
+ return dataIndexMap;
+ }
+
+ @Override public int getTotalSize() {
+ return totalSize;
+ }
+
+ @Override public byte[] getMin() {
+ return keyBlock[0];
+ }
+
+ @Override public byte[] getMax() {
+ return keyBlock[keyBlock.length - 1];
+ }
+
+}
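
The compressMyOwnWay example from the javadoc can be reproduced standalone. The sketch below mirrors the run-detection loop on the documented input; the class and main method are illustrative only, not part of this patch.

import java.util.ArrayList;
import java.util.List;

// Sketch: collapse runs of consecutive values to (start, end) pairs and
// record where in the output each pair begins.
public final class RowIndexCompressSketch {

  public static void main(String[] args) {
    short[] indexes = { 1, 2, 3, 4, 6, 8, 10, 11, 12, 13 };
    List<Short> list = new ArrayList<Short>();
    List<Short> map = new ArrayList<Short>();
    int k = 0;
    int i = 1;
    for (; i < indexes.length; i++) {
      if (indexes[i] - indexes[i - 1] == 1) {
        k++;
        continue;
      }
      if (k > 0) {
        // close the run: record where its (start, end) pair begins in list
        map.add((short) list.size());
        list.add(indexes[i - k - 1]);
      }
      list.add(indexes[i - 1]);
      k = 0;
    }
    if (k > 0) {
      map.add((short) list.size());
      list.add(indexes[i - k - 1]);
    }
    list.add(indexes[i - 1]);
    System.out.println(list); // [1, 4, 6, 8, 10, 13]
    System.out.println(map);  // [0, 4]
  }
}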
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java
new file mode 100644
index 0000000..57447a3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.util.ByteUtil;
+
+public class ColumnWithShortIndex implements Comparable<ColumnWithShortIndex> {
+ protected byte[] column;
+
+ private short index;
+
+ public ColumnWithShortIndex(byte[] column, short index) {
+ this.column = column;
+ this.index = index;
+ }
+
+ /**
+ * @return the column
+ */
+ public byte[] getColumn() {
+ return column;
+ }
+
+ /**
+ * @param column the column to set
+ */
+ public void setColumn(byte[] column) {
+ this.column = column;
+ }
+
+ /**
+ * @return the index
+ */
+ public short getIndex() {
+ return index;
+ }
+
+ /**
+ * @param index the index to set
+ */
+ public void setIndex(short index) {
+ this.index = index;
+ }
+
+ @Override public int compareTo(ColumnWithShortIndex o) {
+ return ByteUtil.UnsafeComparer.INSTANCE.compareTo(column, o.column);
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ ColumnWithShortIndex o = (ColumnWithShortIndex)obj;
+ return Arrays.equals(column, o.column) && index == o.index;
+ }
+
+ @Override public int hashCode() {
+ return Arrays.hashCode(column) + index;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java
new file mode 100644
index 0000000..34cce63
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+
+public class ColumnWithShortIndexForNoDictionay extends ColumnWithShortIndex
+ implements Comparable<ColumnWithShortIndex> {
+
+ public ColumnWithShortIndexForNoDictionay(byte[] column, short index) {
+ super(column, index);
+ }
+
+ @Override public int compareTo(ColumnWithShortIndex o) {
+ return UnsafeComparer.INSTANCE
+ .compareTo(column, 2, column.length - 2, o.column, 2, o.column.length - 2);
+ }
+
+ @Override public boolean equals(Object obj) {
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ ColumnWithShortIndexForNoDictionay o = (ColumnWithShortIndexForNoDictionay) obj;
+ return Arrays.equals(column, o.column) && getIndex() == o.getIndex();
+ }
+
+ @Override public int hashCode() {
+ return Arrays.hashCode(column) + getIndex();
+ }
+}
\ No newline at end of file
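
The compareTo override above starts comparing at offset 2, which suggests each no-dictionary value carries a 2-byte length prefix ahead of the actual bytes. The sketch below shows that presumed layout; the prefix interpretation is an assumption for illustration, not stated in the patch.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of the assumed no-dictionary value layout: a 2-byte length prefix
// followed by the value bytes, so ordering must skip the first two bytes.
public final class NoDictionaryLayoutSketch {

  static byte[] withLengthPrefix(byte[] value) {
    return ByteBuffer.allocate(2 + value.length)
        .putShort((short) value.length)
        .put(value)
        .array();
  }

  public static void main(String[] args) {
    byte[] stored = withLengthPrefix("apple".getBytes(StandardCharsets.UTF_8));
    // bytes 0..1 hold the length (0, 5); the value itself starts at offset 2
    System.out.println(stored[0] + "," + stored[1] + " -> " + (char) stored[2]); // 0,5 -> a
  }
}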
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
index 7629895..240f891 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
@@ -19,7 +19,8 @@ package org.apache.carbondata.core.metadata;
public enum ColumnarFormatVersion {
V1((short)1),
- V2((short)2);
+ V2((short)2),
+ V3((short)3);
private short version;
ColumnarFormatVersion(short version) {
@@ -43,8 +44,12 @@ public enum ColumnarFormatVersion {
case 1:
// after multiple reader support, user can write new file with version 1
return V1;
- default:
+ case 2:
return V2;
+ case 3:
+ return V3;
+ default:
+ return V3;
}
}
}
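
A minimal sketch of the new valueOf dispatch, re-implemented standalone for illustration: known versions map directly and anything unrecognized now falls back to V3 instead of V2.

// Sketch of the extended version dispatch; standalone, not the actual enum.
public final class VersionDispatchSketch {

  enum Version { V1, V2, V3 }

  static Version valueOf(short version) {
    switch (version) {
      case 1:
        return Version.V1;
      case 2:
        return Version.V2;
      case 3:
      default:
        return Version.V3;
    }
  }

  public static void main(String[] args) {
    System.out.println(valueOf((short) 2)); // V2
    System.out.println(valueOf((short) 7)); // V3 (default fallback)
  }
}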