You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/26 14:54:30 UTC
[2/4] incubator-carbondata git commit: Added V3 Format Writer and
Reader Code
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 0c2e8ab..1e8207c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.impl.StandardLogService;
import org.apache.carbondata.core.cache.CacheProvider;
import org.apache.carbondata.core.cache.CacheType;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.BlockIndexStore;
import org.apache.carbondata.core.datastore.IndexKey;
import org.apache.carbondata.core.datastore.block.AbstractIndex;
@@ -60,6 +61,7 @@ import org.apache.carbondata.core.scan.model.QueryMeasure;
import org.apache.carbondata.core.scan.model.QueryModel;
import org.apache.carbondata.core.stats.QueryStatistic;
import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
@@ -336,6 +338,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
int[] dimensionsBlockIndexes = QueryUtil.getDimensionsBlockIndexes(updatedQueryDimension,
segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions,
queryProperties.complexFilterDimension, allProjectionListDimensionIdexes);
+ int numberOfColumnToBeReadInOneIO = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+ CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE));
+
if (dimensionsBlockIndexes.length > 0) {
numberOfElementToConsider = dimensionsBlockIndexes[dimensionsBlockIndexes.length - 1]
== segmentProperties.getBlockTodimensionOrdinalMapping().size() - 1 ?
@@ -343,7 +349,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
dimensionsBlockIndexes.length;
blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(CarbonUtil
.getRangeIndex(dimensionsBlockIndexes, numberOfElementToConsider,
- CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO));
+ numberOfColumnToBeReadInOneIO));
} else {
blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[0][0]);
}
@@ -362,7 +368,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
// setting all the measure chunk indexes to be read from file
blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(CarbonUtil
.getRangeIndex(measureBlockIndexes, numberOfElementToConsider,
- CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO));
+ numberOfColumnToBeReadInOneIO));
} else {
blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(new int[0][0]);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
index d2523a1..c64f498 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.core.scan.filter.executer;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
@@ -24,11 +25,14 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
import org.apache.carbondata.core.scan.executor.util.QueryUtil;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
import org.apache.carbondata.core.util.ByteUtil;
/**
@@ -80,6 +84,26 @@ public class IncludeColGroupFilterExecuterImpl extends IncludeFilterExecuterImpl
return bitSet;
}
+ @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
+ int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+ .get(dimColumnEvaluatorInfo.getColumnIndex());
+ if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+ blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+ .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+ }
+ DimensionRawColumnChunk dimensionRawColumnChunk =
+ blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+ BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
+ for (int i = 0; i < dimensionRawColumnChunk.getPagesCount(); i++) {
+ if (dimensionRawColumnChunk.getMaxValues() != null) {
+ BitSet bitSet = getFilteredIndexes(dimensionRawColumnChunk.convertToDimColDataChunk(i),
+ dimensionRawColumnChunk.getRowCount()[i]);
+ bitSetGroup.setBitSet(bitSet, i);
+ }
+ }
+ return bitSetGroup;
+ }
+
/**
* It is required for extracting column data from columngroup chunk
*
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 2bdce8d..c7e2acc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -96,17 +96,9 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
if (rawColumnChunk.getMinValues() != null) {
if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
- int compare = ByteUtil.UnsafeComparer.INSTANCE
- .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]);
- if (compare >= 0) {
- BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
- bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
- bitSetGroup.setBitSet(bitSet, i);
- } else {
- BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
- rawColumnChunk.getRowCount()[i]);
- bitSetGroup.setBitSet(bitSet, i);
- }
+ BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+ rawColumnChunk.getRowCount()[i]);
+ bitSetGroup.setBitSet(bitSet, i);
}
} else {
BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index ae9ba8a..d9795eb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -96,17 +96,9 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
if (rawColumnChunk.getMinValues() != null) {
if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
- int compare = ByteUtil.UnsafeComparer.INSTANCE
- .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]);
- if (compare > 0) {
- BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
- bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
- bitSetGroup.setBitSet(bitSet, i);
- } else {
- BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
- rawColumnChunk.getRowCount()[i]);
- bitSetGroup.setBitSet(bitSet, i);
- }
+ BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+ rawColumnChunk.getRowCount()[i]);
+ bitSetGroup.setBitSet(bitSet, i);
}
} else {
BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
index 07e3487..323042a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
@@ -52,9 +52,11 @@ public class BitSetGroup {
public void and(BitSetGroup group) {
int i = 0;
for (BitSet bitSet : bitSets) {
- BitSet otherSet = group.getBitSet(i);
+ BitSet otherSet = group.getBitSet(i);
if (bitSet != null && otherSet != null) {
bitSet.and(otherSet);
+ } else {
+ bitSets[i] = null;
}
i++;
}
@@ -63,7 +65,7 @@ public class BitSetGroup {
public void or(BitSetGroup group) {
int i = 0;
for (BitSet bitSet : bitSets) {
- BitSet otherSet = group.getBitSet(i);
+ BitSet otherSet = group.getBitSet(i);
if (bitSet != null && otherSet != null) {
bitSet.or(otherSet);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 25e7cfe..55c0302 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -35,18 +36,22 @@ import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.BlockletBTreeIndex;
import org.apache.carbondata.format.BlockletIndex;
import org.apache.carbondata.format.BlockletInfo;
import org.apache.carbondata.format.BlockletInfo2;
+import org.apache.carbondata.format.BlockletInfo3;
import org.apache.carbondata.format.BlockletMinMaxIndex;
import org.apache.carbondata.format.ChunkCompressionMeta;
import org.apache.carbondata.format.ColumnSchema;
import org.apache.carbondata.format.CompressionCodec;
import org.apache.carbondata.format.DataChunk;
import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
import org.apache.carbondata.format.Encoding;
import org.apache.carbondata.format.FileFooter;
import org.apache.carbondata.format.IndexHeader;
@@ -109,6 +114,50 @@ public class CarbonMetadataUtil {
}
/**
+ * It converts list of BlockletInfoColumnar to FileFooter thrift objects
+ *
+ * @param infoList
+ * @param numCols
+ * @param cardinalities
+ * @return FileFooter
+ */
+ public static FileFooter convertFileFooter3(List<BlockletInfo3> infoList,
+ List<BlockletIndex> blockletIndexs, int[] cardinalities, List<ColumnSchema> columnSchemaList,
+ SegmentProperties segmentProperties) throws IOException {
+ FileFooter footer = getFileFooter3(infoList, blockletIndexs, cardinalities, columnSchemaList);
+ for (BlockletInfo3 info : infoList) {
+ footer.addToBlocklet_info_list3(info);
+ }
+ return footer;
+ }
+
+ /**
+ * Below method will be used to get the file footer object
+ *
+ * @param infoList blocklet info
+ * @param cardinalities cardinlaity of dimension columns
+ * @param columnSchemaList column schema list
+ * @return file footer
+ */
+ private static FileFooter getFileFooter3(List<BlockletInfo3> infoList,
+ List<BlockletIndex> blockletIndexs, int[] cardinalities,
+ List<ColumnSchema> columnSchemaList) {
+ SegmentInfo segmentInfo = new SegmentInfo();
+ segmentInfo.setNum_cols(columnSchemaList.size());
+ segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(cardinalities));
+ ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+ FileFooter footer = new FileFooter();
+ footer.setVersion(version.number());
+ footer.setNum_rows(getNumberOfRowForFooter(infoList));
+ footer.setSegment_info(segmentInfo);
+ footer.setTable_columns(columnSchemaList);
+ for (BlockletIndex info : blockletIndexs) {
+ footer.addToBlocklet_index_list(info);
+ }
+ return footer;
+ }
+
+ /**
* Below method will be used to get the file footer object for
*
* @param infoList blocklet info
@@ -162,6 +211,20 @@ public class CarbonMetadataUtil {
return numberOfRows;
}
+ /**
+ * Get total number of rows for the file.
+ *
+ * @param infoList
+ * @return
+ */
+ private static long getNumberOfRowForFooter(List<BlockletInfo3> infoList) {
+ long numberOfRows = 0;
+ for (BlockletInfo3 info : infoList) {
+ numberOfRows += info.num_rows;
+ }
+ return numberOfRows;
+ }
+
private static BlockletIndex getBlockletIndex(BlockletInfoColumnar info) {
BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
@@ -181,9 +244,52 @@ public class CarbonMetadataUtil {
return blockletIndex;
}
+ public static BlockletIndex getBlockletIndex(List<NodeHolder> nodeHolderList,
+ List<CarbonMeasure> carbonMeasureList) {
+ BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
+ for (byte[] max : nodeHolderList.get(nodeHolderList.size() - 1).getColumnMaxData()) {
+ blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(max));
+ }
+ for (byte[] min : nodeHolderList.get(0).getColumnMinData()) {
+ blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
+ }
+ byte[][] measureMaxValue = nodeHolderList.get(0).getMeasureColumnMaxData().clone();
+ byte[][] measureMinValue = nodeHolderList.get(0).getMeasureColumnMinData().clone();
+ byte[] minVal = null;
+ byte[] maxVal = null;
+ for (int i = 1; i < nodeHolderList.size(); i++) {
+ for (int j = 0; j < measureMinValue.length; j++) {
+ minVal = nodeHolderList.get(i).getMeasureColumnMinData()[j];
+ maxVal = nodeHolderList.get(i).getMeasureColumnMaxData()[j];
+ if (compareMeasureData(measureMaxValue[j], maxVal, carbonMeasureList.get(j).getDataType())
+ < 0) {
+ measureMaxValue[j] = maxVal.clone();
+ }
+ if (compareMeasureData(measureMinValue[j], minVal, carbonMeasureList.get(j).getDataType())
+ > 0) {
+ measureMinValue[j] = minVal.clone();
+ }
+ }
+ }
+
+ for (byte[] max : measureMaxValue) {
+ blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(max));
+ }
+ for (byte[] min : measureMinValue) {
+ blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
+ }
+ BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
+ blockletBTreeIndex.setStart_key(nodeHolderList.get(0).getStartKey());
+ blockletBTreeIndex.setEnd_key(nodeHolderList.get(nodeHolderList.size() - 1).getEndKey());
+ BlockletIndex blockletIndex = new BlockletIndex();
+ blockletIndex.setMin_max_index(blockletMinMaxIndex);
+ blockletIndex.setB_tree_index(blockletBTreeIndex);
+ return blockletIndex;
+ }
+
/**
- * Below method will be used to get the blocklet info object for
- * data version 2 file
+ * Below method will be used to get the blocklet info object for data version
+ * 2 file
*
* @param blockletInfoColumnar blocklet info
* @param dataChunkOffsets data chunks offsets
@@ -222,7 +328,8 @@ public class CarbonMetadataUtil {
encodings.add(Encoding.DIRECT_DICTIONARY);
}
dataChunk.setRowMajor(colGrpblock[i]);
- //TODO : Once schema PR is merged and information needs to be passed here.
+ // TODO : Once schema PR is merged and information needs to be passed
+ // here.
dataChunk.setColumn_ids(new ArrayList<Integer>());
dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
dataChunk.setData_page_offset(blockletInfoColumnar.getKeyOffSets()[i]);
@@ -242,7 +349,8 @@ public class CarbonMetadataUtil {
j++;
}
- //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+ // TODO : Right now the encodings are happening at runtime. change as per
+ // this encoders.
dataChunk.setEncoders(encodings);
colDataChunks.add(dataChunk);
@@ -252,24 +360,26 @@ public class CarbonMetadataUtil {
DataChunk dataChunk = new DataChunk();
dataChunk.setChunk_meta(getChunkCompressionMeta());
dataChunk.setRowMajor(false);
- //TODO : Once schema PR is merged and information needs to be passed here.
+ // TODO : Once schema PR is merged and information needs to be passed
+ // here.
dataChunk.setColumn_ids(new ArrayList<Integer>());
dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]);
dataChunk.setData_page_offset(blockletInfoColumnar.getMeasureOffset()[i]);
- //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+ // TODO : Right now the encodings are happening at runtime. change as per
+ // this encoders.
List<Encoding> encodings = new ArrayList<Encoding>();
encodings.add(Encoding.DELTA);
dataChunk.setEncoders(encodings);
- //TODO writing dummy presence meta need to set actual presence
- //meta
+ // TODO writing dummy presence meta need to set actual presence
+ // meta
PresenceMeta presenceMeta = new PresenceMeta();
presenceMeta.setPresent_bit_streamIsSet(true);
presenceMeta
.setPresent_bit_stream(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray());
dataChunk.setPresence(presenceMeta);
- //TODO : PresenceMeta needs to be implemented and set here
+ // TODO : PresenceMeta needs to be implemented and set here
// dataChunk.setPresence(new PresenceMeta());
- //TODO : Need to write ValueCompression meta here.
+ // TODO : Need to write ValueCompression meta here.
List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i))));
@@ -291,7 +401,7 @@ public class CarbonMetadataUtil {
private static boolean containsEncoding(int blockIndex, Encoding encoding,
List<ColumnSchema> columnSchemas, SegmentProperties segmentProperties) {
Set<Integer> dimOrdinals = segmentProperties.getDimensionOrdinalForBlock(blockIndex);
- //column groups will always have dictionary encoding
+ // column groups will always have dictionary encoding
if (dimOrdinals.size() > 1 && Encoding.DICTIONARY == encoding) {
return true;
}
@@ -336,7 +446,8 @@ public class CarbonMetadataUtil {
}
/**
- * It converts FileFooter thrift object to list of BlockletInfoColumnar objects
+ * It converts FileFooter thrift object to list of BlockletInfoColumnar
+ * objects
*
* @param footer
* @return
@@ -486,8 +597,8 @@ public class CarbonMetadataUtil {
}
/**
- * Below method will be used to get the block index info thrift object for each block
- * present in the segment
+ * Below method will be used to get the block index info thrift object for
+ * each block present in the segment
*
* @param blockIndexInfoList block index info list
* @return list of block index
@@ -508,8 +619,7 @@ public class CarbonMetadataUtil {
}
/**
- * Below method will be used to get the data chunk object for all the
- * columns
+ * Below method will be used to get the data chunk object for all the columns
*
* @param blockletInfoColumnar blocklet info
* @param columnSchenma list of columns
@@ -536,7 +646,8 @@ public class CarbonMetadataUtil {
encodings.add(Encoding.DIRECT_DICTIONARY);
}
dataChunk.setRowMajor(colGrpblock[i]);
- //TODO : Once schema PR is merged and information needs to be passed here.
+ // TODO : Once schema PR is merged and information needs to be passed
+ // here.
dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
if (aggKeyBlock[i]) {
dataChunk.setRle_page_length(blockletInfoColumnar.getDataIndexMapLength()[aggregateIndex]);
@@ -552,7 +663,8 @@ public class CarbonMetadataUtil {
rowIdIndex++;
}
- //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+ // TODO : Right now the encodings are happening at runtime. change as per
+ // this encoders.
dataChunk.setEncoders(encodings);
colDataChunks.add(dataChunk);
@@ -562,22 +674,24 @@ public class CarbonMetadataUtil {
DataChunk2 dataChunk = new DataChunk2();
dataChunk.setChunk_meta(getChunkCompressionMeta());
dataChunk.setRowMajor(false);
- //TODO : Once schema PR is merged and information needs to be passed here.
+ // TODO : Once schema PR is merged and information needs to be passed
+ // here.
dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]);
- //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+ // TODO : Right now the encodings are happening at runtime. change as per
+ // this encoders.
List<Encoding> encodings = new ArrayList<Encoding>();
encodings.add(Encoding.DELTA);
dataChunk.setEncoders(encodings);
- //TODO writing dummy presence meta need to set actual presence
- //meta
+ // TODO writing dummy presence meta need to set actual presence
+ // meta
PresenceMeta presenceMeta = new PresenceMeta();
presenceMeta.setPresent_bit_streamIsSet(true);
presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor()
.compressByte(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray()));
dataChunk.setPresence(presenceMeta);
- //TODO : PresenceMeta needs to be implemented and set here
+ // TODO : PresenceMeta needs to be implemented and set here
// dataChunk.setPresence(new PresenceMeta());
- //TODO : Need to write ValueCompression meta here.
+ // TODO : Need to write ValueCompression meta here.
List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i))));
@@ -586,4 +700,189 @@ public class CarbonMetadataUtil {
}
return colDataChunks;
}
+
+ /**
+ * Below method will be used to get the data chunk object for all the columns
+ *
+ * @param blockletInfoColumnar blocklet info
+ * @param columnSchenma list of columns
+ * @param segmentProperties segment properties
+ * @return list of data chunks
+ * @throws IOException
+ */
+ private static List<DataChunk2> getDatachunk2(List<NodeHolder> nodeHolderList,
+ List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties, int index,
+ boolean isDimensionColumn) throws IOException {
+ List<DataChunk2> colDataChunks = new ArrayList<DataChunk2>();
+ DataChunk2 dataChunk = null;
+ NodeHolder nodeHolder = null;
+ for (int i = 0; i < nodeHolderList.size(); i++) {
+ nodeHolder = nodeHolderList.get(i);
+ dataChunk = new DataChunk2();
+ dataChunk.min_max = new BlockletMinMaxIndex();
+ dataChunk.setChunk_meta(getChunkCompressionMeta());
+ dataChunk.setNumberOfRowsInpage(nodeHolder.getEntryCount());
+ List<Encoding> encodings = new ArrayList<Encoding>();
+ if (isDimensionColumn) {
+ dataChunk.setData_page_length(nodeHolder.getKeyLengths()[index]);
+ if (containsEncoding(index, Encoding.DICTIONARY, columnSchenma, segmentProperties)) {
+ encodings.add(Encoding.DICTIONARY);
+ }
+ if (containsEncoding(index, Encoding.DIRECT_DICTIONARY, columnSchenma, segmentProperties)) {
+ encodings.add(Encoding.DIRECT_DICTIONARY);
+ }
+ dataChunk.setRowMajor(nodeHolder.getColGrpBlocks()[index]);
+ // TODO : Once schema PR is merged and information needs to be passed
+ // here.
+ if (nodeHolder.getAggBlocks()[index]) {
+ dataChunk.setRle_page_length(nodeHolder.getDataIndexMapLength()[index]);
+ encodings.add(Encoding.RLE);
+ }
+ dataChunk.setSort_state(nodeHolder.getIsSortedKeyBlock()[index] ?
+ SortState.SORT_EXPLICIT :
+ SortState.SORT_NATIVE);
+
+ if (!nodeHolder.getIsSortedKeyBlock()[index]) {
+ dataChunk.setRowid_page_length(nodeHolder.getKeyBlockIndexLength()[index]);
+ encodings.add(Encoding.INVERTED_INDEX);
+ }
+ dataChunk.min_max.addToMax_values(ByteBuffer.wrap(nodeHolder.getColumnMaxData()[index]));
+ dataChunk.min_max.addToMin_values(ByteBuffer.wrap(nodeHolder.getColumnMinData()[index]));
+ } else {
+ dataChunk.setData_page_length(nodeHolder.getDataArray()[index].length);
+ // TODO : Right now the encodings are happening at runtime. change as
+ // per this encoders.
+ dataChunk.setEncoders(encodings);
+
+ dataChunk.setRowMajor(false);
+ // TODO : Right now the encodings are happening at runtime. change as
+ // per this encoders.
+ encodings.add(Encoding.DELTA);
+ dataChunk.setEncoders(encodings);
+ // TODO writing dummy presence meta need to set actual presence
+ // meta
+ PresenceMeta presenceMeta = new PresenceMeta();
+ presenceMeta.setPresent_bit_streamIsSet(true);
+ presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor()
+ .compressByte(nodeHolder.getMeasureNullValueIndex()[index].toByteArray()));
+ dataChunk.setPresence(presenceMeta);
+ List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
+ encoderMetaList.add(ByteBuffer.wrap(serializeEncodeMetaUsingByteBuffer(
+ createValueEncoderMeta(nodeHolder.getCompressionModel(), index))));
+ dataChunk.setEncoder_meta(encoderMetaList);
+ dataChunk.min_max
+ .addToMax_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMaxData()[index]));
+ dataChunk.min_max
+ .addToMin_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMinData()[index]));
+ }
+ dataChunk.setEncoders(encodings);
+ colDataChunks.add(dataChunk);
+ }
+ return colDataChunks;
+ }
+
+ public static DataChunk3 getDataChunk3(List<NodeHolder> nodeHolderList,
+ List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties, int index,
+ boolean isDimensionColumn) throws IOException {
+ List<DataChunk2> dataChunksList =
+ getDatachunk2(nodeHolderList, columnSchenma, segmentProperties, index, isDimensionColumn);
+ int offset = 0;
+ DataChunk3 dataChunk = new DataChunk3();
+ List<Integer> pageOffsets = new ArrayList<>();
+ List<Integer> pageLengths = new ArrayList<>();
+ int length = 0;
+ for (int i = 0; i < dataChunksList.size(); i++) {
+ pageOffsets.add(offset);
+ length =
+ dataChunksList.get(i).getData_page_length() + dataChunksList.get(i).getRle_page_length()
+ + dataChunksList.get(i).getRowid_page_length();
+ pageLengths.add(length);
+ offset += length;
+ }
+ dataChunk.setData_chunk_list(dataChunksList);
+ dataChunk.setPage_length(pageLengths);
+ dataChunk.setPage_offset(pageOffsets);
+ return dataChunk;
+ }
+
+ public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) {
+ ByteBuffer buffer = null;
+ if (valueEncoderMeta.getType() == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+ buffer = ByteBuffer.allocate(
+ (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+ + 3);
+ buffer.putChar(valueEncoderMeta.getType());
+ buffer.putDouble((Double) valueEncoderMeta.getMaxValue());
+ buffer.putDouble((Double) valueEncoderMeta.getMinValue());
+ buffer.putDouble((Double) valueEncoderMeta.getUniqueValue());
+ } else if (valueEncoderMeta.getType() == CarbonCommonConstants.BIG_INT_MEASURE) {
+ buffer = ByteBuffer.allocate(
+ (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+ + 3);
+ buffer.putChar(valueEncoderMeta.getType());
+ buffer.putLong((Long) valueEncoderMeta.getMaxValue());
+ buffer.putLong((Long) valueEncoderMeta.getMinValue());
+ buffer.putLong((Long) valueEncoderMeta.getUniqueValue());
+ } else {
+ buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3);
+ buffer.putChar(valueEncoderMeta.getType());
+ }
+ buffer.putInt(valueEncoderMeta.getDecimal());
+ buffer.put(valueEncoderMeta.getDataTypeSelected());
+ buffer.flip();
+ return buffer.array();
+ }
+
+ public static byte[] getByteValueForMeasure(Object data, DataType dataType) {
+ ByteBuffer b = null;
+ switch (dataType) {
+ case DOUBLE:
+ b = ByteBuffer.allocate(8);
+ b.putDouble((Double) data);
+ b.flip();
+ return b.array();
+ case LONG:
+ case INT:
+ case SHORT:
+ b = ByteBuffer.allocate(8);
+ b.putLong((Long) data);
+ b.flip();
+ return b.array();
+ case DECIMAL:
+ return DataTypeUtil.bigDecimalToByte((BigDecimal)data);
+ default:
+ throw new IllegalArgumentException("Invalid data type");
+ }
+ }
+
+ public static int compareMeasureData(byte[] first, byte[] second, DataType dataType) {
+ ByteBuffer firstBuffer = null;
+ ByteBuffer secondBuffer = null;
+ switch (dataType) {
+ case DOUBLE:
+ firstBuffer = ByteBuffer.allocate(8);
+ firstBuffer.put(first);
+ secondBuffer = ByteBuffer.allocate(8);
+ secondBuffer.put(first);
+ firstBuffer.flip();
+ secondBuffer.flip();
+ return (int) (firstBuffer.getDouble() - secondBuffer.getDouble());
+ case LONG:
+ case INT:
+ case SHORT:
+ firstBuffer = ByteBuffer.allocate(8);
+ firstBuffer.put(first);
+ secondBuffer = ByteBuffer.allocate(8);
+ secondBuffer.put(first);
+ firstBuffer.flip();
+ secondBuffer.flip();
+ return (int) (firstBuffer.getLong() - secondBuffer.getLong());
+ case DECIMAL:
+ return DataTypeUtil.byteToBigDecimal(first)
+ .compareTo(DataTypeUtil.byteToBigDecimal(second));
+ default:
+ throw new IllegalArgumentException("Invalid data type");
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 962d352..39c36ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -26,6 +26,7 @@ import java.util.Properties;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
public final class CarbonProperties {
@@ -85,6 +86,9 @@ public final class CarbonProperties {
validateCarbonDataFileVersion();
validateExecutorStartUpTime();
validatePrefetchBufferSize();
+ validateNumberOfPagesPerBlocklet();
+ validateNumberOfColumnPerIORead();
+ validateNumberOfRowsPerBlockletColumnPage();
}
private void validatePrefetchBufferSize() {
@@ -107,6 +111,93 @@ public final class CarbonProperties {
}
}
+ /**
+ * This method validates the number of pages per blocklet column; when the
+ * configured value is non-numeric or outside the allowed range the default
+ * value is logged and used instead.
+ */
+ private void validateNumberOfPagesPerBlocklet() {
+ String numberOfPagePerBlockletColumnString = carbonProperties
+ .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+ CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+ try {
+ short numberOfPagePerBlockletColumn = Short.parseShort(numberOfPagePerBlockletColumnString);
+ if (numberOfPagePerBlockletColumn
+ < CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MIN
+ || numberOfPagePerBlockletColumn
+ > CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MAX) {
+ // fix: close the quote around the default value in the log message
+ LOGGER.info(
+ "The Number Of pages per blocklet column value \"" + numberOfPagePerBlockletColumnString
+ + "\" is invalid. Using the default value \""
+ + CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE + "\"");
+ carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+ CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.info(
+ "The Number Of pages per blocklet column value \"" + numberOfPagePerBlockletColumnString
+ + "\" is invalid. Using the default value \""
+ + CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE + "\"");
+ carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+ CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+ }
+ }
+
+ /**
+ * This method validates the number of columns read in one IO; when the
+ * configured value is non-numeric or outside the allowed range the default
+ * value is logged and used instead.
+ */
+ private void validateNumberOfColumnPerIORead() {
+ String numberofColumnPerIOString = carbonProperties
+ .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+ CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+ try {
+ short numberofColumnPerIO = Short.parseShort(numberofColumnPerIOString);
+ if (numberofColumnPerIO < CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_MIN
+ || numberofColumnPerIO > CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_MAX) {
+ // fix: message wrongly referred to "pages per blocklet column"
+ // (copy-paste) and left the quote around the default value open
+ LOGGER.info("The Number Of columns to read in one IO value \"" + numberofColumnPerIOString
+ + "\" is invalid. Using the default value \""
+ + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE + "\"");
+ carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+ CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.info("The Number Of columns to read in one IO value \"" + numberofColumnPerIOString
+ + "\" is invalid. Using the default value \""
+ + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE + "\"");
+ carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+ CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+ }
+ }
+
+ /**
+ * This method validates the number of rows per blocklet column page; when
+ * the configured value is non-numeric or outside the allowed range the
+ * default value is logged and used instead.
+ * (fix: the previous javadoc was a copy-paste of the column-per-IO method)
+ */
+ private void validateNumberOfRowsPerBlockletColumnPage() {
+ String numberOfRowsPerBlockletColumnPageString = carbonProperties
+ .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+ CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+ try {
+ short numberOfRowsPerBlockletColumnPage =
+ Short.parseShort(numberOfRowsPerBlockletColumnPageString);
+ if (numberOfRowsPerBlockletColumnPage
+ < CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MIN
+ || numberOfRowsPerBlockletColumnPage
+ > CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MAX) {
+ // fix: close the quote around the default value in the log message
+ LOGGER.info("The Number Of rows per blocklet column pages value \""
+ + numberOfRowsPerBlockletColumnPageString + "\" is invalid. Using the default value \""
+ + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT + "\"");
+ carbonProperties
+ .setProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+ CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.info("The Number Of rows per blocklet column pages value \""
+ + numberOfRowsPerBlockletColumnPageString + "\" is invalid. Using the default value \""
+ + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT + "\"");
+ carbonProperties
+ .setProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+ CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+ }
+ }
+
private void validateBadRecordsLocation() {
String badRecordsLocation =
carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
@@ -288,17 +379,16 @@ public final class CarbonProperties {
carbonProperties.getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
if (carbondataFileVersionString == null) {
// use default property if user does not specify version property
- carbonProperties
- .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
- CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+ carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+ CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
} else {
try {
ColumnarFormatVersion.valueOf(carbondataFileVersionString);
} catch (IllegalArgumentException e) {
// use default property if user specifies an invalid version property
- LOGGER.warn("Specified file version property is invalid: " +
- carbondataFileVersionString + ". Using " +
- CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + " as default file version");
+ LOGGER.warn("Specified file version property is invalid: " + carbondataFileVersionString
+ + ". Using " + CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION
+ + " as default file version");
carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
}
@@ -569,6 +659,7 @@ public final class CarbonProperties {
/**
* Returns configured update deleta files value for IUD compaction
+ *
* @return numberOfDeltaFilesThreshold
*/
public int getNoUpdateDeltaFilesThresholdForIUDCompaction() {
@@ -588,8 +679,7 @@ public final class CarbonProperties {
}
} catch (NumberFormatException e) {
LOGGER.error("The specified value for property "
- + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
- + "is incorrect."
+ + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect."
+ " Correct value should be in range of 0 -10000. Taking the default value.");
numberOfDeltaFilesThreshold = Integer
.parseInt(CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
@@ -599,6 +689,7 @@ public final class CarbonProperties {
/**
* Returns configured delete deleta files value for IUD compaction
+ *
* @return numberOfDeltaFilesThreshold
*/
public int getNoDeleteDeltaFilesThresholdForIUDCompaction() {
@@ -618,8 +709,7 @@ public final class CarbonProperties {
}
} catch (NumberFormatException e) {
LOGGER.error("The specified value for property "
- + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
- + "is incorrect."
+ + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect."
+ " Correct value should be in range of 0 -10000. Taking the default value.");
numberOfDeltaFilesThreshold = Integer
.parseInt(CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b9a96d2..5a656e0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -73,6 +73,7 @@ import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
@@ -471,6 +472,28 @@ public final class CarbonUtil {
numberCompressor.unCompress(indexMap, 0, indexMap.length));
}
+ /**
+ * Reads a compressed inverted index from the buffer and uncompresses it.
+ * Layout at 'offset': a 4-byte int giving the index-data length, followed by
+ * the index data, with the remainder of 'totalLength' holding the index map.
+ *
+ * @param totalLength total byte length of the serialised index (length field + data + map)
+ * @param buffer source buffer (its position is moved by this call)
+ * @param offset byte offset where the serialised index starts
+ * @return the uncompressed inverted index
+ */
+ public static int[] getUnCompressColumnIndex(int totalLength, ByteBuffer buffer, int offset) {
+ buffer.position(offset);
+ int indexDataLength = buffer.getInt();
+ // bytes left after the 4-byte length field and the index data form the map
+ int indexMapLength = totalLength - indexDataLength - CarbonCommonConstants.INT_SIZE_IN_BYTE;
+ int[] indexData = getIntArray(buffer, buffer.position(), indexDataLength);
+ int[] indexMap = getIntArray(buffer, buffer.position(), indexMapLength);
+ return UnBlockIndexer.uncompressIndex(indexData, indexMap);
+ }
+
+ /**
+ * Reads 'length' BYTES from the buffer as 2-byte shorts, widening each to an
+ * int — despite the name, the on-disk values are shorts, so the returned
+ * array has length/2 entries.
+ *
+ * @param data source buffer (its position is moved by this call)
+ * @param offset byte offset to start reading from
+ * @param length number of bytes (not ints) to read; assumed even — TODO confirm callers
+ * @return decoded values widened to int
+ */
+ public static int[] getIntArray(ByteBuffer data, int offset, int length) {
+ if (length == 0) {
+ return new int[0];
+ }
+ data.position(offset);
+ int[] intArray = new int[length / 2];
+ int index = 0;
+ while (index < intArray.length) {
+ intArray[index++] = data.getShort();
+ }
+ return intArray;
+ }
+
/**
* Convert int array to Integer list
*
@@ -1233,6 +1256,18 @@ public final class CarbonUtil {
}, offset, length);
}
+ /**
+ * Reads 'length' bytes at 'offset' from the buffer and deserialises them
+ * into a thrift DataChunk3 (V3 format column-chunk metadata).
+ *
+ * @param dataChunkBuffer buffer holding the serialised chunk (position is moved)
+ * @param offset byte offset of the serialised DataChunk3
+ * @param length serialised length in bytes
+ * @return deserialised DataChunk3
+ * @throws IOException if thrift deserialisation fails
+ */
+ public static DataChunk3 readDataChunk3(ByteBuffer dataChunkBuffer, int offset, int length)
+ throws IOException {
+ byte[] data = new byte[length];
+ dataChunkBuffer.position(offset);
+ dataChunkBuffer.get(data);
+ return (DataChunk3) read(data, new ThriftReader.TBaseCreator() {
+ @Override public TBase create() {
+ return new DataChunk3();
+ }
+ }, 0, length);
+ }
+
public static DataChunk2 readDataChunk(ByteBuffer dataChunkBuffer, int offset, int length)
throws IOException {
byte[] data = new byte[length];
@@ -1293,6 +1328,35 @@ public final class CarbonUtil {
return meta;
}
+ /**
+ * Deserialises a ValueEncoderMeta from the compact binary layout used by the
+ * V3 writer: a 2-byte char type code, then max/min/unique values whose width
+ * depends on the measure type, then a 4-byte decimal count and one byte for
+ * the selected data type. Decimal measures carry no numeric stats in this
+ * layout, so 0.0 placeholders are set for them.
+ *
+ * @param encodeMeta serialised encoder metadata
+ * @return populated ValueEncoderMeta
+ */
+ public static ValueEncoderMeta deserializeEncoderMetaNew(byte[] encodeMeta) {
+ ByteBuffer buffer = ByteBuffer.wrap(encodeMeta);
+ char measureType = buffer.getChar();
+ ValueEncoderMeta valueEncoderMeta = new ValueEncoderMeta();
+ valueEncoderMeta.setType(measureType);
+ switch (measureType) {
+ case CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE:
+ // stats are written in the order max, min, unique
+ valueEncoderMeta.setMaxValue(buffer.getDouble());
+ valueEncoderMeta.setMinValue(buffer.getDouble());
+ valueEncoderMeta.setUniqueValue(buffer.getDouble());
+ break;
+ case CarbonCommonConstants.BIG_DECIMAL_MEASURE:
+ // big decimal carries no serialised stats; use placeholders
+ valueEncoderMeta.setMaxValue(0.0);
+ valueEncoderMeta.setMinValue(0.0);
+ valueEncoderMeta.setUniqueValue(0.0);
+ break;
+ case CarbonCommonConstants.BIG_INT_MEASURE:
+ valueEncoderMeta.setMaxValue(buffer.getLong());
+ valueEncoderMeta.setMinValue(buffer.getLong());
+ valueEncoderMeta.setUniqueValue(buffer.getLong());
+ break;
+ default:
+ throw new IllegalArgumentException("invalid measure type");
+ }
+ valueEncoderMeta.setDecimal(buffer.getInt());
+ valueEncoderMeta.setDataTypeSelected(buffer.get());
+ return valueEncoderMeta;
+ }
+
/**
* Below method will be used to convert indexes in range
* Indexes=[0,1,2,3,4,5,6,7,8,9]
@@ -1454,5 +1518,51 @@ public final class CarbonUtil {
return null;
}
}
+
+ /**
+ * Below method will be used to convert byte data to surrogate key based
+ * column value size. Bytes are accumulated big-endian, one per iteration.
+ *
+ * @param data data
+ * @param startOffsetOfData start offset of data
+ * @param eachColumnValueSize size of each column value (1 to 4 bytes)
+ * @return surrogate key
+ * @throws IllegalArgumentException if the size is outside 1..4
+ */
+ public static int getSurrogateInternal(byte[] data, int startOffsetOfData,
+ int eachColumnValueSize) {
+ if (eachColumnValueSize < 1 || eachColumnValueSize > 4) {
+ // fix: message typo "cannot me" -> "cannot be"
+ throw new IllegalArgumentException("Int cannot be more than 4 bytes");
+ }
+ int surrogate = 0;
+ for (int i = 0; i < eachColumnValueSize; i++) {
+ // shift-then-OR is equivalent to the previous shift-then-XOR because
+ // the incoming byte always lands on zeroed bits
+ surrogate <<= 8;
+ surrogate |= data[startOffsetOfData + i] & 0xFF;
+ }
+ return surrogate;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
index 08bfd6d..153fcb9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
@@ -56,8 +56,11 @@ public class DataFileFooterConverterFactory {
switch (version) {
case V1:
return new DataFileFooterConverter();
- default:
+ case V2:
return new DataFileFooterConverter2();
+ case V3:
+ default:
+ return new DataFileFooterConverterV3();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
new file mode 100644
index 0000000..1ab3133
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonFooterReader;
+import org.apache.carbondata.format.FileFooter;
+
+public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
+
+  /**
+   * Below method will be used to convert thrift file meta to wrapper file meta
+   * This method will read the footer from footer offset present in the data file
+   * 1. It will set the stream offset
+   * 2. It will read the footer data from file
+   * 3. parse the footer to thrift object
+   * 4. convert to wrapper object
+   *
+   * @param tableBlockInfo
+   *                table block info
+   * @return data file footer
+   */
+  @Override public DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
+      throws IOException {
+    DataFileFooter dataFileFooter = new DataFileFooter();
+    CarbonFooterReader reader =
+        new CarbonFooterReader(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset());
+    FileFooter footer = reader.readFooter();
+    dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) footer.getVersion()));
+    dataFileFooter.setNumberOfRows(footer.getNum_rows());
+    dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
+    // convert thrift column schemas to wrapper column schemas
+    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+    List<org.apache.carbondata.format.ColumnSchema> table_columns = footer.getTable_columns();
+    for (int i = 0; i < table_columns.size(); i++) {
+      columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+    }
+    dataFileFooter.setColumnInTable(columnSchemaList);
+
+    List<org.apache.carbondata.format.BlockletIndex> leaf_node_indices_Thrift =
+        footer.getBlocklet_index_list();
+    List<BlockletIndex> blockletIndexList = new ArrayList<BlockletIndex>();
+    for (int i = 0; i < leaf_node_indices_Thrift.size(); i++) {
+      BlockletIndex blockletIndex = getBlockletIndex(leaf_node_indices_Thrift.get(i));
+      blockletIndexList.add(blockletIndex);
+    }
+    // V3 footers carry blocklet info in the blocklet_info_list3 field
+    List<org.apache.carbondata.format.BlockletInfo3> leaf_node_infos_Thrift =
+        footer.getBlocklet_info_list3();
+    List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
+    for (int i = 0; i < leaf_node_infos_Thrift.size(); i++) {
+      BlockletInfo blockletInfo = getBlockletInfo(leaf_node_infos_Thrift.get(i),
+          getNumberOfDimensionColumns(columnSchemaList));
+      blockletInfo.setBlockletIndex(blockletIndexList.get(i));
+      blockletInfoList.add(blockletInfo);
+    }
+    dataFileFooter.setBlockletList(blockletInfoList);
+    dataFileFooter.setBlockletIndex(getBlockletIndexForDataFileFooter(blockletIndexList));
+    return dataFileFooter;
+  }
+
+  /**
+   * Below method is to convert the blocklet info of the thrift to wrapper
+   * blocklet info. The offset and length lists are stored dimensions-first,
+   * so each is split at numberOfDimensionColumns.
+   *
+   * @param blockletInfoThrift blocklet info of the thrift
+   * @return blocklet info wrapper
+   */
+  private BlockletInfo getBlockletInfo(
+      org.apache.carbondata.format.BlockletInfo3 blockletInfoThrift, int numberOfDimensionColumns) {
+    BlockletInfo blockletInfo = new BlockletInfo();
+    List<Long> dimensionColumnChunkOffsets =
+        blockletInfoThrift.getColumn_data_chunks_offsets().subList(0, numberOfDimensionColumns);
+    List<Long> measureColumnChunksOffsets = blockletInfoThrift.getColumn_data_chunks_offsets()
+        .subList(numberOfDimensionColumns,
+            blockletInfoThrift.getColumn_data_chunks_offsets().size());
+    List<Integer> dimensionColumnChunkLength =
+        blockletInfoThrift.getColumn_data_chunks_length().subList(0, numberOfDimensionColumns);
+    // fix: bound the length sublist by the length list's own size (it was
+    // previously bounded by the offsets list's size; the lists are parallel,
+    // but each list should be bounded by itself)
+    List<Integer> measureColumnChunksLength = blockletInfoThrift.getColumn_data_chunks_length()
+        .subList(numberOfDimensionColumns,
+            blockletInfoThrift.getColumn_data_chunks_length().size());
+    blockletInfo.setDimensionChunkOffsets(dimensionColumnChunkOffsets);
+    blockletInfo.setMeasureChunkOffsets(measureColumnChunksOffsets);
+    blockletInfo.setDimensionChunksLength(dimensionColumnChunkLength);
+    blockletInfo.setMeasureChunksLength(measureColumnChunksLength);
+    blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
+    blockletInfo.setDimensionOffset(blockletInfoThrift.getDimension_offsets());
+    blockletInfo.setMeasureOffsets(blockletInfoThrift.getMeasure_offsets());
+    return blockletInfo;
+  }
+
+  /**
+   * Below method will be used to get the number of dimension column
+   * in carbon column schema. Columnar dimensions each count once; row-grouped
+   * dimensions count once per column group. Assumes dimensions precede
+   * measures in the schema list, so iteration stops at the first measure.
+   *
+   * @param columnSchemaList column schema list
+   * @return number of dimension column
+   */
+  private int getNumberOfDimensionColumns(List<ColumnSchema> columnSchemaList) {
+    int numberOfDimensionColumns = 0;
+    int previousColumnGroupId = -1;
+    ColumnSchema columnSchema = null;
+    for (int i = 0; i < columnSchemaList.size(); i++) {
+      columnSchema = columnSchemaList.get(i);
+      if (columnSchema.isDimensionColumn() && columnSchema.isColumnar()) {
+        numberOfDimensionColumns++;
+      } else if (columnSchema.isDimensionColumn()) {
+        if (previousColumnGroupId != columnSchema.getColumnGroupId()) {
+          previousColumnGroupId = columnSchema.getColumnGroupId();
+          numberOfDimensionColumns++;
+        }
+      } else {
+        break;
+      }
+    }
+    return numberOfDimensionColumns;
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
new file mode 100644
index 0000000..d46b806
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+
+/**
+ * Mutable holder carrying all per-blocklet data and metadata produced by the
+ * writer: dimension key blocks, measure data, indexes, min/max statistics and
+ * the compression model. Plain data holder with no behaviour of its own.
+ *
+ * NOTE: the misspelled accessor names (getMeasureLenght etc.) are part of the
+ * public interface used by callers and are intentionally preserved.
+ */
+public class NodeHolder {
+  // dimension key blocks, one byte[] per key column
+  private byte[][] keyArray;
+  // measure data blocks, one byte[] per measure column
+  private byte[][] dataArray;
+  // per-measure data lengths (spelling preserved for accessor compatibility)
+  private int[] measureLenght;
+  // mdkey range covered by this blocklet
+  private byte[] startKey;
+  private byte[] endKey;
+  // number of rows in this blocklet
+  private int entryCount;
+  // per-key-block lengths
+  private int[] keyLengths;
+  // raw inverted-index data and map (no accessors in this class)
+  private short[][] dataAfterCompression;
+  private short[][] indexMap;
+  // serialised length of each key-block index
+  private int[] keyBlockIndexLength;
+  // whether each key block is stored sorted
+  private boolean[] isSortedKeyBlock;
+  // compressed inverted index and index map, per key block
+  private byte[][] compressedIndex;
+  private byte[][] compressedIndexMap;
+  // RLE data-index lengths/offsets and compressed form
+  private int[] dataIndexMapLength;
+  private int[] dataIndexMapOffsets;
+  private byte[][] compressedDataIndex;
+  // per-dimension-column min/max statistics
+  private byte[][] columnMaxData;
+  private byte[][] columnMinData;
+  // per-measure-column min/max statistics
+  private byte[][] measureColumnMaxData;
+  private byte[][] measureColumnMinData;
+  // compression model for numbers data block
+  private WriterCompressModel compressionModel;
+  // flags identifying the agg blocks
+  private boolean[] aggBlocks;
+  // all-column min/max (no accessors in this class)
+  private byte[][] allMaxValue;
+  private byte[][] allMinValue;
+  // true at index i if block i is a column group
+  private boolean[] colGrpBlock;
+  // per-measure bitset of row indexes holding null
+  private BitSet[] measureNullValueIndex;
+  // total serialised length of all dimension / measure values
+  private int totalDimensionArrayLength;
+  private int totalMeasureArrayLength;
+
+  public byte[][] getKeyArray() { return keyArray; }
+
+  public void setKeyArray(byte[][] keyArray) { this.keyArray = keyArray; }
+
+  public byte[][] getDataArray() { return dataArray; }
+
+  public void setDataArray(byte[][] dataArray) { this.dataArray = dataArray; }
+
+  public int[] getMeasureLenght() { return measureLenght; }
+
+  public void setMeasureLenght(int[] measureLenght) { this.measureLenght = measureLenght; }
+
+  public byte[] getStartKey() { return startKey; }
+
+  public void setStartKey(byte[] startKey) { this.startKey = startKey; }
+
+  public byte[] getEndKey() { return endKey; }
+
+  public void setEndKey(byte[] endKey) { this.endKey = endKey; }
+
+  public int getEntryCount() { return entryCount; }
+
+  public void setEntryCount(int entryCount) { this.entryCount = entryCount; }
+
+  public int[] getKeyLengths() { return keyLengths; }
+
+  public void setKeyLengths(int[] keyLengths) { this.keyLengths = keyLengths; }
+
+  public int[] getKeyBlockIndexLength() { return keyBlockIndexLength; }
+
+  public void setKeyBlockIndexLength(int[] keyBlockIndexLength) {
+    this.keyBlockIndexLength = keyBlockIndexLength;
+  }
+
+  public boolean[] getIsSortedKeyBlock() { return isSortedKeyBlock; }
+
+  public void setIsSortedKeyBlock(boolean[] isSortedKeyBlock) {
+    this.isSortedKeyBlock = isSortedKeyBlock;
+  }
+
+  public byte[][] getCompressedIndex() { return compressedIndex; }
+
+  public void setCompressedIndex(byte[][] compressedIndex) {
+    this.compressedIndex = compressedIndex;
+  }
+
+  public byte[][] getCompressedIndexMap() { return compressedIndexMap; }
+
+  public void setCompressedIndexMap(byte[][] compressedIndexMap) {
+    this.compressedIndexMap = compressedIndexMap;
+  }
+
+  public byte[][] getCompressedDataIndex() { return compressedDataIndex; }
+
+  public void setCompressedDataIndex(byte[][] compressedDataIndex) {
+    this.compressedDataIndex = compressedDataIndex;
+  }
+
+  public int[] getDataIndexMapLength() { return dataIndexMapLength; }
+
+  public void setDataIndexMapLength(int[] dataIndexMapLength) {
+    this.dataIndexMapLength = dataIndexMapLength;
+  }
+
+  public byte[][] getColumnMaxData() { return this.columnMaxData; }
+
+  public void setColumnMaxData(byte[][] columnMaxData) { this.columnMaxData = columnMaxData; }
+
+  public byte[][] getColumnMinData() { return this.columnMinData; }
+
+  public void setColumnMinData(byte[][] columnMinData) { this.columnMinData = columnMinData; }
+
+  public WriterCompressModel getCompressionModel() { return compressionModel; }
+
+  public void setCompressionModel(WriterCompressModel compressionModel) {
+    this.compressionModel = compressionModel;
+  }
+
+  public boolean[] getAggBlocks() { return aggBlocks; }
+
+  public void setAggBlocks(boolean[] aggBlocks) { this.aggBlocks = aggBlocks; }
+
+  public boolean[] getColGrpBlocks() { return this.colGrpBlock; }
+
+  public void setColGrpBlocks(boolean[] colGrpBlock) { this.colGrpBlock = colGrpBlock; }
+
+  public BitSet[] getMeasureNullValueIndex() { return measureNullValueIndex; }
+
+  public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) {
+    this.measureNullValueIndex = measureNullValueIndex;
+  }
+
+  public int getTotalDimensionArrayLength() { return totalDimensionArrayLength; }
+
+  public void setTotalDimensionArrayLength(int totalDimensionArrayLength) {
+    this.totalDimensionArrayLength = totalDimensionArrayLength;
+  }
+
+  public int getTotalMeasureArrayLength() { return totalMeasureArrayLength; }
+
+  public void setTotalMeasureArrayLength(int totalMeasureArrayLength) {
+    this.totalMeasureArrayLength = totalMeasureArrayLength;
+  }
+
+  public byte[][] getMeasureColumnMaxData() { return measureColumnMaxData; }
+
+  public void setMeasureColumnMaxData(byte[][] measureColumnMaxData) {
+    this.measureColumnMaxData = measureColumnMaxData;
+  }
+
+  public byte[][] getMeasureColumnMinData() { return measureColumnMinData; }
+
+  public void setMeasureColumnMinData(byte[][] measureColumnMinData) {
+    this.measureColumnMinData = measureColumnMinData;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index 3935bdc..2c6c890 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -155,7 +155,7 @@ public class CarbonMetadataUtilTest {
segmentInfo.setNum_cols(0);
segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
IndexHeader indexHeader = new IndexHeader();
- indexHeader.setVersion(2);
+ indexHeader.setVersion(3);
indexHeader.setSegment_info(segmentInfo);
indexHeader.setTable_columns(columnSchemaList);
indexHeader.setBucket_id(0);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index 759fbf7..3114ee1 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -127,10 +127,21 @@ struct DataChunk2{
7: optional SortState sort_state;
8: optional list<schema.Encoding> encoders; // The List of encoders overriden at node level
9: optional list<binary> encoder_meta; // extra information required by encoders
-}
+ 10: optional BlockletMinMaxIndex min_max;
+ 11: optional i32 numberOfRowsInpage;
+ }
/**
+* Represents a chunk of data. The chunk can be a single column stored in Column Major format or a group of columns stored in Row Major Format.
+**/
+struct DataChunk3{
+ 1: required list<DataChunk2> data_chunk_list; // list of data chunks
+ 2: optional list<i32> page_offset; // offset of each chunk
+ 3: optional list<i32> page_length; // length of each chunk
+
+ }
+/**
* Information about a blocklet
*/
struct BlockletInfo{
@@ -146,7 +157,16 @@ struct BlockletInfo2{
2: required list<i64> column_data_chunks_offsets; // Information about offsets all column chunks in this blocklet
3: required list<i16> column_data_chunks_length; // Information about length all column chunks in this blocklet
}
-
+/**
+* Information about a blocklet
+*/
+struct BlockletInfo3{
+ 1: required i32 num_rows; // Number of rows in this blocklet
+ 2: required list<i64> column_data_chunks_offsets; // Information about offsets of all column chunks in this blocklet
+ 3: required list<i32> column_data_chunks_length; // Information about lengths of all column chunks in this blocklet
+ 4: required i64 dimension_offsets;
+ 5: required i64 measure_offsets;
+ }
/**
* Footer for indexed carbon file
*/
@@ -158,7 +178,8 @@ struct FileFooter{
5: required list<BlockletIndex> blocklet_index_list; // blocklet index of all blocklets in this file
6: optional list<BlockletInfo> blocklet_info_list; // Information about blocklets of all columns in this file
7: optional list<BlockletInfo2> blocklet_info_list2; // Information about blocklets of all columns in this file
- 8: optional dictionary.ColumnDictionaryChunk dictionary; // blocklet local dictionary
+ 8: optional list<BlockletInfo3> blocklet_info_list3; // Information about blocklets of all columns in this file
+ 9: optional dictionary.ColumnDictionaryChunk dictionary; // blocklet local dictionary
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
index 828ece8..3f75cd1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
import org.apache.carbondata.processing.store.writer.v2.CarbonFactDataWriterImplV2;
+import org.apache.carbondata.processing.store.writer.v3.CarbonFactDataWriterImplV3;
/**
* Factory class to get the writer instance
@@ -62,8 +63,12 @@ public class CarbonDataWriterFactory {
switch (version) {
case V1:
return new CarbonFactDataWriterImplV1(carbonDataWriterVo);
- default:
+ case V2:
return new CarbonFactDataWriterImplV2(carbonDataWriterVo);
+ case V3:
+ return new CarbonFactDataWriterImplV3(carbonDataWriterVo);
+ default:
+ return new CarbonFactDataWriterImplV3(carbonDataWriterVo);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index bf66700..0699167 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -40,9 +40,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
@@ -59,6 +61,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.core.util.ValueCompressionUtil;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.mdkeygen.file.FileManager;
@@ -70,7 +73,6 @@ import org.apache.carbondata.processing.store.colgroup.ColumnDataHolder;
import org.apache.carbondata.processing.store.colgroup.DataHolder;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.store.writer.NodeHolder;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
@@ -257,6 +259,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private int bucketNumber;
/**
+ * current data format version
+ */
+ private ColumnarFormatVersion version;
+
+ /**
* CarbonFactDataHandler constructor
*/
public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
@@ -326,6 +333,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
aggKeyBlock = arrangeUniqueBlockType(aggKeyBlock);
}
+ version = CarbonProperties.getInstance().getFormatVersion();
}
private void initParameters(CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
@@ -476,7 +484,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
} else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
max[i] = -Double.MAX_VALUE;
} else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- max[i] = new BigDecimal(0.0);
+ max[i] = new BigDecimal(-Double.MAX_VALUE);
} else {
max[i] = 0.0;
}
@@ -748,9 +756,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
// TODO remove after kettle flow is removed
private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal, byte[][] byteArrayValues,
int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
- WriterCompressModel compressionModel, byte[][] noDictionaryData,
- byte[] noDictionaryStartKey, byte[] noDictionaryEndKey)
- throws CarbonDataWriterException {
+ WriterCompressModel compressionModel, byte[][] noDictionaryData, byte[] noDictionaryStartKey,
+ byte[] noDictionaryEndKey) throws CarbonDataWriterException {
byte[][][] noDictionaryColumnsData = null;
List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
int complexColCount = getComplexColsCount();
@@ -836,9 +843,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
if (dimensionType[i]) {
dictionaryColumnCount++;
if (colGrpModel.isColumnar(dictionaryColumnCount)) {
- submit.add(executorService
- .submit(new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(),
- true, isUseInvertedIndex[i])));
+ submit.add(executorService.submit(
+ new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true,
+ isUseInvertedIndex[i])));
} else {
submit.add(
executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
@@ -876,8 +883,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private NodeHolder getNodeHolderObjectWithOutKettle(byte[][] dataHolderLocal,
byte[][] byteArrayValues, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
WriterCompressModel compressionModel, byte[][][] noDictionaryData,
- byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey)
- throws CarbonDataWriterException {
+ byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey) throws CarbonDataWriterException {
byte[][][] noDictionaryColumnsData = null;
List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
int complexColCount = getComplexColsCount();
@@ -907,7 +913,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
int keyLength = splitKey[j].length;
byte[] newKey = new byte[keyLength + 2];
ByteBuffer buffer = ByteBuffer.wrap(newKey);
- buffer.putShort((short)keyLength);
+ buffer.putShort((short) keyLength);
System.arraycopy(splitKey[j], 0, newKey, 2, keyLength);
noDictionaryColumnsData[j][i] = newKey;
}
@@ -963,9 +969,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
if (dimensionType[i]) {
dictionaryColumnCount++;
if (colGrpModel.isColumnar(dictionaryColumnCount)) {
- submit.add(executorService
- .submit(new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(),
- true, isUseInvertedIndex[i])));
+ submit.add(executorService.submit(
+ new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true,
+ isUseInvertedIndex[i])));
} else {
submit.add(
executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
@@ -1008,7 +1014,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
endKeyLocal, compressionModel, composedNonDictStartKey, composedNonDictEndKey);
}
-
/**
* DataHolder will have all row mdkey data
*
@@ -1150,6 +1155,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
return decimalPlaces;
}
+
/**
* This method will be used to update the max value for each measure
*/
@@ -1220,7 +1226,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
- LOGGER.info("Blocklet Size: " + blockletSize);
+ if (version == ColumnarFormatVersion.V3) {
+ this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+ CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT));
+ }
+ LOGGER.info("Number of rows per column blocklet " + blockletSize);
dataRows = new ArrayList<>(this.blockletSize);
int dimSet =
Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE);
@@ -1280,8 +1291,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
.getBlockKeySize());
System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore,
blockKeySize.length - noOfColStore);
- this.dataWriter =
- getFactDataWriter(keyBlockSize);
+ this.dataWriter = getFactDataWriter(keyBlockSize);
this.dataWriter.setIsNoDictionary(isNoDictionary);
// initialize the channel;
this.dataWriter.initializeWriter();
@@ -1377,7 +1387,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* @return data writer instance
*/
private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) {
- ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
return CarbonDataWriterFactory.getInstance()
.getFactDataWriter(version, getDataWriterVo(keyBlockSize));
}
@@ -1620,8 +1629,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
@Override public IndexStorage call() throws Exception {
if (isUseInvertedIndex) {
- return new BlockIndexerStorageForInt(this.data, isCompressionReq, isNoDictionary,
- isSortRequired);
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(this.data, isCompressionReq, isNoDictionary,
+ isSortRequired);
+ } else {
+ return new BlockIndexerStorageForInt(this.data, isCompressionReq, isNoDictionary,
+ isSortRequired);
+ }
} else {
return new BlockIndexerStorageForNoInvertedIndex(this.data, isCompressionReq,
isNoDictionary);