You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/05/27 12:50:20 UTC
[2/4] carbondata git commit: add TablePage
http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 1f48324..e4b8a46 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -375,9 +375,9 @@ public class SortParameters {
parameters.setTaskNo(configuration.getTaskNo());
parameters.setMeasureColCount(configuration.getMeasureCount());
parameters.setDimColCount(
- configuration.getDimensionCount() - configuration.getComplexDimensionCount());
+ configuration.getDimensionCount() - configuration.getComplexColumnCount());
parameters.setNoDictionaryCount(configuration.getNoDictionaryCount());
- parameters.setComplexDimColCount(configuration.getComplexDimensionCount());
+ parameters.setComplexDimColCount(configuration.getComplexColumnCount());
parameters.setNoDictionaryDimnesionColumn(
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));
@@ -466,8 +466,7 @@ public class SortParameters {
CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
- DataType[] measureDataType = CarbonDataProcessorUtil
- .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields());
+ DataType[] measureDataType = configuration.getMeasureDataType();
parameters.setMeasureDataType(measureDataType);
return parameters;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 9a7450a..f165dcc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -17,17 +17,11 @@
package org.apache.carbondata.processing.store;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.File;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -52,25 +46,21 @@ import org.apache.carbondata.core.datastore.columnar.IndexStorage;
import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
import org.apache.carbondata.core.datastore.page.FixLengthColumnPage;
-import org.apache.carbondata.core.datastore.page.VarLengthColumnPage;
import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator;
-import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.NodeHolder;
import org.apache.carbondata.core.util.ValueCompressionUtil;
import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.WriteStepRowUtil;
import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
import org.apache.carbondata.processing.store.file.FileManager;
import org.apache.carbondata.processing.store.file.IFileManagerComposite;
@@ -79,8 +69,6 @@ import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.carbondata.processing.util.NonDictionaryUtil;
-import org.apache.spark.sql.types.Decimal;
-
/**
* Fact data handler class to handle the fact data
*/
@@ -91,6 +79,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private static final LogService LOGGER =
LogServiceFactory.getLogService(CarbonFactDataHandlerColumnar.class.getName());
+
+ private CarbonFactDataHandlerModel model;
+
/**
* data writer
*/
@@ -103,49 +94,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* total number of entries in blocklet
*/
private int entryCount;
- private Map<Integer, GenericDataType> complexIndexMap;
- /**
- * measure count
- */
- private int measureCount;
- /**
- * measure count
- */
- private int dimensionCount;
- /**
- * index of mdkey in incoming rows
- */
- private int mdKeyIndex;
+
/**
* blocklet size (for V1 and V2) or page size (for V3). A Producer thread will start to process
* once this size of input is reached
*/
private int blockletSize;
/**
- * mdkeyLength
- */
- private int mdkeyLength;
- /**
- * storeLocation
- */
- private String storeLocation;
- /**
- * databaseName
- */
- private String databaseName;
- /**
- * tableName
- */
- private String tableName;
- /**
- * table block size in MB
- */
- private int tableBlockSize;
- /**
- * dimLens
- */
- private int[] dimLens;
- /**
* keyGenerator
*/
private ColumnarSplitter columnarSplitter;
@@ -156,23 +111,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private boolean[] aggKeyBlock;
private boolean[] isNoDictionary;
private long processedDataCount;
- private KeyGenerator[] complexKeyGenerator;
private ExecutorService producerExecutorService;
private List<Future<Void>> producerExecutorServiceTaskList;
private ExecutorService consumerExecutorService;
private List<Future<Void>> consumerExecutorServiceTaskList;
- private List<Object[]> dataRows;
- private int noDictionaryCount;
+ private List<CarbonRow> dataRows;
private ColumnGroupModel colGrpModel;
- private int[] primitiveDimLens;
- private DataType[] type;
- private int[] completeDimLens;
private boolean[] isUseInvertedIndex;
/**
- * data file attributes which will used for file construction
- */
- private CarbonDataFileAttributes carbonDataFileAttributes;
- /**
* semaphore which will used for managing node holder objects
*/
private Semaphore semaphore;
@@ -197,19 +143,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* flag to check whether all blocklets have been finished writing
*/
private boolean processingComplete;
- /**
- * data directory location in carbon store path
- */
- private String carbonDataDirectoryPath;
- /**
- * no of complex dimensions
- */
- private int complexColCount;
-
- /**
- * column schema present in the table
- */
- private List<ColumnSchema> wrapperColumnSchemaList;
/**
* boolean to check whether dimension
@@ -217,24 +150,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private boolean[] isDictDimension;
- /**
- * colCardinality for the merge case.
- */
- private int[] colCardinality;
-
- /**
- * Segment properties
- */
- private SegmentProperties segmentProperties;
- /**
- * flag to check for compaction flow
- */
- private boolean compactionFlow;
-
private int bucketNumber;
- private long schemaUpdatedTimeStamp;
-
private int taskExtension;
/**
@@ -245,24 +162,21 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
/**
* CarbonFactDataHandler constructor
*/
- public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
- initParameters(carbonFactDataHandlerModel);
- this.dimensionCount = carbonFactDataHandlerModel.getDimensionCount();
- this.complexIndexMap = carbonFactDataHandlerModel.getComplexIndexMap();
- this.primitiveDimLens = carbonFactDataHandlerModel.getPrimitiveDimLens();
- this.carbonDataDirectoryPath = carbonFactDataHandlerModel.getCarbonDataDirectoryPath();
- this.complexColCount = getExpandedComplexColsCount();
-
- int numDimColumns = colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexColCount;
+ public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel model) {
+ this.model = model;
+ initParameters(model);
+
+ int numDimColumns = colGrpModel.getNoOfColumnStore() + model.getNoDictionaryCount()
+ + getExpandedComplexColsCount();
this.aggKeyBlock = new boolean[numDimColumns];
this.isNoDictionary = new boolean[numDimColumns];
- this.bucketNumber = carbonFactDataHandlerModel.getBucketId();
- this.taskExtension = carbonFactDataHandlerModel.getTaskExtension();
+ this.bucketNumber = model.getBucketId();
+ this.taskExtension = model.getTaskExtension();
this.isUseInvertedIndex = new boolean[numDimColumns];
- if (null != carbonFactDataHandlerModel.getIsUseInvertedIndex()) {
+ if (null != model.getIsUseInvertedIndex()) {
for (int i = 0; i < isUseInvertedIndex.length; i++) {
- if (i < carbonFactDataHandlerModel.getIsUseInvertedIndex().length) {
- isUseInvertedIndex[i] = carbonFactDataHandlerModel.getIsUseInvertedIndex()[i];
+ if (i < model.getIsUseInvertedIndex().length) {
+ isUseInvertedIndex[i] = model.getIsUseInvertedIndex()[i];
} else {
isUseInvertedIndex[i] = true;
}
@@ -270,20 +184,23 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
int noDictStartIndex = this.colGrpModel.getNoOfColumnStore();
// setting true value for dims of high card
- for (int i = 0; i < noDictionaryCount; i++) {
+ for (int i = 0; i < model.getNoDictionaryCount(); i++) {
this.isNoDictionary[noDictStartIndex + i] = true;
}
- boolean isAggKeyBlock = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
+ boolean isAggKeyBlock = Boolean.parseBoolean(
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
if (isAggKeyBlock) {
- int noDictionaryValue = Integer.parseInt(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.HIGH_CARDINALITY_VALUE,
+ int noDictionaryValue = Integer.parseInt(
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.HIGH_CARDINALITY_VALUE,
CarbonCommonConstants.HIGH_CARDINALITY_VALUE_DEFAULTVALUE));
int[] columnSplits = colGrpModel.getColumnSplit();
int dimCardinalityIndex = 0;
int aggIndex = 0;
+ int[] dimLens = model.getSegmentProperties().getDimColumnsCardinality();
for (int i = 0; i < columnSplits.length; i++) {
if (colGrpModel.isColumnar(i) && dimLens[dimCardinalityIndex] < noDictionaryValue) {
this.aggKeyBlock[aggIndex++] = true;
@@ -293,11 +210,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
aggIndex++;
}
- if (dimensionCount < dimLens.length) {
- int allColsCount = getColsCount(dimensionCount);
+ if (model.getDimensionCount() < dimLens.length) {
+ int allColsCount = getColsCount(model.getDimensionCount());
List<Boolean> aggKeyBlockWithComplex = new ArrayList<Boolean>(allColsCount);
- for (int i = 0; i < dimensionCount; i++) {
- GenericDataType complexDataType = complexIndexMap.get(i);
+ for (int i = 0; i < model.getDimensionCount(); i++) {
+ GenericDataType complexDataType = model.getComplexIndexMap().get(i);
if (complexDataType != null) {
complexDataType.fillAggKeyBlock(aggKeyBlockWithComplex, this.aggKeyBlock);
} else {
@@ -314,35 +231,20 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
version = CarbonProperties.getInstance().getFormatVersion();
}
- private void initParameters(CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
- this.databaseName = carbonFactDataHandlerModel.getDatabaseName();
- this.tableBlockSize = carbonFactDataHandlerModel.getBlockSizeInMB();
- this.tableName = carbonFactDataHandlerModel.getTableName();
- this.type = carbonFactDataHandlerModel.getMeasureDataType();
- this.segmentProperties = carbonFactDataHandlerModel.getSegmentProperties();
- this.wrapperColumnSchemaList = carbonFactDataHandlerModel.getWrapperColumnSchema();
- this.colCardinality = carbonFactDataHandlerModel.getColCardinality();
- this.storeLocation = carbonFactDataHandlerModel.getStoreLocation();
- this.measureCount = carbonFactDataHandlerModel.getMeasureCount();
- this.mdkeyLength = carbonFactDataHandlerModel.getMdKeyLength();
- this.mdKeyIndex = carbonFactDataHandlerModel.getMdKeyIndex();
- this.noDictionaryCount = carbonFactDataHandlerModel.getNoDictionaryCount();
- this.colGrpModel = segmentProperties.getColumnGroupModel();
- this.completeDimLens = carbonFactDataHandlerModel.getDimLens();
- this.dimLens = this.segmentProperties.getDimColumnsCardinality();
- this.carbonDataFileAttributes = carbonFactDataHandlerModel.getCarbonDataFileAttributes();
- this.schemaUpdatedTimeStamp = carbonFactDataHandlerModel.getSchemaUpdatedTimeStamp();
+ private void initParameters(CarbonFactDataHandlerModel model) {
+
+ this.colGrpModel = model.getSegmentProperties().getColumnGroupModel();
//TODO need to pass carbon table identifier to metadata
- CarbonTable carbonTable = CarbonMetadata.getInstance()
- .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+ CarbonTable carbonTable =
+ CarbonMetadata.getInstance().getCarbonTable(
+ model.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + model.getTableName());
isDictDimension =
- CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(tableName));
+ CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(model.getTableName()));
- this.compactionFlow = carbonFactDataHandlerModel.isCompactionFlow();
// in compaction flow the measure with decimal type will come as spark decimal.
// need to convert it to byte array.
- if (compactionFlow) {
+ if (model.isCompactionFlow()) {
try {
numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING,
@@ -397,7 +299,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private void setComplexMapSurrogateIndex(int dimensionCount) {
int surrIndex = 0;
for (int i = 0; i < dimensionCount; i++) {
- GenericDataType complexDataType = complexIndexMap.get(i);
+ GenericDataType complexDataType = model.getComplexIndexMap().get(i);
if (complexDataType != null) {
List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>();
complexDataType.getAllPrimitiveChildren(primitiveTypes);
@@ -418,7 +320,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
public void initialise() throws CarbonDataWriterException {
fileManager = new FileManager();
- fileManager.setName(new File(this.storeLocation).getName());
+ fileManager.setName(new File(model.getStoreLocation()).getName());
setWritingConfiguration();
}
@@ -428,11 +330,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* @param row
* @throws CarbonDataWriterException
*/
- public void addDataToStore(Object[] row) throws CarbonDataWriterException {
+ public void addDataToStore(CarbonRow row) throws CarbonDataWriterException {
dataRows.add(row);
this.entryCount++;
- // if entry count reaches to leaf node size then we are ready to
- // write
+ // if entry count reaches to leaf node size then we are ready to write
// this to leaf node file and update the intermediate files
if (this.entryCount == this.blockletSize) {
try {
@@ -472,10 +373,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
/** update all keys based on the input row */
- void update(int rowId, Object[] row) {
- currentMDKey = (byte[]) row[mdKeyIndex];
- if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
- currentNoDictionaryKey = (byte[][]) row[mdKeyIndex - 1];
+ void update(int rowId, CarbonRow row) throws KeyGenException {
+ currentMDKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
+ if (model.getNoDictionaryCount() > 0 || model.getComplexIndexMap().size() > 0) {
+ currentNoDictionaryKey = WriteStepRowUtil.getNoDictAndComplexDimension(row);
}
if (rowId == 0) {
startKey = currentMDKey;
@@ -493,7 +394,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
// If SORT_COLUMNS is used, may need to update start/end keys since they may
// contains dictionary columns that are not in SORT_COLUMNS, which need to be removed from
// start/end key
- int numberOfDictSortColumns = segmentProperties.getNumberOfDictSortColumns();
+ int numberOfDictSortColumns = model.getSegmentProperties().getNumberOfDictSortColumns();
if (numberOfDictSortColumns > 0) {
// if SORT_COLUMNS contain dictionary columns
int[] keySize = columnarSplitter.getBlockKeySize();
@@ -516,7 +417,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
// Do the same update for noDictionary start/end Key
- int numberOfNoDictSortColumns = segmentProperties.getNumberOfNoDictSortColumns();
+ int numberOfNoDictSortColumns = model.getSegmentProperties().getNumberOfNoDictSortColumns();
if (numberOfNoDictSortColumns > 0) {
// if sort_columns contain no-dictionary columns
if (noDictStartKey.length > numberOfNoDictSortColumns) {
@@ -538,161 +439,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
}
/**
- * Represent a page data for all columns, we store its data in columnar layout, so that
- * all processing apply to TablePage can be done in vectorized fashion.
- */
- class TablePage {
-
- // For all dimension and measure columns, we store the column data directly in the page,
- // the length of the page is the number of rows.
-
- // TODO: we should have separate class for key columns so that keys are stored together in
- // one vector to make it efficient for sorting
- VarLengthColumnPage[] dictDimensionPage;
- VarLengthColumnPage[] noDictDimensionPage;
- ComplexColumnPage[] complexDimensionPage;
- FixLengthColumnPage[] measurePage;
-
- // the num of rows in this page, it must be less than short value (65536)
- int pageSize;
-
- TablePage(int pageSize) {
- this.pageSize = pageSize;
- dictDimensionPage = new VarLengthColumnPage[dimensionCount];
- for (int i = 0; i < dictDimensionPage.length; i++) {
- dictDimensionPage[i] = new VarLengthColumnPage(pageSize);
- }
- noDictDimensionPage = new VarLengthColumnPage[noDictionaryCount];
- for (int i = 0; i < noDictDimensionPage.length; i++) {
- noDictDimensionPage[i] = new VarLengthColumnPage(pageSize);
- }
- complexDimensionPage = new ComplexColumnPage[getComplexColumnCount()];
- for (int i = 0; i < complexDimensionPage.length; i++) {
- // here we still do not know the depth of the complex column, it will be initialized when
- // we get the first row.
- complexDimensionPage[i] = null;
- }
- measurePage = new FixLengthColumnPage[measureCount];
- for (int i = 0; i < measurePage.length; i++) {
- measurePage[i] = new FixLengthColumnPage(type[i], pageSize);
- }
- }
-
- /**
- * Add one row to the internal store, it will be converted into columnar layout
- * @param rowId Id of the input row
- * @param rows row object
- */
- void addRow(int rowId, Object[] rows) {
-
- // convert dictionary columns
- byte[] MDKey = (byte[]) rows[mdKeyIndex];
- if (columnarSplitter != null) {
- byte[][] splitKey = columnarSplitter.splitKey(MDKey);
- for (int i = 0; i < splitKey.length; i++) {
- dictDimensionPage[i].putByteArray(rowId, splitKey[i]);
- }
- }
-
- // convert noDictionary columns and complex columns.
- if (noDictionaryCount > 0 || complexColCount > 0) {
- byte[][] noDictAndComplex = (byte[][])(rows[mdKeyIndex - 1]);
- for (int i = 0; i < noDictAndComplex.length; i++) {
- if (i < noDictionaryCount) {
- // noDictionary columns, since it is variable length, we need to prepare each
- // element as LV encoded byte array (first two bytes are the length of the array)
- byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]);
- noDictDimensionPage[i].putByteArray(rowId, valueWithLength);
- } else {
- // complex columns
- addComplexColumn(i - noDictionaryCount, rowId, noDictAndComplex[i]);
- }
- }
- }
-
- // convert measure columns
- for (int i = 0; i < type.length; i++) {
- Object value = rows[i];
-
- // in compaction flow the measure with decimal type will come as spark decimal.
- // need to convert it to byte array.
- if (type[i] == DataType.DECIMAL && compactionFlow) {
- BigDecimal bigDecimal = ((Decimal) rows[i]).toJavaBigDecimal();
- value = DataTypeUtil.bigDecimalToByte(bigDecimal);
- }
- measurePage[i].putData(rowId, value);
- }
- }
-
- /**
- * add a complex column into internal member complexDimensionPage
- * @param index index of the complexDimensionPage
- * @param rowId Id of the input row
- * @param complexColumns byte array of the complex column to be added, extracted from input row
- */
- // TODO: this function should be refactored, ColumnPage should support complex type encoding
- // directly instead of doing it here
- private void addComplexColumn(int index, int rowId, byte[] complexColumns) {
- GenericDataType complexDataType = complexIndexMap.get(index + primitiveDimLens.length);
-
- // initialize the page if first row
- if (rowId == 0) {
- int depthInComplexColumn = complexDataType.getColsCount();
- complexDimensionPage[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
- }
-
- int depthInComplexColumn = complexDimensionPage[index].getDepth();
- // this is the encoded columnar data which will be added to page,
- // size of this list is the depth of complex column, we will fill it by input data
- List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>();
- for (int k = 0; k < depthInComplexColumn; k++) {
- encodedComplexColumnar.add(new ArrayList<byte[]>());
- }
-
- // encode the complex type data and fill columnsArray
- try {
- ByteBuffer byteArrayInput = ByteBuffer.wrap(complexColumns);
- ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
- complexDataType.parseAndBitPack(byteArrayInput, dataOutputStream, complexKeyGenerator);
- complexDataType.getColumnarDataForComplexType(encodedComplexColumnar,
- ByteBuffer.wrap(byteArrayOutput.toByteArray()));
- byteArrayOutput.close();
- } catch (IOException | KeyGenException e) {
- throw new CarbonDataWriterException(
- "Problem while bit packing and writing complex datatype", e);
- }
-
- for (int depth = 0; depth < depthInComplexColumn; depth++) {
- complexDimensionPage[index].putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
- }
- }
-
- // Adds length as a short element (first 2 bytes) to the head of the input byte array
- private byte[] addLengthToByteArray(byte[] input) {
- byte[] output = new byte[input.length + 2];
- ByteBuffer buffer = ByteBuffer.wrap(output);
- buffer.putShort((short) input.length);
- buffer.put(input, 0, input.length);
- return output;
- }
-
- }
-
- /**
* generate the NodeHolder from the input rows (one page in case of V3 format)
*/
- private NodeHolder processDataRows(List<Object[]> dataRows)
- throws CarbonDataWriterException {
+ private NodeHolder processDataRows(List<CarbonRow> dataRows)
+ throws CarbonDataWriterException, KeyGenException {
if (dataRows.size() == 0) {
return new NodeHolder();
}
- TablePage tablePage = new TablePage(dataRows.size());
+ TablePage tablePage = new TablePage(model, dataRows.size());
IndexKey keys = new IndexKey(dataRows.size());
int rowId = 0;
// convert row to columnar data
- for (Object[] row : dataRows) {
+ for (CarbonRow row : dataRows) {
tablePage.addRow(rowId, row);
keys.update(rowId, row);
rowId++;
@@ -702,14 +461,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
// TODO: To make the encoding more transparent to the user, user should be enable to specify
// the encoding and compression method for each type when creating table.
- Codec codec = new Codec();
+ Codec codec = new Codec(model.getMeasureDataType());
IndexStorage[] dimColumns = codec.encodeAndCompressDimensions(tablePage);
Codec encodedMeasure = codec.encodeAndCompressMeasures(tablePage);
// prepare nullBitSet for writer, remove this after writer can accept TablePage
- BitSet[] nullBitSet = new BitSet[tablePage.measurePage.length];
+ BitSet[] nullBitSet = new BitSet[tablePage.getMeasurePage().length];
+ FixLengthColumnPage[] measurePages = tablePage.getMeasurePage();
for (int i = 0; i < nullBitSet.length; i++) {
- nullBitSet[i] = tablePage.measurePage[i].getNullBitSet();
+ nullBitSet[i] = measurePages[i].getNullBitSet();
}
LOGGER.info("Number Of records processed: " + dataRows.size());
@@ -767,10 +527,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
for (int i = 0; i < taskList.size(); i++) {
try {
taskList.get(i).get();
- } catch (InterruptedException e) {
- LOGGER.error(e, e.getMessage());
- throw new CarbonDataWriterException(e.getMessage(), e);
- } catch (ExecutionException e) {
+ } catch (InterruptedException | ExecutionException e) {
LOGGER.error(e, e.getMessage());
throw new CarbonDataWriterException(e.getMessage(), e);
}
@@ -780,7 +537,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private int getColsCount(int columnSplit) {
int count = 0;
for (int i = 0; i < columnSplit; i++) {
- GenericDataType complexDataType = complexIndexMap.get(i);
+ GenericDataType complexDataType = model.getComplexIndexMap().get(i);
if (complexDataType != null) {
count += complexDataType.getColsCount();
} else count++;
@@ -791,8 +548,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
// return the number of complex column after complex columns are expanded
private int getExpandedComplexColsCount() {
int count = 0;
- for (int i = 0; i < dimensionCount; i++) {
- GenericDataType complexDataType = complexIndexMap.get(i);
+ int dictDimensionCount = model.getDimensionCount();
+ for (int i = 0; i < dictDimensionCount; i++) {
+ GenericDataType complexDataType = model.getComplexIndexMap().get(i);
if (complexDataType != null) {
count += complexDataType.getColsCount();
}
@@ -802,7 +560,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
// return the number of complex column
private int getComplexColumnCount() {
- return complexIndexMap.size();
+ return model.getComplexIndexMap().size();
}
/**
@@ -850,27 +608,22 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE);
// if at least one dimension is present then initialize column splitter otherwise null
int noOfColStore = colGrpModel.getNoOfColumnStore();
- int[] keyBlockSize = new int[noOfColStore + complexColCount];
+ int[] keyBlockSize = new int[noOfColStore + getExpandedComplexColsCount()];
- if (dimLens.length > 0) {
+ if (model.getDimLens().length > 0) {
//Using Variable length variable split generator
//This will help in splitting mdkey to columns. variable split is required because all
// columns which are part of
//row store will be in single column store
//e.g if {0,1,2,3,4,5} is dimension and {0,1,2) is row store dimension
//than below splitter will return column as {0,1,2}{3}{4}{5}
- this.columnarSplitter = this.segmentProperties.getFixedLengthKeySplitter();
+ this.columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter();
System.arraycopy(columnarSplitter.getBlockKeySize(), 0, keyBlockSize, 0, noOfColStore);
this.keyBlockHolder =
new CarbonKeyBlockHolder[this.columnarSplitter.getBlockKeySize().length];
} else {
this.keyBlockHolder = new CarbonKeyBlockHolder[0];
}
- this.complexKeyGenerator = new KeyGenerator[completeDimLens.length];
- for (int i = 0; i < completeDimLens.length; i++) {
- complexKeyGenerator[i] =
- KeyGeneratorFactory.getKeyGenerator(new int[] { completeDimLens[i] });
- }
for (int i = 0; i < keyBlockHolder.length; i++) {
this.keyBlockHolder[i] = new CarbonKeyBlockHolder(blockletSize);
@@ -882,6 +635,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
List<Integer> customMeasureIndexList =
new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ DataType[] type = model.getMeasureDataType();
for (int j = 0; j < type.length; j++) {
if (type[j] != DataType.BYTE && type[j] != DataType.DECIMAL) {
otherMeasureIndexList.add(j);
@@ -898,9 +652,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
for (int i = 0; i < customMeasureIndex.length; i++) {
customMeasureIndex[i] = customMeasureIndexList.get(i);
}
- setComplexMapSurrogateIndex(this.dimensionCount);
+ setComplexMapSurrogateIndex(model.getDimensionCount());
int[] blockKeySize = getBlockKeySizeWithComplexTypes(new MultiDimKeyVarLengthEquiSplitGenerator(
- CarbonUtil.getIncrementedCardinalityFullyFilled(completeDimLens.clone()), (byte) dimSet)
+ CarbonUtil.getIncrementedCardinalityFullyFilled(model.getDimLens().clone()), (byte) dimSet)
.getBlockKeySize());
System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore,
blockKeySize.length - noOfColStore);
@@ -924,8 +678,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
List<Integer> blockKeySizeWithComplex =
new ArrayList<Integer>(blockKeySizeWithComplexTypes.length);
- for (int i = 0; i < this.dimensionCount; i++) {
- GenericDataType complexDataType = complexIndexMap.get(i);
+ int dictDimensionCount = model.getDimensionCount();
+ for (int i = 0; i < dictDimensionCount; i++) {
+ GenericDataType complexDataType = model.getComplexIndexMap().get(i);
if (complexDataType != null) {
complexDataType.fillBlockKeySize(blockKeySizeWithComplex, primitiveBlockKeySize);
} else {
@@ -958,37 +713,37 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
*/
private CarbonDataWriterVo getDataWriterVo(int[] keyBlockSize) {
CarbonDataWriterVo carbonDataWriterVo = new CarbonDataWriterVo();
- carbonDataWriterVo.setStoreLocation(storeLocation);
- carbonDataWriterVo.setMeasureCount(measureCount);
- carbonDataWriterVo.setMdKeyLength(mdkeyLength);
- carbonDataWriterVo.setTableName(tableName);
+ carbonDataWriterVo.setStoreLocation(model.getStoreLocation());
+ carbonDataWriterVo.setMeasureCount(model.getMeasureCount());
+ carbonDataWriterVo.setTableName(model.getTableName());
carbonDataWriterVo.setKeyBlockSize(keyBlockSize);
carbonDataWriterVo.setFileManager(fileManager);
carbonDataWriterVo.setAggBlocks(aggKeyBlock);
carbonDataWriterVo.setIsComplexType(isComplexTypes());
- carbonDataWriterVo.setNoDictionaryCount(noDictionaryCount);
- carbonDataWriterVo.setCarbonDataFileAttributes(carbonDataFileAttributes);
- carbonDataWriterVo.setDatabaseName(databaseName);
- carbonDataWriterVo.setWrapperColumnSchemaList(wrapperColumnSchemaList);
+ carbonDataWriterVo.setNoDictionaryCount(model.getNoDictionaryCount());
+ carbonDataWriterVo.setCarbonDataFileAttributes(model.getCarbonDataFileAttributes());
+ carbonDataWriterVo.setDatabaseName(model.getDatabaseName());
+ carbonDataWriterVo.setWrapperColumnSchemaList(model.getWrapperColumnSchema());
carbonDataWriterVo.setIsDictionaryColumn(isDictDimension);
- carbonDataWriterVo.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
- carbonDataWriterVo.setColCardinality(colCardinality);
- carbonDataWriterVo.setSegmentProperties(segmentProperties);
- carbonDataWriterVo.setTableBlocksize(tableBlockSize);
+ carbonDataWriterVo.setCarbonDataDirectoryPath(model.getCarbonDataDirectoryPath());
+ carbonDataWriterVo.setColCardinality(model.getColCardinality());
+ carbonDataWriterVo.setSegmentProperties(model.getSegmentProperties());
+ carbonDataWriterVo.setTableBlocksize(model.getBlockSizeInMB());
carbonDataWriterVo.setBucketNumber(bucketNumber);
carbonDataWriterVo.setTaskExtension(taskExtension);
- carbonDataWriterVo.setSchemaUpdatedTimeStamp(schemaUpdatedTimeStamp);
+ carbonDataWriterVo.setSchemaUpdatedTimeStamp(model.getSchemaUpdatedTimeStamp());
return carbonDataWriterVo;
}
private boolean[] isComplexTypes() {
- int noOfColumn = colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexIndexMap.size();
+ int noDictionaryCount = model.getNoDictionaryCount();
+ int noOfColumn = colGrpModel.getNoOfColumnStore() + noDictionaryCount + getComplexColumnCount();
int allColsCount = getColsCount(noOfColumn);
boolean[] isComplexType = new boolean[allColsCount];
List<Boolean> complexTypesList = new ArrayList<Boolean>(allColsCount);
for (int i = 0; i < noOfColumn; i++) {
- GenericDataType complexDataType = complexIndexMap.get(i - noDictionaryCount);
+ GenericDataType complexDataType = model.getComplexIndexMap().get(i - noDictionaryCount);
if (complexDataType != null) {
int count = complexDataType.getColsCount();
for (int j = 0; j < count; j++) {
@@ -1081,11 +836,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private final class Producer implements Callable<Void> {
private BlockletDataHolder blockletDataHolder;
- private List<Object[]> dataRows;
+ private List<CarbonRow> dataRows;
private int sequenceNumber;
private boolean isWriteAll;
- private Producer(BlockletDataHolder blockletDataHolder, List<Object[]> dataRows,
+ private Producer(BlockletDataHolder blockletDataHolder, List<CarbonRow> dataRows,
int sequenceNumber, boolean isWriteAll) {
this.blockletDataHolder = blockletDataHolder;
this.dataRows = dataRows;
@@ -1176,17 +931,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
this.isUseInvertedIndex = isUseInvertedIndex;
}
- public BlockSortThread(int index, byte[][] data, boolean b, boolean isNoDictionary,
+ public BlockSortThread(byte[][] data, boolean compression, boolean isNoDictionary,
boolean isSortRequired, boolean isUseInvertedIndex) {
- this.index = index;
this.data = data;
- isCompressionReq = b;
+ this.isCompressionReq = compression;
this.isNoDictionary = isNoDictionary;
this.isSortRequired = isSortRequired;
this.isUseInvertedIndex = isUseInvertedIndex;
}
@Override public IndexStorage call() throws Exception {
+ if (index == 1) {
+ int dd = 1 + 1;
+ }
if (isUseInvertedIndex) {
if (version == ColumnarFormatVersion.V3) {
return new BlockIndexerStorageForShort(this.data, isCompressionReq, isNoDictionary,
@@ -1210,8 +967,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
public class Codec {
private WriterCompressModel compressionModel;
private byte[][] encodedMeasureArray;
+ private DataType[] measureType;
- Codec() {
+ Codec(DataType[] measureType) {
+ this.measureType = measureType;
}
public WriterCompressModel getCompressionModel() {
@@ -1225,7 +984,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
public Codec encodeAndCompressMeasures(TablePage tablePage) {
// TODO: following conversion is required only because compress model requires them,
// remove then after the compress framework is refactoried
- FixLengthColumnPage[] measurePage = tablePage.measurePage;
+ FixLengthColumnPage[] measurePage = tablePage.getMeasurePage();
int measureCount = measurePage.length;
Object[] min = new Object[measurePage.length];
Object[] max = new Object[measurePage.length];
@@ -1238,8 +997,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
decimal[i] = measurePage[i].getStatistics().getDecimal();
}
// encode and compress measure column page
- compressionModel = ValueCompressionUtil
- .getWriterCompressModel(max, min, decimal, uniqueValue, type, new byte[measureCount]);
+ compressionModel =
+ ValueCompressionUtil.getWriterCompressModel(max, min, decimal, uniqueValue, measureType,
+ new byte[measureCount]);
encodedMeasureArray = encodeMeasure(compressionModel, measurePage);
return this;
}
@@ -1295,8 +1055,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* Encode and compress each column page. The work is done using a thread pool.
*/
private IndexStorage[] encodeAndCompressDimensions(TablePage tablePage) {
- int noDictionaryCount = tablePage.noDictDimensionPage.length;
- int complexColCount = tablePage.complexDimensionPage.length;
+ int noDictionaryCount = tablePage.getNoDictDimensionPage().length;
+ int complexColCount = tablePage.getComplexDimensionPage().length;
// thread pool size to be used for encoding dimension
// each thread will sort the column page data and compress it
@@ -1306,12 +1066,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
Callable<IndexStorage> callable;
List<Future<IndexStorage>> submit = new ArrayList<Future<IndexStorage>>(
- primitiveDimLens.length + noDictionaryCount + complexColCount);
+ model.getPrimitiveDimLens().length + noDictionaryCount + complexColCount);
int i = 0;
int dictionaryColumnCount = -1;
int noDictionaryColumnCount = -1;
int colGrpId = -1;
boolean isSortColumn = false;
+ SegmentProperties segmentProperties = model.getSegmentProperties();
for (i = 0; i < isDictDimension.length; i++) {
isSortColumn = i < segmentProperties.getNumberOfSortColumns();
if (isDictDimension[i]) {
@@ -1320,8 +1081,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
// dictionary dimension
callable =
new BlockSortThread(
- i,
- tablePage.dictDimensionPage[dictionaryColumnCount].getByteArrayPage(),
+ tablePage.getKeyColumnPage().getKeyVector(dictionaryColumnCount),
+ true,
+ false,
isSortColumn,
isUseInvertedIndex[i] & isSortColumn);
@@ -1330,14 +1092,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
callable = new ColGroupBlockStorage(
segmentProperties,
++colGrpId,
- tablePage.dictDimensionPage[dictionaryColumnCount].getByteArrayPage());
+ tablePage.getKeyColumnPage().getKeyVector(dictionaryColumnCount));
}
} else {
// no dictionary dimension
callable =
new BlockSortThread(
- i,
- tablePage.noDictDimensionPage[++noDictionaryColumnCount].getByteArrayPage(),
+ tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getByteArrayPage(),
false,
true,
isSortColumn,
@@ -1349,12 +1110,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
// complex type column
for (int index = 0; index < getComplexColumnCount(); index++) {
- Iterator<byte[][]> iterator = tablePage.complexDimensionPage[index].iterator();
+ Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[index].iterator();
while (iterator.hasNext()) {
+ byte[][] data = iterator.next();
callable =
new BlockSortThread(
i++,
- iterator.next(),
+ data,
false,
true);
submit.add(executorService.submit(callable));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 44958a4..df27dcc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -43,9 +43,8 @@ import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-/**
- * This class contains all the data required for processing and writing the carbon data
- */
+// This class contains all the data required for processing and writing the carbon data
+// TODO: we should try to minimize this class while refactoring the loading process
public class CarbonFactDataHandlerModel {
/**
@@ -74,59 +73,25 @@ public class CarbonFactDataHandlerModel {
*/
private int measureCount;
/**
- * length of mdKey
- */
- private int mdKeyLength;
- /**
- * mdKey index in one row object
- */
- private int mdKeyIndex;
- /**
- * aggregators (e,g min, amx, sum)
- */
- private String[] aggregators;
- /**
- * custom aggregator class which contains the logic of merging data
- */
- private String[] aggregatorClass;
- /**
* local store location
*/
private String storeLocation;
/**
- * cardinality of all dimensions
- */
- private int[] factDimLens;
- /**
- * flag to check whether to merge data based on custom aggregator
- */
- private boolean isMergingRequestForCustomAgg;
- /**
* flag to check whether use inverted index
*/
private boolean[] isUseInvertedIndex;
+
/**
- * dimension cardinality
+ * length of each dimension, including dictionary, no-dictionary and complex dimensions
*/
private int[] dimLens;
+
/**
- * array of fact table columns
- */
- private String[] factLevels;
- /**
- * array of aggregate levels
- */
- private String[] aggLevels;
- /**
- * flag for data writing request
- */
- private boolean isDataWritingRequest;
- /**
- * count of columns for which dictionary is not generated
+ * total number of no-dictionary dimensions in the table
*/
private int noDictionaryCount;
/**
- * total number of columns in table
+ * total number of dictionary dimension and complex dimension columns in table
*/
private int dimensionCount;
/**
@@ -185,10 +150,11 @@ public class CarbonFactDataHandlerModel {
private int taskExtension;
+ // key generator for complex dimension
+ private KeyGenerator[] complexDimensionKeyGenerator;
+
/**
* Create the model using @{@link CarbonDataLoadConfiguration}
- * @param configuration
- * @return CarbonFactDataHandlerModel
*/
public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
CarbonDataLoadConfiguration configuration, String storeLocation, int bucketId,
@@ -207,26 +173,6 @@ public class CarbonFactDataHandlerModel {
}
}
}
- List<Integer> dimsLenList = new ArrayList<Integer>();
- for (int eachDimLen : dimLensWithComplex) {
- if (eachDimLen != 0) dimsLenList.add(eachDimLen);
- }
- int[] dimLens = new int[dimsLenList.size()];
- for (int i = 0; i < dimsLenList.size(); i++) {
- dimLens[i] = dimsLenList.get(i);
- }
-
- int dimensionCount = configuration.getDimensionCount();
- int noDictionaryCount = configuration.getNoDictionaryCount();
- int complexDimensionCount = configuration.getComplexDimensionCount();
- int measureCount = configuration.getMeasureCount();
-
- int simpleDimsCount = dimensionCount - noDictionaryCount - complexDimensionCount;
- int[] simpleDimsLen = new int[simpleDimsCount];
- for (int i = 0; i < simpleDimsCount; i++) {
- simpleDimsLen[i] = dimLens[i];
- }
-
CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
identifier.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + identifier
.getTableName());
@@ -235,12 +181,22 @@ public class CarbonFactDataHandlerModel {
carbonTable.getMeasureByTableName(identifier.getTableName()));
int[] colCardinality =
CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchema);
+
SegmentProperties segmentProperties =
new SegmentProperties(wrapperColumnSchema, colCardinality);
- // Actual primitive dimension used to generate start & end key
- KeyGenerator keyGenerator = segmentProperties.getDimensionKeyGenerator();
+ int[] dimLens = configuration.calcDimensionLengths();
+
+ int dimensionCount = configuration.getDimensionCount();
+ int noDictionaryCount = configuration.getNoDictionaryCount();
+ int complexDimensionCount = configuration.getComplexColumnCount();
+ int measureCount = configuration.getMeasureCount();
+ int simpleDimsCount = dimensionCount - noDictionaryCount - complexDimensionCount;
+ int[] simpleDimsLen = new int[simpleDimsCount];
+ for (int i = 0; i < simpleDimsCount; i++) {
+ simpleDimsLen[i] = dimLens[i];
+ }
//To Set MDKey Index of each primitive type in complex type
int surrIndex = simpleDimsCount;
Iterator<Map.Entry<String, GenericDataType>> complexMap =
@@ -271,30 +227,23 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel
.setTableName(identifier.getTableName());
carbonFactDataHandlerModel.setMeasureCount(measureCount);
- carbonFactDataHandlerModel.setMdKeyLength(keyGenerator.getKeySizeInBytes());
carbonFactDataHandlerModel.setStoreLocation(storeLocation);
carbonFactDataHandlerModel.setDimLens(dimLens);
carbonFactDataHandlerModel.setNoDictionaryCount(noDictionaryCount);
- carbonFactDataHandlerModel
- .setDimensionCount(configuration.getDimensionCount() - noDictionaryCount);
+ carbonFactDataHandlerModel.setDimensionCount(
+ configuration.getDimensionCount() - noDictionaryCount);
carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
carbonFactDataHandlerModel.setSegmentProperties(segmentProperties);
carbonFactDataHandlerModel.setColCardinality(colCardinality);
- carbonFactDataHandlerModel.setDataWritingRequest(true);
- carbonFactDataHandlerModel.setMeasureDataType(CarbonDataProcessorUtil
- .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields()));
- carbonFactDataHandlerModel.setFactDimLens(dimLens);
+ carbonFactDataHandlerModel.setMeasureDataType(configuration.getMeasureDataType());
carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
carbonFactDataHandlerModel.setPrimitiveDimLens(simpleDimsLen);
carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndex);
carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
- if (noDictionaryCount > 0 || complexDimensionCount > 0) {
- carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
- } else {
- carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
- }
+ carbonFactDataHandlerModel.setComplexDimensionKeyGenerator(
+ configuration.createKeyGeneratorForComplexDimension());
carbonFactDataHandlerModel.bucketId = bucketId;
carbonFactDataHandlerModel.segmentId = configuration.getSegmentId();
carbonFactDataHandlerModel.taskExtension = taskExtension;
@@ -315,8 +264,6 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.setDatabaseName(loadModel.getDatabaseName());
carbonFactDataHandlerModel.setTableName(tableName);
carbonFactDataHandlerModel.setMeasureCount(segmentProperties.getMeasures().size());
- carbonFactDataHandlerModel
- .setMdKeyLength(segmentProperties.getDimensionKeyGenerator().getKeySizeInBytes());
carbonFactDataHandlerModel.setStoreLocation(tempStoreLocation);
carbonFactDataHandlerModel.setDimLens(segmentProperties.getDimColumnsCardinality());
carbonFactDataHandlerModel.setSegmentProperties(segmentProperties);
@@ -337,14 +284,12 @@ public class CarbonFactDataHandlerModel {
Map<Integer, GenericDataType> complexIndexMap =
new HashMap<Integer, GenericDataType>(segmentProperties.getComplexDimensions().size());
carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
- carbonFactDataHandlerModel.setDataWritingRequest(true);
DataType[] aggType = new DataType[segmentProperties.getMeasures().size()];
int i = 0;
for (CarbonMeasure msr : segmentProperties.getMeasures()) {
aggType[i++] = msr.getDataType();
}
carbonFactDataHandlerModel.setMeasureDataType(aggType);
- carbonFactDataHandlerModel.setFactDimLens(segmentProperties.getDimColumnsCardinality());
String carbonDataDirectoryPath = CarbonDataProcessorUtil
.checkAndCreateCarbonStoreLocation(loadModel.getStorePath(), loadModel.getDatabaseName(),
tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
@@ -359,12 +304,6 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes);
carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
- if (segmentProperties.getNumberOfNoDictionaryDimension() > 0
- || segmentProperties.getComplexDimensions().size() > 0) {
- carbonFactDataHandlerModel.setMdKeyIndex(segmentProperties.getMeasures().size() + 1);
- } else {
- carbonFactDataHandlerModel.setMdKeyIndex(segmentProperties.getMeasures().size());
- }
return carbonFactDataHandlerModel;
}
@@ -426,22 +365,6 @@ public class CarbonFactDataHandlerModel {
this.measureCount = measureCount;
}
- public int getMdKeyLength() {
- return mdKeyLength;
- }
-
- public void setMdKeyLength(int mdKeyLength) {
- this.mdKeyLength = mdKeyLength;
- }
-
- public int getMdKeyIndex() {
- return mdKeyIndex;
- }
-
- public void setMdKeyIndex(int mdKeyIndex) {
- this.mdKeyIndex = mdKeyIndex;
- }
-
public String getStoreLocation() {
return storeLocation;
}
@@ -450,10 +373,6 @@ public class CarbonFactDataHandlerModel {
this.storeLocation = storeLocation;
}
- public void setFactDimLens(int[] factDimLens) {
- this.factDimLens = factDimLens;
- }
-
public int[] getDimLens() {
return dimLens;
}
@@ -462,10 +381,6 @@ public class CarbonFactDataHandlerModel {
this.dimLens = dimLens;
}
- public void setDataWritingRequest(boolean dataWritingRequest) {
- isDataWritingRequest = dataWritingRequest;
- }
-
public int getNoDictionaryCount() {
return noDictionaryCount;
}
@@ -590,5 +505,22 @@ public class CarbonFactDataHandlerModel {
public int getTaskExtension() {
return taskExtension;
}
+
+ public KeyGenerator[] getComplexDimensionKeyGenerator() {
+ return complexDimensionKeyGenerator;
+ }
+
+ public void setComplexDimensionKeyGenerator(KeyGenerator[] complexDimensionKeyGenerator) {
+ this.complexDimensionKeyGenerator = complexDimensionKeyGenerator;
+ }
+
+ public KeyGenerator getMDKeyGenerator() {
+ return segmentProperties.getDimensionKeyGenerator();
+ }
+
+ // return the number of complex columns
+ public int getComplexColumnCount() {
+ return complexIndexMap.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
index 42a32d7..16d2da0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
@@ -17,12 +17,13 @@
package org.apache.carbondata.processing.store;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
public interface CarbonFactHandler {
void initialise() throws CarbonDataWriterException;
- void addDataToStore(Object[] row) throws CarbonDataWriterException;
+ void addDataToStore(CarbonRow row) throws CarbonDataWriterException;
void finish() throws CarbonDataWriterException;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
new file mode 100644
index 0000000..cf1b746
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
+import org.apache.carbondata.core.datastore.page.FixLengthColumnPage;
+import org.apache.carbondata.core.datastore.page.KeyColumnPage;
+import org.apache.carbondata.core.datastore.page.VarLengthColumnPage;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.WriteStepRowUtil;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+import org.apache.spark.sql.types.Decimal;
+
+/**
+ * Represents one page of data for all columns. Data is stored in columnar layout so that
+ * all processing applied to a TablePage can be done in vectorized fashion.
+ */
+class TablePage {
+
+ // For all dimension and measure columns, we store the column data directly in the page,
+ // the length of the page is the number of rows.
+
+ // TODO: we should have separate class for key columns so that keys are stored together in
+ // one vector to make it efficient for sorting
+ private KeyColumnPage keyColumnPage;
+ private VarLengthColumnPage[] noDictDimensionPage;
+ private ComplexColumnPage[] complexDimensionPage;
+ private FixLengthColumnPage[] measurePage;
+
+ // the number of rows in this page; it must fit in an unsigned short (i.e. be less than 65536)
+ private int pageSize;
+
+ private CarbonFactDataHandlerModel model;
+
+ TablePage(CarbonFactDataHandlerModel model, int pageSize) {
+ this.model = model;
+ this.pageSize = pageSize;
+ keyColumnPage = new KeyColumnPage(pageSize,
+ model.getSegmentProperties().getDimensionPartitions().length);
+ noDictDimensionPage = new VarLengthColumnPage[model.getNoDictionaryCount()];
+ for (int i = 0; i < noDictDimensionPage.length; i++) {
+ noDictDimensionPage[i] = new VarLengthColumnPage(pageSize);
+ }
+ complexDimensionPage = new ComplexColumnPage[model.getComplexColumnCount()];
+ for (int i = 0; i < complexDimensionPage.length; i++) {
+ // here we still do not know the depth of the complex column; it will be initialized when
+ // we get the first row.
+ complexDimensionPage[i] = null;
+ }
+ measurePage = new FixLengthColumnPage[model.getMeasureCount()];
+ DataType[] dataTypes = model.getMeasureDataType();
+ for (int i = 0; i < measurePage.length; i++) {
+ measurePage[i] = new FixLengthColumnPage(dataTypes[i], pageSize);
+ }
+ }
+
+ /**
+ * Add one row to the internal store, it will be converted into columnar layout
+ *
+ * @param rowId Id of the input row
+ * @param row row object
+ */
+ void addRow(int rowId, CarbonRow row) throws KeyGenException {
+ // convert each column category
+
+ // 1. convert dictionary columns
+ byte[] mdk = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
+ byte[][] keys = model.getSegmentProperties().getFixedLengthKeySplitter().splitKey(mdk);
+ keyColumnPage.putKey(rowId, keys);
+
+ // 2. convert noDictionary columns and complex columns.
+ int noDictionaryCount = noDictDimensionPage.length;
+ int complexColumnCount = complexDimensionPage.length;
+ if (noDictionaryCount > 0 || complexColumnCount > 0) {
+ byte[][] noDictAndComplex = WriteStepRowUtil.getNoDictAndComplexDimension(row);
+ for (int i = 0; i < noDictAndComplex.length; i++) {
+ if (i < noDictionaryCount) {
+ // noDictionary columns, since it is variable length, we need to prepare each
+ // element as LV encoded byte array (first two bytes are the length of the array)
+ byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]);
+ noDictDimensionPage[i].putByteArray(rowId, valueWithLength);
+ } else {
+ // complex columns
+ addComplexColumn(i - noDictionaryCount, rowId, noDictAndComplex[i]);
+ }
+ }
+ }
+
+ // 3. convert measure columns
+ Object[] measureColumns = WriteStepRowUtil.getMeasure(row);
+ for (int i = 0; i < measurePage.length; i++) {
+ Object value = measureColumns[i];
+
+ // in compaction flow the measure with decimal type will come as Spark decimal.
+ // need to convert it to byte array.
+ if (measurePage[i].getDataType() == DataType.DECIMAL && model.isCompactionFlow()) {
+ BigDecimal bigDecimal = ((Decimal) value).toJavaBigDecimal();
+ value = DataTypeUtil.bigDecimalToByte(bigDecimal);
+ }
+ measurePage[i].putData(rowId, value);
+ }
+ }
+
+ /**
+ * add a complex column into the internal member complexDimensionPage
+ *
+ * @param index index of the complexDimensionPage
+ * @param rowId Id of the input row
+ * @param complexColumns byte array of the complex column to be added, extracted from the input row
+ */
+ // TODO: this function should be refactored; ColumnPage should support complex type encoding
+ // directly instead of doing it here
+ private void addComplexColumn(int index, int rowId, byte[] complexColumns) {
+ GenericDataType complexDataType = model.getComplexIndexMap().get(
+ index + model.getPrimitiveDimLens().length);
+
+ // initialize the page if first row
+ if (rowId == 0) {
+ int depthInComplexColumn = complexDataType.getColsCount();
+ getComplexDimensionPage()[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
+ }
+
+ int depthInComplexColumn = getComplexDimensionPage()[index].getDepth();
+ // this is the encoded columnar data which will be added to page,
+ // size of this list is the depth of complex column, we will fill it by input data
+ List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>();
+ for (int k = 0; k < depthInComplexColumn; k++) {
+ encodedComplexColumnar.add(new ArrayList<byte[]>());
+ }
+
+ // encode the complex type data and fill columnsArray
+ try {
+ ByteBuffer byteArrayInput = ByteBuffer.wrap(complexColumns);
+ ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
+ complexDataType.parseAndBitPack(byteArrayInput, dataOutputStream,
+ model.getComplexDimensionKeyGenerator());
+ complexDataType.getColumnarDataForComplexType(encodedComplexColumnar,
+ ByteBuffer.wrap(byteArrayOutput.toByteArray()));
+ byteArrayOutput.close();
+ } catch (IOException | KeyGenException e) {
+ throw new CarbonDataWriterException("Problem while bit packing and writing complex datatype",
+ e);
+ }
+
+ for (int depth = 0; depth < depthInComplexColumn; depth++) {
+ getComplexDimensionPage()[index]
+ .putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
+ }
+ }
+
+ // Adds length as a short element (first 2 bytes) to the head of the input byte array
+ private byte[] addLengthToByteArray(byte[] input) {
+ byte[] output = new byte[input.length + 2];
+ ByteBuffer buffer = ByteBuffer.wrap(output);
+ buffer.putShort((short) input.length);
+ buffer.put(input, 0, input.length);
+ return output;
+ }
+
+ public KeyColumnPage getKeyColumnPage() {
+ return keyColumnPage;
+ }
+
+ public VarLengthColumnPage[] getNoDictDimensionPage() {
+ return noDictDimensionPage;
+ }
+
+ public ComplexColumnPage[] getComplexDimensionPage() {
+ return complexDimensionPage;
+ }
+
+ public FixLengthColumnPage[] getMeasurePage() {
+ return measurePage;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
index def2aaa..8fa8432 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
@@ -39,14 +39,6 @@ public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage
}
}
- @Deprecated
- private ColGroupDataHolder colGrpDataHolder;
-
- @Deprecated
- public ColGroupBlockStorage(DataHolder colGrpDataHolder) {
- this.colGrpDataHolder = (ColGroupDataHolder) colGrpDataHolder;
- }
-
/**
* sorting is not required for colgroup storage and hence return true
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 770d24c..69c4eb1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -72,9 +72,7 @@ import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterE
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.io.IOUtils;
-public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<T>
-
-{
+public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<T> {
private static final LogService LOGGER =
LogServiceFactory.getLogService(AbstractFactDataWriter.class.getName());
@@ -141,10 +139,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
* file size at any given point
*/
private long currentFileSize;
- /**
- * size reserved in one file for writing block meta data. It will be in percentage
- */
- private int spaceReservedForBlockMetaSize;
protected FileOutputStream fileOutputStream;
@@ -171,7 +165,10 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
this.fileSizeInBytes =
(long) dataWriterVo.getTableBlocksize() * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
* CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
- this.spaceReservedForBlockMetaSize = Integer.parseInt(propInstance
+ /*
+ size reserved in one file for writing block meta data. It will be in percentage
+ */
+ int spaceReservedForBlockMetaSize = Integer.parseInt(propInstance
.getProperty(CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE,
CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT));
this.dataBlockSize = fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index ce53ec8..8fbf9c0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -32,8 +32,6 @@ public class CarbonDataWriterVo {
private int measureCount;
- private int mdKeyLength;
-
private String tableName;
private IFileManagerComposite fileManager;
@@ -52,8 +50,6 @@ public class CarbonDataWriterVo {
private List<ColumnSchema> wrapperColumnSchemaList;
- private int numberOfNoDictionaryColumn;
-
private boolean[] isDictionaryColumn;
private String carbonDataDirectoryPath;
@@ -99,13 +95,6 @@ public class CarbonDataWriterVo {
}
/**
- * @param mdKeyLength the mdKeyLength to set
- */
- public void setMdKeyLength(int mdKeyLength) {
- this.mdKeyLength = mdKeyLength;
- }
-
- /**
* @return the tableName
*/
public String getTableName() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 18f1b2e..4c4fead 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -357,8 +357,8 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
// calculate the size of data chunks
try {
for (int i = 0; i < nodeHolderList.get(0).getKeyArray().length; i++) {
- dataChunkBytes[i] = CarbonUtil.getByteArray(CarbonMetadataUtil
- .getDataChunk3(nodeHolderList, thriftColumnSchemaList,
+ dataChunkBytes[i] = CarbonUtil.getByteArray(
+ CarbonMetadataUtil.getDataChunk3(nodeHolderList, thriftColumnSchemaList,
dataWriterVo.getSegmentProperties(), i, true));
blockletDataSize += dataChunkBytes[i].length;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 34d822c..740b5dc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -32,13 +32,10 @@ import java.util.Set;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.IgnoreDictionary;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -56,7 +53,6 @@ import org.apache.carbondata.processing.datatypes.StructDataType;
import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
import org.apache.commons.lang3.ArrayUtils;
@@ -93,32 +89,6 @@ public final class CarbonDataProcessorUtil {
}
/**
- * Utility method to get level cardinality string
- *
- * @param dimCardinalities
- * @param aggDims
- * @return level cardinality string
- */
- public static String getLevelCardinalitiesString(Map<String, String> dimCardinalities,
- String[] aggDims) {
- StringBuilder sb = new StringBuilder();
-
- for (int i = 0; i < aggDims.length; i++) {
- String string = dimCardinalities.get(aggDims[i]);
- if (string != null) {
- sb.append(string);
- sb.append(CarbonCommonConstants.COMA_SPC_CHARACTER);
- }
- }
- String resultStr = sb.toString();
- if (resultStr.endsWith(CarbonCommonConstants.COMA_SPC_CHARACTER)) {
- resultStr = resultStr
- .substring(0, resultStr.length() - CarbonCommonConstants.COMA_SPC_CHARACTER.length());
- }
- return resultStr;
- }
-
- /**
* @param storeLocation
*/
public static void renameBadRecordsFromInProgressToNormal(String storeLocation) {
@@ -237,9 +207,9 @@ public final class CarbonDataProcessorUtil {
break;
}
- if (!field.hasDictionaryEncoding() && field.getColumn().isDimesion()) {
+ if (!field.hasDictionaryEncoding() && field.getColumn().isDimension()) {
noDictionaryMapping.add(true);
- } else if (field.getColumn().isDimesion()) {
+ } else if (field.getColumn().isDimension()) {
noDictionaryMapping.add(false);
}
}
@@ -253,9 +223,9 @@ public final class CarbonDataProcessorUtil {
public static boolean[] getIsUseInvertedIndex(DataField[] fields) {
List<Boolean> isUseInvertedIndexList = new ArrayList<Boolean>();
for (DataField field : fields) {
- if (field.getColumn().isUseInvertedIndex() && field.getColumn().isDimesion()) {
+ if (field.getColumn().isUseInvertedIndex() && field.getColumn().isDimension()) {
isUseInvertedIndexList.add(true);
- } else if (field.getColumn().isDimesion()) {
+ } else if (field.getColumn().isDimension()) {
isUseInvertedIndexList.add(false);
}
}
@@ -349,13 +319,6 @@ public final class CarbonDataProcessorUtil {
return true;
}
- public static boolean isHeaderValid(String tableName, String header,
- CarbonDataLoadSchema schema, String delimiter) {
- String convertedDelimiter = CarbonUtil.delimiterConverter(delimiter);
- String[] csvHeader = getColumnFields(header.toLowerCase(), convertedDelimiter);
- return isHeaderValid(tableName, csvHeader, schema);
- }
-
/**
* This method update the column Name
*
@@ -388,25 +351,6 @@ public final class CarbonDataProcessorUtil {
return columnNames;
}
- /**
- * Splits header to fields using delimiter.
- * @param header
- * @param delimiter
- * @return
- */
- public static String[] getColumnFields(String header, String delimiter) {
- delimiter = CarbonUtil.delimiterConverter(delimiter);
- String[] columnNames = header.split(delimiter);
- String tmpCol;
- for (int i = 0; i < columnNames.length; i++) {
- tmpCol = columnNames[i].replaceAll("\"", "");
- columnNames[i] = tmpCol.trim();
- }
-
- return columnNames;
- }
-
-
public static DataType[] getMeasureDataType(int measureCount, String databaseName,
String tableName) {
DataType[] type = new DataType[measureCount];
@@ -449,45 +393,6 @@ public final class CarbonDataProcessorUtil {
}
/**
- * This method will convert surrogate key to MD key and fill the row in format
- * required by the writer for further processing
- *
- * @param row
- * @param segmentProperties
- * @param measureCount
- * @param noDictionaryCount
- * @param complexDimensionCount
- * @return
- * @throws KeyGenException
- */
- public static Object[] convertToMDKeyAndFillRow(CarbonRow row,
- SegmentProperties segmentProperties, int measureCount, int noDictionaryCount,
- int complexDimensionCount) throws KeyGenException {
- Object[] outputRow = null;
- // adding one for the high cardinality dims byte array.
- if (noDictionaryCount > 0 || complexDimensionCount > 0) {
- outputRow = new Object[measureCount + 1 + 1];
- } else {
- outputRow = new Object[measureCount + 1];
- }
- int l = 0;
- int index = 0;
- Object[] measures = row.getObjectArray(IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex());
- for (int i = 0; i < measureCount; i++) {
- outputRow[l++] = measures[index++];
- }
- outputRow[l] = row.getObject(IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex());
- int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
- int[] dimsArray = row.getIntArray(IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex());
- for (int i = 0; i < highCardExcludedRows.length; i++) {
- highCardExcludedRows[i] = dimsArray[i];
- }
- outputRow[outputRow.length - 1] =
- segmentProperties.getDimensionKeyGenerator().generateKey(highCardExcludedRows);
- return outputRow;
- }
-
- /**
* This method will get the store location for the given path, segment id and partition id
*
* @return data directory path