You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/10 03:46:38 UTC
[2/6] carbondata git commit: extract interface
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index edc7ece..53d5dcd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -19,8 +19,6 @@ package org.apache.carbondata.processing.store;
import java.io.File;
import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -36,17 +34,10 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.GenericDataType;
import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.datastore.page.FixLengthColumnPage;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator;
@@ -57,17 +48,11 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.NodeHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.row.WriteStepRowUtil;
-import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
import org.apache.carbondata.processing.store.file.FileManager;
import org.apache.carbondata.processing.store.file.IFileManagerComposite;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.store.writer.Encoder;
/**
* Fact data handler class to handle the fact data
@@ -108,7 +93,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* keyBlockHolder
*/
private CarbonKeyBlockHolder[] keyBlockHolder;
- private boolean[] aggKeyBlock;
+
+ // An entry is true if the column is a dictionary dimension whose cardinality is lower than
+ // the value of the CarbonCommonConstants.HIGH_CARDINALITY_VALUE property.
+ // It decides whether RLE encoding is applied to the data page of that dimension.
+ private boolean[] rleEncodingForDictDimension;
private boolean[] isNoDictionary;
private long processedDataCount;
private ExecutorService producerExecutorService;
@@ -117,7 +106,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private List<Future<Void>> consumerExecutorServiceTaskList;
private List<CarbonRow> dataRows;
private ColumnGroupModel colGrpModel;
- private boolean[] isUseInvertedIndex;
/**
* semaphore which will used for managing node holder objects
*/
@@ -150,15 +138,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private boolean[] isDictDimension;
- private int bucketNumber;
-
- private int taskExtension;
-
/**
* current data format version
*/
private ColumnarFormatVersion version;
+ private DefaultEncoder encoder;
+
/**
* CarbonFactDataHandler constructor
*/
@@ -168,20 +154,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
int numDimColumns = colGrpModel.getNoOfColumnStore() + model.getNoDictionaryCount()
+ getExpandedComplexColsCount();
- this.aggKeyBlock = new boolean[numDimColumns];
+ this.rleEncodingForDictDimension = new boolean[numDimColumns];
this.isNoDictionary = new boolean[numDimColumns];
- this.bucketNumber = model.getBucketId();
- this.taskExtension = model.getTaskExtension();
- this.isUseInvertedIndex = new boolean[numDimColumns];
- if (null != model.getIsUseInvertedIndex()) {
- for (int i = 0; i < isUseInvertedIndex.length; i++) {
- if (i < model.getIsUseInvertedIndex().length) {
- isUseInvertedIndex[i] = model.getIsUseInvertedIndex()[i];
- } else {
- isUseInvertedIndex[i] = true;
- }
- }
- }
+
int noDictStartIndex = this.colGrpModel.getNoOfColumnStore();
// setting true value for dims of high card
for (int i = 0; i < model.getNoDictionaryCount(); i++) {
@@ -198,37 +173,37 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
CarbonCommonConstants.HIGH_CARDINALITY_VALUE,
CarbonCommonConstants.HIGH_CARDINALITY_VALUE_DEFAULTVALUE));
int[] columnSplits = colGrpModel.getColumnSplit();
- int dimCardinalityIndex = 0;
- int aggIndex = 0;
+ int dimCardinalityIndex = -1;
+ int aggIndex = -1;
int[] dimLens = model.getSegmentProperties().getDimColumnsCardinality();
for (int i = 0; i < columnSplits.length; i++) {
- if (colGrpModel.isColumnar(i) && dimLens[dimCardinalityIndex] < noDictionaryValue) {
- this.aggKeyBlock[aggIndex++] = true;
- continue;
- }
dimCardinalityIndex += columnSplits[i];
aggIndex++;
+ if (colGrpModel.isColumnar(i) && dimLens[dimCardinalityIndex] < noDictionaryValue) {
+ this.rleEncodingForDictDimension[aggIndex] = true;
+ }
}
if (model.getDimensionCount() < dimLens.length) {
int allColsCount = getColsCount(model.getDimensionCount());
- List<Boolean> aggKeyBlockWithComplex = new ArrayList<Boolean>(allColsCount);
+ List<Boolean> rleWithComplex = new ArrayList<Boolean>(allColsCount);
for (int i = 0; i < model.getDimensionCount(); i++) {
GenericDataType complexDataType = model.getComplexIndexMap().get(i);
if (complexDataType != null) {
- complexDataType.fillAggKeyBlock(aggKeyBlockWithComplex, this.aggKeyBlock);
+ complexDataType.fillAggKeyBlock(rleWithComplex, this.rleEncodingForDictDimension);
} else {
- aggKeyBlockWithComplex.add(this.aggKeyBlock[i]);
+ rleWithComplex.add(this.rleEncodingForDictDimension[i]);
}
}
- this.aggKeyBlock = new boolean[allColsCount];
+ this.rleEncodingForDictDimension = new boolean[allColsCount];
for (int i = 0; i < allColsCount; i++) {
- this.aggKeyBlock[i] = aggKeyBlockWithComplex.get(i);
+ this.rleEncodingForDictDimension[i] = rleWithComplex.get(i);
}
}
- aggKeyBlock = arrangeUniqueBlockType(aggKeyBlock);
+ rleEncodingForDictDimension = arrangeUniqueBlockType(rleEncodingForDictDimension);
}
- version = CarbonProperties.getInstance().getFormatVersion();
+ this.version = CarbonProperties.getInstance().getFormatVersion();
+ this.encoder = new DefaultEncoder(model);
}
private void initParameters(CarbonFactDataHandlerModel model) {
@@ -357,87 +332,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
}
- class IndexKey {
- private int pageSize;
- byte[] currentMDKey = null;
- byte[][] currentNoDictionaryKey = null;
- byte[] startKey = null;
- byte[] endKey = null;
- byte[][] noDictStartKey = null;
- byte[][] noDictEndKey = null;
- byte[] packedNoDictStartKey = null;
- byte[] packedNoDictEndKey = null;
-
- IndexKey(int pageSize) {
- this.pageSize = pageSize;
- }
-
- /** update all keys based on the input row */
- void update(int rowId, CarbonRow row) throws KeyGenException {
- currentMDKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
- if (model.getNoDictionaryCount() > 0 || model.getComplexIndexMap().size() > 0) {
- currentNoDictionaryKey = WriteStepRowUtil.getNoDictAndComplexDimension(row);
- }
- if (rowId == 0) {
- startKey = currentMDKey;
- noDictStartKey = currentNoDictionaryKey;
- }
- endKey = currentMDKey;
- noDictEndKey = currentNoDictionaryKey;
- if (rowId == pageSize - 1) {
- finalizeKeys();
- }
- }
-
- /** update all keys if SORT_COLUMNS option is used when creating table */
- private void finalizeKeys() {
- // If SORT_COLUMNS is used, may need to update start/end keys since the they may
- // contains dictionary columns that are not in SORT_COLUMNS, which need to be removed from
- // start/end key
- int numberOfDictSortColumns = model.getSegmentProperties().getNumberOfDictSortColumns();
- if (numberOfDictSortColumns > 0) {
- // if SORT_COLUMNS contain dictionary columns
- int[] keySize = columnarSplitter.getBlockKeySize();
- if (keySize.length > numberOfDictSortColumns) {
- // if there are some dictionary columns that are not in SORT_COLUMNS, it will come to here
- int newMdkLength = 0;
- for (int i = 0; i < numberOfDictSortColumns; i++) {
- newMdkLength += keySize[i];
- }
- byte[] newStartKeyOfSortKey = new byte[newMdkLength];
- byte[] newEndKeyOfSortKey = new byte[newMdkLength];
- System.arraycopy(startKey, 0, newStartKeyOfSortKey, 0, newMdkLength);
- System.arraycopy(endKey, 0, newEndKeyOfSortKey, 0, newMdkLength);
- startKey = newStartKeyOfSortKey;
- endKey = newEndKeyOfSortKey;
- }
- } else {
- startKey = new byte[0];
- endKey = new byte[0];
- }
-
- // Do the same update for noDictionary start/end Key
- int numberOfNoDictSortColumns = model.getSegmentProperties().getNumberOfNoDictSortColumns();
- if (numberOfNoDictSortColumns > 0) {
- // if sort_columns contain no-dictionary columns
- if (noDictStartKey.length > numberOfNoDictSortColumns) {
- byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][];
- byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
- System.arraycopy(
- noDictStartKey, 0, newNoDictionaryStartKey, 0, numberOfNoDictSortColumns);
- System.arraycopy(
- noDictEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns);
- noDictStartKey = newNoDictionaryStartKey;
- noDictEndKey = newNoDictionaryEndKey;
- }
- packedNoDictStartKey =
- NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictStartKey);
- packedNoDictEndKey =
- NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictEndKey);
- }
- }
- }
-
/**
* generate the NodeHolder from the input rows (one page in case of V3 format)
*/
@@ -447,7 +341,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
return new NodeHolder();
}
TablePage tablePage = new TablePage(model, dataRows.size());
- IndexKey keys = new IndexKey(dataRows.size());
+ TablePageKey keys = new TablePageKey(model, dataRows.size());
int rowId = 0;
// convert row to columnar data
@@ -458,26 +352,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
// encode and compress dimensions and measure
- // TODO: To make the encoding more transparent to the user, user should be enable to specify
- // the encoding and compression method for each type when creating table.
-
- Codec codec = new Codec(model.getMeasureDataType());
- IndexStorage[] dimColumns = codec.encodeAndCompressDimensions(tablePage);
- Codec encodedMeasure = codec.encodeAndCompressMeasures(tablePage);
-
- // prepare nullBitSet for writer, remove this after writer can accept TablePage
- BitSet[] nullBitSet = new BitSet[tablePage.getMeasurePage().length];
- FixLengthColumnPage[] measurePages = tablePage.getMeasurePage();
- for (int i = 0; i < nullBitSet.length; i++) {
- nullBitSet[i] = measurePages[i].getNullBitSet();
- }
+ Encoder.EncodedData encodedData = encoder.encode(tablePage);
+
+ TablePageStatistics tablePageStatistics = new TablePageStatistics(
+ model.getTableSpec(), tablePage, encodedData, tablePage.getMeasureStats());
LOGGER.info("Number Of records processed: " + dataRows.size());
// TODO: writer interface should be modified to use TablePage
- return dataWriter.buildDataNodeHolder(dimColumns, encodedMeasure.getEncodedMeasure(),
- dataRows.size(), keys.startKey, keys.endKey, encodedMeasure.getCompressionModel(),
- keys.packedNoDictStartKey, keys.packedNoDictEndKey, nullBitSet);
+ return dataWriter.buildDataNodeHolder(encodedData, tablePageStatistics, keys);
}
/**
@@ -553,15 +436,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
// return the number of complex column after complex columns are expanded
private int getExpandedComplexColsCount() {
- int count = 0;
- int dictDimensionCount = model.getDimensionCount();
- for (int i = 0; i < dictDimensionCount; i++) {
- GenericDataType complexDataType = model.getComplexIndexMap().get(i);
- if (complexDataType != null) {
- count += complexDataType.getColsCount();
- }
- }
- return count;
+ return model.getExpandedComplexColsCount();
}
// return the number of complex column
@@ -722,9 +597,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
carbonDataWriterVo.setStoreLocation(model.getStoreLocation());
carbonDataWriterVo.setMeasureCount(model.getMeasureCount());
carbonDataWriterVo.setTableName(model.getTableName());
- carbonDataWriterVo.setKeyBlockSize(keyBlockSize);
carbonDataWriterVo.setFileManager(fileManager);
- carbonDataWriterVo.setAggBlocks(aggKeyBlock);
+ carbonDataWriterVo.setRleEncodingForDictDim(rleEncodingForDictDimension);
carbonDataWriterVo.setIsComplexType(isComplexTypes());
carbonDataWriterVo.setNoDictionaryCount(model.getNoDictionaryCount());
carbonDataWriterVo.setCarbonDataFileAttributes(model.getCarbonDataFileAttributes());
@@ -735,8 +609,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
carbonDataWriterVo.setColCardinality(model.getColCardinality());
carbonDataWriterVo.setSegmentProperties(model.getSegmentProperties());
carbonDataWriterVo.setTableBlocksize(model.getBlockSizeInMB());
- carbonDataWriterVo.setBucketNumber(bucketNumber);
- carbonDataWriterVo.setTaskExtension(taskExtension);
+ carbonDataWriterVo.setBucketNumber(model.getBucketId());
+ carbonDataWriterVo.setTaskExtension(model.getTaskExtension());
carbonDataWriterVo.setSchemaUpdatedTimeStamp(model.getSchemaUpdatedTimeStamp());
return carbonDataWriterVo;
}
@@ -917,233 +791,4 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
return null;
}
}
-
- private final class BlockSortThread implements Callable<IndexStorage> {
- private int index;
-
- private byte[][] data;
- private boolean isSortRequired;
- private boolean isCompressionReq;
- private boolean isUseInvertedIndex;
-
- private boolean isNoDictionary;
-
- private BlockSortThread(int index, byte[][] data, boolean isSortRequired,
- boolean isUseInvertedIndex) {
- this.index = index;
- this.data = data;
- isCompressionReq = aggKeyBlock[this.index];
- this.isSortRequired = isSortRequired;
- this.isUseInvertedIndex = isUseInvertedIndex;
- }
-
- public BlockSortThread(byte[][] data, boolean compression, boolean isNoDictionary,
- boolean isSortRequired, boolean isUseInvertedIndex) {
- this.data = data;
- this.isCompressionReq = compression;
- this.isNoDictionary = isNoDictionary;
- this.isSortRequired = isSortRequired;
- this.isUseInvertedIndex = isUseInvertedIndex;
- }
-
- @Override public IndexStorage call() throws Exception {
- if (index == 1) {
- int dd = 1 + 1;
- }
- if (isUseInvertedIndex) {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForShort(this.data, isCompressionReq, isNoDictionary,
- isSortRequired);
- } else {
- return new BlockIndexerStorageForInt(this.data, isCompressionReq, isNoDictionary,
- isSortRequired);
- }
- } else {
- if (version == ColumnarFormatVersion.V3) {
- return new BlockIndexerStorageForNoInvertedIndexForShort(this.data,isNoDictionary);
- } else {
- return new BlockIndexerStorageForNoInvertedIndex(this.data);
- }
- }
-
- }
-
- }
-
- public class Codec {
- private WriterCompressModel compressionModel;
- private byte[][] encodedMeasureArray;
- private DataType[] measureType;
-
- Codec(DataType[] measureType) {
- this.measureType = measureType;
- }
-
- public WriterCompressModel getCompressionModel() {
- return compressionModel;
- }
-
- public byte[][] getEncodedMeasure() {
- return encodedMeasureArray;
- }
-
- public Codec encodeAndCompressMeasures(TablePage tablePage) {
- // TODO: following conversion is required only because compress model requires them,
- // remove then after the compress framework is refactoried
- FixLengthColumnPage[] measurePage = tablePage.getMeasurePage();
- int measureCount = measurePage.length;
- Object[] min = new Object[measurePage.length];
- Object[] max = new Object[measurePage.length];
- Object[] uniqueValue = new Object[measurePage.length];
- int[] decimal = new int[measurePage.length];
- for (int i = 0; i < measurePage.length; i++) {
- min[i] = measurePage[i].getStatistics().getMin();
- max[i] = measurePage[i].getStatistics().getMax();
- uniqueValue[i] = measurePage[i].getStatistics().getUniqueValue();
- decimal[i] = measurePage[i].getStatistics().getDecimal();
- }
- // encode and compress measure column page
- compressionModel =
- ValueCompressionUtil.getWriterCompressModel(max, min, decimal, uniqueValue, measureType,
- new byte[measureCount]);
- encodedMeasureArray = encodeMeasure(compressionModel, measurePage);
- return this;
- }
-
- // this method first invokes encoding routine to encode the data chunk,
- // followed by invoking compression routine for preparing the data chunk for writing.
- private byte[][] encodeMeasure(WriterCompressModel compressionModel,
- FixLengthColumnPage[] columnPages) {
-
- CarbonWriteDataHolder[] holders = new CarbonWriteDataHolder[columnPages.length];
- for (int i = 0; i < holders.length; i++) {
- holders[i] = new CarbonWriteDataHolder();
- switch (columnPages[i].getDataType()) {
- case SHORT:
- case INT:
- case LONG:
- holders[i].setWritableLongPage(columnPages[i].getLongPage());
- break;
- case DOUBLE:
- holders[i].setWritableDoublePage(columnPages[i].getDoublePage());
- break;
- case DECIMAL:
- holders[i].setWritableDecimalPage(columnPages[i].getDecimalPage());
- break;
- default:
- throw new RuntimeException("Unsupported data type: " + columnPages[i].getDataType());
- }
- }
-
- DataType[] dataType = compressionModel.getDataType();
- ValueCompressionHolder[] values =
- new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
- byte[][] returnValue = new byte[values.length][];
- for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) {
- values[i] = compressionModel.getValueCompressionHolder()[i];
- if (dataType[i] != DataType.DECIMAL) {
- values[i].setValue(
- ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i])
- .getCompressedValues(compressionModel.getCompressionFinders()[i], holders[i],
- compressionModel.getMaxValue()[i],
- compressionModel.getMantissa()[i]));
- } else {
- values[i].setValue(holders[i].getWritableByteArrayValues());
- }
- values[i].compress();
- returnValue[i] = values[i].getCompressedData();
- }
-
- return returnValue;
- }
-
- /**
- * Encode and compress each column page. The work is done using a thread pool.
- */
- private IndexStorage[] encodeAndCompressDimensions(TablePage tablePage) {
- int noDictionaryCount = tablePage.getNoDictDimensionPage().length;
- int complexColCount = tablePage.getComplexDimensionPage().length;
-
- // thread pool size to be used for encoding dimension
- // each thread will sort the column page data and compress it
- int thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
- CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL));
- ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
- Callable<IndexStorage> callable;
- List<Future<IndexStorage>> submit = new ArrayList<Future<IndexStorage>>(
- model.getPrimitiveDimLens().length + noDictionaryCount + complexColCount);
- int i = 0;
- int dictionaryColumnCount = -1;
- int noDictionaryColumnCount = -1;
- int colGrpId = -1;
- boolean isSortColumn = false;
- SegmentProperties segmentProperties = model.getSegmentProperties();
- for (i = 0; i < isDictDimension.length; i++) {
- isSortColumn = i < segmentProperties.getNumberOfSortColumns();
- if (isDictDimension[i]) {
- dictionaryColumnCount++;
- if (colGrpModel.isColumnar(dictionaryColumnCount)) {
- // dictionary dimension
- callable =
- new BlockSortThread(
- tablePage.getKeyColumnPage().getKeyVector(dictionaryColumnCount),
- true,
- false,
- isSortColumn,
- isUseInvertedIndex[i] & isSortColumn);
-
- } else {
- // column group
- callable = new ColGroupBlockStorage(
- segmentProperties,
- ++colGrpId,
- tablePage.getKeyColumnPage().getKeyVector(dictionaryColumnCount));
- }
- } else {
- // no dictionary dimension
- callable =
- new BlockSortThread(
- tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getByteArrayPage(),
- false,
- true,
- isSortColumn,
- isUseInvertedIndex[i] & isSortColumn);
- }
- // start a thread to sort the page data
- submit.add(executorService.submit(callable));
- }
-
- // complex type column
- for (int index = 0; index < getComplexColumnCount(); index++) {
- Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[index].iterator();
- while (iterator.hasNext()) {
- byte[][] data = iterator.next();
- callable =
- new BlockSortThread(
- i++,
- data,
- false,
- true);
- submit.add(executorService.submit(callable));
- }
- }
- executorService.shutdown();
- try {
- executorService.awaitTermination(1, TimeUnit.DAYS);
- } catch (InterruptedException e) {
- LOGGER.error(e, e.getMessage());
- }
- IndexStorage[] dimColumns = new IndexStorage[
- colGrpModel.getNoOfColumnStore() + noDictionaryCount + getExpandedComplexColsCount()];
- try {
- for (int k = 0; k < dimColumns.length; k++) {
- dimColumns[k] = submit.get(k).get();
- }
- } catch (Exception e) {
- LOGGER.error(e, e.getMessage());
- }
- return dimColumns;
- }
- }
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index df27dcc..d400a6d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Map;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.GenericDataType;
+import org.apache.carbondata.core.datastore.TableSpec;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.metadata.CarbonMetadata;
@@ -37,7 +39,6 @@ import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.model.CarbonLoadModel;
import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
@@ -153,13 +154,14 @@ public class CarbonFactDataHandlerModel {
// key generator for complex dimension
private KeyGenerator[] complexDimensionKeyGenerator;
+ private TableSpec tableSpec;
+
/**
* Create the model using @{@link CarbonDataLoadConfiguration}
*/
public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
CarbonDataLoadConfiguration configuration, String storeLocation, int bucketId,
int taskExtension) {
-
CarbonTableIdentifier identifier =
configuration.getTableIdentifier().getCarbonTableIdentifier();
boolean[] isUseInvertedIndex =
@@ -247,6 +249,7 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.bucketId = bucketId;
carbonFactDataHandlerModel.segmentId = configuration.getSegmentId();
carbonFactDataHandlerModel.taskExtension = taskExtension;
+ carbonFactDataHandlerModel.tableSpec = configuration.getTableSpec();
return carbonFactDataHandlerModel;
}
@@ -304,6 +307,10 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes);
carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
+
+ carbonFactDataHandlerModel.tableSpec = new TableSpec(
+ segmentProperties.getDimensions(),
+ segmentProperties.getMeasures());
return carbonFactDataHandlerModel;
}
@@ -522,5 +529,26 @@ public class CarbonFactDataHandlerModel {
public int getComplexColumnCount() {
return complexIndexMap.size();
}
+
+ // return the number of complex columns after complex columns are expanded
+ public int getExpandedComplexColsCount() {
+ int count = 0;
+ int dictDimensionCount = getDimensionCount();
+ for (int i = 0; i < dictDimensionCount; i++) {
+ GenericDataType complexDataType = getComplexIndexMap().get(i);
+ if (complexDataType != null) {
+ count += complexDataType.getColsCount();
+ }
+ }
+ return count;
+ }
+
+ public boolean isSortColumn(int columnIndex) {
+ return columnIndex < segmentProperties.getNumberOfSortColumns();
+ }
+
+ public TableSpec getTableSpec() {
+ return tableSpec;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
index 16d2da0..260cfc7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
@@ -17,8 +17,8 @@
package org.apache.carbondata.processing.store;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
public interface CarbonFactHandler {
void initialise() throws CarbonDataWriterException;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java
new file mode 100644
index 0000000..73c4fa1
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import org.apache.carbondata.core.compression.ValueCompressor;
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.ColGroupBlockStorage;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatistics;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CompressionFinder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.processing.store.writer.Encoder;
+
+// Default encoder for encoding dimension and measures. For dimensions, it applies RLE and
+// inverted index encoding. For measures, it applies delta encoding or adaptive encoding
+public class DefaultEncoder implements Encoder {
+
+ private ColumnarFormatVersion version;
+
+ private boolean[] isUseInvertedIndex;
+
+ private CarbonFactDataHandlerModel model;
+
+ public DefaultEncoder(CarbonFactDataHandlerModel model) {
+ this.version = CarbonProperties.getInstance().getFormatVersion();
+ this.model = model;
+ this.isUseInvertedIndex = model.getIsUseInvertedIndex();
+ }
+
+ // function to encode all columns in one table page
+ public Encoder.EncodedData encode(TablePage tablePage) {
+ Encoder.EncodedData encodedData = new Encoder.EncodedData();
+ encodeAndCompressDimensions(tablePage, encodedData);
+ encodeAndCompressMeasures(tablePage, encodedData);
+ return encodedData;
+ }
+
+ // encode measure and set encodedData in `encodedData`
+ private void encodeAndCompressMeasures(TablePage tablePage, Encoder.EncodedData encodedData) {
+ // TODO: following conversion is required only because compress model requires them,
+ // remove them after the compress framework is refactored
+ ColumnPage[] measurePage = tablePage.getMeasurePage();
+ int measureCount = measurePage.length;
+ byte[] dataTypeSelected = new byte[measureCount];
+ CompressionFinder[] finders = new CompressionFinder[measureCount];
+ for (int i = 0; i < measureCount; i++) {
+ ColumnPageStatistics stats = measurePage[i].getStatistics();
+ finders[i] = ValueCompressionUtil.getCompressionFinder(
+ stats.getMax(),
+ stats.getMin(),
+ stats.getDecimal(),
+ measurePage[i].getDataType(), dataTypeSelected[i]);
+ }
+
+ //CompressionFinder[] finders = compressionModel.getCompressionFinders();
+ ValueCompressionHolder[] holders = ValueCompressionUtil.getValueCompressionHolder(finders);
+ encodedData.measures = encodeMeasure(holders, finders, measurePage);
+ }
+
+ // this method first invokes encoding routine to encode the data chunk,
+ // followed by invoking compression routine for preparing the data chunk for writing.
+ private byte[][] encodeMeasure(ValueCompressionHolder[] holders,
+ CompressionFinder[] finders,
+ ColumnPage[] columnPages) {
+ ValueCompressionHolder[] values = new ValueCompressionHolder[columnPages.length];
+ byte[][] encodedMeasures = new byte[values.length][];
+ for (int i = 0; i < columnPages.length; i++) {
+ values[i] = holders[i];
+ if (columnPages[i].getDataType() != DataType.DECIMAL) {
+ ValueCompressor compressor =
+ ValueCompressionUtil.getValueCompressor(finders[i]);
+ Object compressed = compressor.getCompressedValues(
+ finders[i],
+ columnPages[i],
+ columnPages[i].getStatistics().getMax(),
+ columnPages[i].getStatistics().getDecimal());
+ values[i].setValue(compressed);
+ } else {
+ // in case of decimal, 'flatten' the byte[][] to byte[]
+ byte[][] decimalPage = columnPages[i].getDecimalPage();
+ int totalSize = 0;
+ for (byte[] decimal : decimalPage) {
+ totalSize += decimal.length;
+ }
+ ByteBuffer temp = ByteBuffer.allocate(totalSize);
+ for (byte[] decimal : decimalPage) {
+ temp.put(decimal);
+ }
+ values[i].setValue(temp.array());
+ }
+ values[i].compress();
+ encodedMeasures[i] = values[i].getCompressedData();
+ }
+
+ return encodedMeasures;
+ }
+
+ private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort,
+ boolean isUseInvertedIndex) {
+ if (isUseInvertedIndex) {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(data, true, false, isSort);
+ } else {
+ return new BlockIndexerStorageForInt(data, true, false, isSort);
+ }
+ } else {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+ } else {
+ return new BlockIndexerStorageForNoInvertedIndex(data);
+ }
+ }
+ }
+
+ private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort,
+ boolean isUseInvertedIndex) {
+ if (isUseInvertedIndex) {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(data, false, false, isSort);
+ } else {
+ return new BlockIndexerStorageForInt(data, false, false, isSort);
+ }
+ } else {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+ } else {
+ return new BlockIndexerStorageForNoInvertedIndex(data);
+ }
+ }
+ }
+
+ private IndexStorage encodeAndCompressComplexDimension(byte[][] data) {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(data, false, false, false);
+ } else {
+ return new BlockIndexerStorageForInt(data, false, false, false);
+ }
+ }
+
+ private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort,
+ boolean isUseInvertedIndex) {
+ if (isUseInvertedIndex) {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForShort(data, false, true, isSort);
+ } else {
+ return new BlockIndexerStorageForInt(data, false, true, isSort);
+ }
+ } else {
+ if (version == ColumnarFormatVersion.V3) {
+ return new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
+ } else {
+ return new BlockIndexerStorageForNoInvertedIndex(data);
+ }
+ }
+ }
+
+ // encode and compress each dimension, set encoded data in `encodedData`
+ private void encodeAndCompressDimensions(TablePage tablePage, Encoder.EncodedData encodedData) {
+ TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec();
+ int dictionaryColumnCount = -1;
+ int noDictionaryColumnCount = -1;
+ int colGrpId = -1;
+ int indexStorageOffset = 0;
+ IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()];
+ SegmentProperties segmentProperties = model.getSegmentProperties();
+ Compressor compressor = CompressorFactory.getInstance().getCompressor();
+ byte[][] compressedColumns = new byte[indexStorages.length][];
+ for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) {
+ byte[] flattened;
+ boolean isSortColumn = model.isSortColumn(i);
+ switch (dimensionSpec.getType(i)) {
+ case GLOBAL_DICTIONARY:
+ // dictionary dimension
+ indexStorages[indexStorageOffset] =
+ encodeAndCompressDictDimension(
+ tablePage.getKeyColumnPage().getKeyVector(++dictionaryColumnCount),
+ isSortColumn,
+ isUseInvertedIndex[i] & isSortColumn);
+ flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+ break;
+ case DIRECT_DICTIONARY:
+ // timestamp and date column
+ indexStorages[indexStorageOffset] =
+ encodeAndCompressDirectDictDimension(
+ tablePage.getKeyColumnPage().getKeyVector(++dictionaryColumnCount),
+ isSortColumn,
+ isUseInvertedIndex[i] & isSortColumn);
+ flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+ break;
+ case PLAIN_VALUE:
+ // high cardinality dimension, encoded as plain string
+ indexStorages[indexStorageOffset] =
+ encodeAndCompressNoDictDimension(
+ tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getStringPage(),
+ isSortColumn,
+ isUseInvertedIndex[i] & isSortColumn);
+ flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+ break;
+ case COLUMN_GROUP:
+ // column group
+ indexStorages[indexStorageOffset] =
+ new ColGroupBlockStorage(
+ segmentProperties,
+ ++colGrpId,
+ tablePage.getKeyColumnPage().getKeyVector(++dictionaryColumnCount));
+ flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+ break;
+ case COMPLEX:
+ // we need to add complex column at last, so skipping it here
+ continue;
+ default:
+ throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i));
+ }
+ compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
+ indexStorageOffset++;
+ }
+
+ // handle complex type column
+ for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) {
+ Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[i].iterator();
+ while (iterator.hasNext()) {
+ byte[][] data = iterator.next();
+ indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data);
+ byte[] flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+ compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
+ indexStorageOffset++;
+ }
+ }
+
+ encodedData.indexStorages = indexStorages;
+ encodedData.dimensions = compressedColumns;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 6a33f34..9b81979 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -30,11 +30,11 @@ import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.sortandgroupby.sortdata.SortTempFileChunkHolder;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 0c02980..fb7ebfb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -25,17 +25,17 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.carbondata.core.datastore.GenericDataType;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
-import org.apache.carbondata.core.datastore.page.FixLengthColumnPage;
import org.apache.carbondata.core.datastore.page.KeyColumnPage;
-import org.apache.carbondata.core.datastore.page.VarLengthColumnPage;
+import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.row.WriteStepRowUtil;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.spark.sql.types.Decimal;
@@ -43,7 +43,7 @@ import org.apache.spark.sql.types.Decimal;
* Represent a page data for all columns, we store its data in columnar layout, so that
* all processing apply to TablePage can be done in vectorized fashion.
*/
-class TablePage {
+public class TablePage {
// For all dimension and measure columns, we store the column data directly in the page,
// the length of the page is the number of rows.
@@ -51,9 +51,11 @@ class TablePage {
// TODO: we should have separate class for key columns so that keys are stored together in
// one vector to make it efficient for sorting
private KeyColumnPage keyColumnPage;
- private VarLengthColumnPage[] noDictDimensionPage;
+ private ColumnPage[] noDictDimensionPage;
private ComplexColumnPage[] complexDimensionPage;
- private FixLengthColumnPage[] measurePage;
+ private ColumnPage[] measurePage;
+
+ private MeasurePageStatsVO measurePageStatistics;
// the num of rows in this page, it must be less than short value (65536)
private int pageSize;
@@ -65,9 +67,9 @@ class TablePage {
this.pageSize = pageSize;
keyColumnPage = new KeyColumnPage(pageSize,
model.getSegmentProperties().getDimensionPartitions().length);
- noDictDimensionPage = new VarLengthColumnPage[model.getNoDictionaryCount()];
+ noDictDimensionPage = new ColumnPage[model.getNoDictionaryCount()];
for (int i = 0; i < noDictDimensionPage.length; i++) {
- noDictDimensionPage[i] = new VarLengthColumnPage(pageSize);
+ noDictDimensionPage[i] = new ColumnPage(DataType.STRING, pageSize);
}
complexDimensionPage = new ComplexColumnPage[model.getComplexColumnCount()];
for (int i = 0; i < complexDimensionPage.length; i++) {
@@ -75,10 +77,10 @@ class TablePage {
// we get the first row.
complexDimensionPage[i] = null;
}
- measurePage = new FixLengthColumnPage[model.getMeasureCount()];
+ measurePage = new ColumnPage[model.getMeasureCount()];
DataType[] dataTypes = model.getMeasureDataType();
for (int i = 0; i < measurePage.length; i++) {
- measurePage[i] = new FixLengthColumnPage(dataTypes[i], pageSize);
+ measurePage[i] = new ColumnPage(dataTypes[i], pageSize);
}
}
@@ -104,9 +106,9 @@ class TablePage {
for (int i = 0; i < noDictAndComplex.length; i++) {
if (i < noDictionaryCount) {
// noDictionary columns, since it is variable length, we need to prepare each
- // element as LV encoded byte array (first two bytes are the length of the array)
+ // element as LV result byte array (first two bytes are the length of the array)
byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]);
- noDictDimensionPage[i].putByteArray(rowId, valueWithLength);
+ noDictDimensionPage[i].putData(rowId, valueWithLength);
} else {
// complex columns
addComplexColumn(i - noDictionaryCount, rowId, noDictAndComplex[i]);
@@ -121,13 +123,19 @@ class TablePage {
// in compaction flow the measure with decimal type will come as Spark decimal.
// need to convert it to byte array.
- if (null != value && measurePage[i].getDataType() == DataType.DECIMAL && model
- .isCompactionFlow()) {
+ if (measurePage[i].getDataType() == DataType.DECIMAL &&
+ model.isCompactionFlow() &&
+ value != null) {
BigDecimal bigDecimal = ((Decimal) value).toJavaBigDecimal();
value = DataTypeUtil.bigDecimalToByte(bigDecimal);
}
measurePage[i].putData(rowId, value);
}
+
+ // update statistics if it is last row
+ if (rowId + 1 == pageSize) {
+ this.measurePageStatistics = new MeasurePageStatsVO(measurePage);
+ }
}
/**
@@ -150,7 +158,7 @@ class TablePage {
}
int depthInComplexColumn = getComplexDimensionPage()[index].getDepth();
- // this is the encoded columnar data which will be added to page,
+ // this is the result columnar data which will be added to page,
// size of this list is the depth of complex column, we will fill it by input data
List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>();
for (int k = 0; k < depthInComplexColumn; k++) {
@@ -191,7 +199,7 @@ class TablePage {
return keyColumnPage;
}
- public VarLengthColumnPage[] getNoDictDimensionPage() {
+ public ColumnPage[] getNoDictDimensionPage() {
return noDictDimensionPage;
}
@@ -199,7 +207,17 @@ class TablePage {
return complexDimensionPage;
}
- public FixLengthColumnPage[] getMeasurePage() {
+ public ColumnPage[] getMeasurePage() {
return measurePage;
}
+
+ public MeasurePageStatsVO getMeasureStats() {
+ return measurePageStatistics;
+ }
+
+ public int getPageSize() {
+ return pageSize;
+ }
}
+
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java
new file mode 100644
index 0000000..3cb4777
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.processing.util.NonDictionaryUtil;
+
+public class TablePageKey {
+ private int pageSize;
+
+ private byte[][] currentNoDictionaryKey;
+
+ // MDK start key
+ private byte[] startKey;
+
+ // MDK end key
+ private byte[] endKey;
+
+ // startkey for no dictionary columns
+ private byte[][] noDictStartKey;
+
+ // endkey for no dictionary columns
+ private byte[][] noDictEndKey;
+
+ // startkey for no dictionary columns after packing into one column
+ private byte[] packedNoDictStartKey;
+
+ // endkey for no dictionary columns after packing into one column
+ private byte[] packedNoDictEndKey;
+
+ private CarbonFactDataHandlerModel model;
+
+ TablePageKey(CarbonFactDataHandlerModel model, int pageSize) {
+ this.model = model;
+ this.pageSize = pageSize;
+ }
+
+ /** update all keys based on the input row */
+ void update(int rowId, CarbonRow row) throws KeyGenException {
+ byte[] currentMDKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
+ if (model.getNoDictionaryCount() > 0 || model.getComplexIndexMap().size() > 0) {
+ currentNoDictionaryKey = WriteStepRowUtil.getNoDictAndComplexDimension(row);
+ }
+ if (rowId == 0) {
+ startKey = currentMDKey;
+ noDictStartKey = currentNoDictionaryKey;
+ }
+ endKey = currentMDKey;
+ noDictEndKey = currentNoDictionaryKey;
+ if (rowId == pageSize - 1) {
+ finalizeKeys();
+ }
+ }
+
+ /** update all keys if SORT_COLUMNS option is used when creating table */
+ private void finalizeKeys() {
+ // If SORT_COLUMNS is used, we may need to update the start/end keys since they may
+ // contain dictionary columns that are not in SORT_COLUMNS, which need to be removed from
+ // the start/end key
+ int numberOfDictSortColumns = model.getSegmentProperties().getNumberOfDictSortColumns();
+ if (numberOfDictSortColumns > 0) {
+ // if SORT_COLUMNS contain dictionary columns
+ int[] keySize = model.getSegmentProperties().getFixedLengthKeySplitter().getBlockKeySize();
+ if (keySize.length > numberOfDictSortColumns) {
+ // if there are some dictionary columns that are not in SORT_COLUMNS, it will come to here
+ int newMdkLength = 0;
+ for (int i = 0; i < numberOfDictSortColumns; i++) {
+ newMdkLength += keySize[i];
+ }
+ byte[] newStartKeyOfSortKey = new byte[newMdkLength];
+ byte[] newEndKeyOfSortKey = new byte[newMdkLength];
+ System.arraycopy(startKey, 0, newStartKeyOfSortKey, 0, newMdkLength);
+ System.arraycopy(endKey, 0, newEndKeyOfSortKey, 0, newMdkLength);
+ startKey = newStartKeyOfSortKey;
+ endKey = newEndKeyOfSortKey;
+ }
+ } else {
+ startKey = new byte[0];
+ endKey = new byte[0];
+ }
+
+ // Do the same update for noDictionary start/end Key
+ int numberOfNoDictSortColumns = model.getSegmentProperties().getNumberOfNoDictSortColumns();
+ if (numberOfNoDictSortColumns > 0) {
+ // if sort_columns contain no-dictionary columns
+ if (noDictStartKey.length > numberOfNoDictSortColumns) {
+ byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][];
+ byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
+ System.arraycopy(
+ noDictStartKey, 0, newNoDictionaryStartKey, 0, numberOfNoDictSortColumns);
+ System.arraycopy(
+ noDictEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns);
+ noDictStartKey = newNoDictionaryStartKey;
+ noDictEndKey = newNoDictionaryEndKey;
+ }
+ packedNoDictStartKey =
+ NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictStartKey);
+ packedNoDictEndKey =
+ NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictEndKey);
+ }
+ }
+
+ public byte[] getStartKey() {
+ return startKey;
+ }
+
+ public byte[] getEndKey() {
+ return endKey;
+ }
+
+ public byte[] getNoDictStartKey() {
+ return packedNoDictStartKey;
+ }
+
+ public byte[] getNoDictEndKey() {
+ return packedNoDictEndKey;
+ }
+
+ public int getPageSize() {
+ return pageSize;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
new file mode 100644
index 0000000..2911936
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatistics;
+import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.processing.store.writer.Encoder;
+
+// Statistics of dimension and measure column in a TablePage
+public class TablePageStatistics {
+
+ // number of dimension after complex column expanded
+ private int numDimensionsExpanded;
+
+ // min of each dimension column
+ private byte[][] dimensionMinValue;
+
+ // max of each dimension column
+ private byte[][] dimensionMaxValue;
+
+ // min of each measure column
+ private byte[][] measureMinValue;
+
+ // max of each measure column
+ private byte[][] measureMaxValue;
+
+ // null bit set for each measure column
+ private BitSet[] nullBitSet;
+
+ // measure stats
+ // TODO: there are redundant stats
+ private MeasurePageStatsVO measurePageStatistics;
+
+ private TableSpec tableSpec;
+
+ TablePageStatistics(TableSpec tableSpec, TablePage tablePage,
+ Encoder.EncodedData encodedData, MeasurePageStatsVO measurePageStatistics) {
+ this.numDimensionsExpanded = tableSpec.getDimensionSpec().getNumExpandedDimensions();
+ int numMeasures = tableSpec.getMeasureSpec().getNumMeasures();
+ this.dimensionMinValue = new byte[numDimensionsExpanded][];
+ this.dimensionMaxValue = new byte[numDimensionsExpanded][];
+ this.measureMinValue = new byte[numMeasures][];
+ this.measureMaxValue = new byte[numMeasures][];
+ this.nullBitSet = new BitSet[numMeasures];
+ this.tableSpec = tableSpec;
+ this.measurePageStatistics = measurePageStatistics;
+ updateMinMax(tablePage, encodedData);
+ updateNullBitSet(tablePage);
+ }
+
+ private void updateMinMax(TablePage tablePage, Encoder.EncodedData encodedData) {
+ IndexStorage[] keyStorageArray = encodedData.indexStorages;
+ byte[][] measureArray = encodedData.measures;
+ // dimension min/max are read from the index storage built for each expanded dimension
+ for (int i = 0; i < numDimensionsExpanded; i++) {
+ switch (tableSpec.getDimensionSpec().getType(i)) {
+ case GLOBAL_DICTIONARY:
+ case DIRECT_DICTIONARY:
+ case COLUMN_GROUP:
+ case COMPLEX:
+ dimensionMinValue[i] = keyStorageArray[i].getMin();
+ dimensionMaxValue[i] = keyStorageArray[i].getMax();
+ break;
+ case PLAIN_VALUE:
+ dimensionMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
+ dimensionMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
+ break;
+ }
+ }
+ for (int i = 0; i < measureArray.length; i++) {
+ ColumnPageStatistics stats = tablePage.getMeasurePage()[i].getStatistics();
+ measureMinValue[i] = stats.minBytes();
+ measureMaxValue[i] = stats.maxBytes();
+ }
+ }
+
+ private void updateNullBitSet(TablePage tablePage) {
+ nullBitSet = new BitSet[tablePage.getMeasurePage().length];
+ ColumnPage[] measurePages = tablePage.getMeasurePage();
+ for (int i = 0; i < nullBitSet.length; i++) {
+ nullBitSet[i] = measurePages[i].getNullBitSet();
+ }
+ }
+
+ /**
+ * Below method will be used to update the min or max value
+ * by removing the length from it
+ *
+ * @return min max value without length
+ */
+ private byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
+ ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
+ byte[] actualValue = new byte[buffer.getShort()];
+ buffer.get(actualValue);
+ return actualValue;
+ }
+
+ public byte[][] getDimensionMinValue() {
+ return dimensionMinValue;
+ }
+
+ public byte[][] getDimensionMaxValue() {
+ return dimensionMaxValue;
+ }
+
+ public byte[][] getMeasureMinValue() {
+ return measureMinValue;
+ }
+
+ public byte[][] getMeasureMaxValue() {
+ return measureMaxValue;
+ }
+
+ public BitSet[] getNullBitSet() {
+ return nullBitSet;
+ }
+
+ public MeasurePageStatsVO getMeasurePageStatistics() {
+ return measurePageStatistics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
deleted file mode 100644
index 8fa8432..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.colgroup;
-
-import java.util.concurrent.Callable;
-
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-
-/**
- * it is holder of column group data and also min max for colgroup block data
- */
-public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage> {
-
- private byte[][] data;
-
- private ColGroupMinMax colGrpMinMax;
-
- public ColGroupBlockStorage(SegmentProperties segmentProperties, int colGrpIndex, byte[][] data) {
- colGrpMinMax = new ColGroupMinMax(segmentProperties, colGrpIndex);
- this.data = data;
- for (int i = 0; i < data.length; i++) {
- colGrpMinMax.add(data[i]);
- }
- }
-
- /**
- * sorting is not required for colgroup storage and hence return true
- */
- @Override public boolean isAlreadySorted() {
- return true;
- }
-
- /**
- * for column group storage its not required
- */
- @Override public ColGroupDataHolder getDataAfterComp() {
- //not required for column group storage
- return null;
- }
-
- /**
- * for column group storage its not required
- */
- @Override public ColGroupDataHolder getIndexMap() {
- // not required for column group storage
- return null;
- }
-
- /**
- * for column group storage its not required
- */
- @Override public byte[][] getKeyBlock() {
- return data;
- }
-
- /**
- * for column group storage its not required
- */
- @Override public ColGroupDataHolder getDataIndexMap() {
- //not required for column group
- return null;
- }
-
- /**
- * for column group storage its not required
- */
- @Override public int getTotalSize() {
- return data.length;
- }
-
- @Override public byte[] getMin() {
- return colGrpMinMax.getMin();
- }
-
- @Override public byte[] getMax() {
- return colGrpMinMax.getMax();
- }
-
- /**
- * return self
- */
- @Override public IndexStorage call() throws Exception {
- return this;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 0fc1d64..aaeaf66 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -39,8 +39,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
@@ -67,7 +66,6 @@ import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.format.BlockletInfo3;
import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.processing.store.file.FileData;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.io.IOUtils;
@@ -575,59 +573,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
*/
public abstract void writeBlockletData(NodeHolder nodeHolder) throws CarbonDataWriterException;
- protected byte[][] fillAndCompressedKeyBlockData(IndexStorage[] keyStorageArray,
- int entryCount) {
- byte[][] keyBlockData = new byte[keyStorageArray.length][];
- int destPos = 0;
- int keyBlockSizePosition = -1;
- for (int i = 0; i < keyStorageArray.length; i++) {
- destPos = 0;
- //handling for high card dims
- if (!dataWriterVo.getIsComplexType()[i] && !dataWriterVo.getIsDictionaryColumn()[i]) {
- int totalLength = 0;
- // calc size of the total bytes in all the colmns.
- for (int k = 0; k < keyStorageArray[i].getKeyBlock().length; k++) {
- byte[] colValue = keyStorageArray[i].getKeyBlock()[k];
- totalLength += colValue.length;
- }
- keyBlockData[i] = new byte[totalLength];
-
- for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
- int length = keyStorageArray[i].getKeyBlock()[j].length;
- System
- .arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos, length);
- destPos += length;
- }
- } else {
- keyBlockSizePosition++;
- if (dataWriterVo.getAggBlocks()[i]) {
- keyBlockData[i] = new byte[keyStorageArray[i].getTotalSize()];
- for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
- System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos,
- keyStorageArray[i].getKeyBlock()[j].length);
- destPos += keyStorageArray[i].getKeyBlock()[j].length;
- }
- } else {
- if (dataWriterVo.getIsComplexType()[i]) {
- keyBlockData[i] = new byte[keyStorageArray[i].getKeyBlock().length * dataWriterVo
- .getKeyBlockSize()[keyBlockSizePosition]];
- } else {
- keyBlockData[i] =
- new byte[entryCount * dataWriterVo.getKeyBlockSize()[keyBlockSizePosition]];
- }
- for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
- System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos,
- dataWriterVo.getKeyBlockSize()[keyBlockSizePosition]);
- destPos += dataWriterVo.getKeyBlockSize()[keyBlockSizePosition];
- }
- }
- }
- keyBlockData[i] = CompressorFactory.getInstance().getCompressor()
- .compressByte(keyBlockData[i]);
- }
- return keyBlockData;
- }
-
/**
* Below method will be used to update the min or max value
* by removing the length from it
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 8fbf9c0..defa23a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -36,9 +36,7 @@ public class CarbonDataWriterVo {
private IFileManagerComposite fileManager;
- private int[] keyBlockSize;
-
- private boolean[] aggBlocks;
+ private boolean[] rleEncodingForDictDim;
private boolean[] isComplexType;
@@ -123,31 +121,17 @@ public class CarbonDataWriterVo {
}
/**
- * @return the keyBlockSize
- */
- public int[] getKeyBlockSize() {
- return keyBlockSize;
- }
-
- /**
- * @param keyBlockSize the keyBlockSize to set
- */
- public void setKeyBlockSize(int[] keyBlockSize) {
- this.keyBlockSize = keyBlockSize;
- }
-
- /**
- * @return the aggBlocks
+ * @return the rleEncodingForDictDim
*/
- public boolean[] getAggBlocks() {
- return aggBlocks;
+ public boolean[] getRleEncodingForDictDim() {
+ return rleEncodingForDictDim;
}
/**
- * @param aggBlocks the aggBlocks to set
+ * @param rleEncodingForDictDim the rleEncodingForDictDim to set
*/
- public void setAggBlocks(boolean[] aggBlocks) {
- this.aggBlocks = aggBlocks;
+ public void setRleEncodingForDictDim(boolean[] rleEncodingForDictDim) {
+ this.rleEncodingForDictDim = rleEncodingForDictDim;
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index c8f740b..8ee08c4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -17,33 +17,19 @@
package org.apache.carbondata.processing.store.writer;
-import java.util.BitSet;
-
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.util.NodeHolder;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.store.TablePageKey;
+import org.apache.carbondata.processing.store.TablePageStatistics;
public interface CarbonFactDataWriter<T> {
/**
- * This method will be used to write leaf data to file
- * file format
- * <key><measure1><measure2>....
- *
- * @param measureArray measure array
- * @param entryCount number of entries
- * @param startKey start key of leaf
- * @param endKey end key of leaf
- * @param noDictionaryEndKey
- * @param noDictionaryStartKey
- * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem
+ * This method will be used to create NodeHolder for a table page
*/
- NodeHolder buildDataNodeHolder(IndexStorage<T>[] keyStorageArray, byte[][] measureArray,
- int entryCount, byte[] startKey, byte[] endKey, WriterCompressModel compressionModel,
- byte[] noDictionaryStartKey, byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
- throws CarbonDataWriterException;
+ NodeHolder buildDataNodeHolder(Encoder.EncodedData encoded, TablePageStatistics stats,
+ TablePageKey key) throws CarbonDataWriterException;
/**
* If node holder flag is enabled the object will be added to list
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/writer/Encoder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/Encoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/Encoder.java
new file mode 100644
index 0000000..c2d0214
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/Encoder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store.writer;
+
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.processing.store.TablePage;
+
+public interface Encoder {
+ /**
+  * Encodes the given table page; what "encoding" involves is defined by implementations.
+  *
+  * @param tablePage the table page to encode
+  * @return the result of the encoding, as an {@link EncodedData}
+  */
+ EncodedData encode(TablePage tablePage);
+
+ /**
+  * Holder for the encoded result of all columns. The fields are public and non-final,
+  * so callers read and assign them directly.
+  * NOTE(review): indexStorages uses IndexStorage without a type argument; confirm
+  * whether that is intentional.
+  */
+ class EncodedData {
+ // dimension data that includes the row id (index) information
+ public IndexStorage[] indexStorages;
+
+ // encoded and compressed dimension data
+ public byte[][] dimensions;
+
+ // encoded and compressed measure data
+ public byte[][] measures;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java
deleted file mode 100644
index 9ac3481..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.writer.exception;
-
-import java.util.Locale;
-
-public class CarbonDataWriterException extends RuntimeException {
-
- /**
- * default serial version ID.
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * The Error message.
- */
- private String msg = "";
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public CarbonDataWriterException(String msg) {
- super(msg);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public CarbonDataWriterException(String msg, Throwable t) {
- super(msg, t);
- this.msg = msg;
- }
-
- /**
- * This method is used to get the localized message.
- *
- * @param locale - A Locale object represents a specific geographical,
- * political, or cultural region.
- * @return - Localized error message.
- */
- public String getLocalizedMessage(Locale locale) {
- return "";
- }
-
- /**
- * getLocalizedMessage
- */
- @Override public String getLocalizedMessage() {
- return super.getLocalizedMessage();
- }
-
- /**
- * getMessage
- */
- public String getMessage() {
- return this.msg;
- }
-
-}