You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/05/12 13:57:12 UTC
[3/7] carbondata git commit: [CARBONDATA-1015] Refactor write step
and add ColumnPage in data load. This closes #852
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 059c734..a515f0b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -141,7 +141,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
match {
case parser.Success(field, _) => field.asInstanceOf[Field]
case failureOrError => throw new MalformedCarbonCommandException(
- s"Unsupported data type: $col.getType")
+ s"Unsupported data type: $col.getDataType")
}
// the data type of the decimal type will be like decimal(10,0)
// so checking the start of the string and taking the precision and scale.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 81ee408..690f6ef 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -43,6 +44,8 @@ import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.spark.sql.types.Decimal;
+
/**
* This class will process the query result and convert the data
* into a format compatible for data load
@@ -89,7 +92,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
/**
* agg type defined for measures
*/
- private char[] aggType;
+ private DataType[] aggType;
/**
* segment id
*/
@@ -243,14 +246,14 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
* This method will convert the spark decimal to java big decimal type
*
* @param value
- * @param aggType
+ * @param type
* @return
*/
- private Object getConvertedMeasureValue(Object value, char aggType) {
- switch (aggType) {
- case CarbonCommonConstants.BIG_DECIMAL_MEASURE:
+ private Object getConvertedMeasureValue(Object value, DataType type) {
+ switch (type) {
+ case DECIMAL:
if (value != null) {
- value = ((org.apache.spark.sql.types.Decimal) value).toJavaBigDecimal();
+ value = ((Decimal) value).toJavaBigDecimal();
}
return value;
default:
@@ -404,6 +407,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
* initialise aggregation type for measures for their storage format
*/
private void initAggType() {
- aggType = CarbonDataProcessorUtil.initAggType(carbonTable, tableName, measureCount);
+ aggType = CarbonDataProcessorUtil.initDataType(carbonTable, tableName, measureCount);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index 0e14660..c1aafcd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -82,7 +82,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
sortParameters.getDimColCount(),
sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
- sortParameters.getNoDictionaryCount(), sortParameters.getAggType(),
+ sortParameters.getNoDictionaryCount(), sortParameters.getMeasureDataType(),
sortParameters.getNoDictionaryDimnesionColumn(),
sortParameters.getNoDictionarySortColumn());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index 60231c5..c8977ac 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -142,7 +142,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
return new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
- sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn(),
+ sortParameters.getMeasureDataType(), sortParameters.getNoDictionaryDimnesionColumn(),
this.sortParameters.getNoDictionarySortColumn());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index 44f11f7..24109e4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -22,9 +22,9 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.DataTypeUtil;
/**
@@ -40,7 +40,7 @@ public class UnsafeCarbonRowPage {
private int measureSize;
- private char[] aggType;
+ private DataType[] measureDataType;
private long[] nullSetWords;
@@ -55,13 +55,13 @@ public class UnsafeCarbonRowPage {
private boolean saveToDisk;
public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
- boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, char[] aggType,
+ boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
MemoryBlock memoryBlock, boolean saveToDisk) {
this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
this.dimensionSize = dimensionSize;
this.measureSize = measureSize;
- this.aggType = aggType;
+ this.measureDataType = type;
this.saveToDisk = saveToDisk;
this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
buffer = new IntPointerBuffer(memoryBlock);
@@ -116,24 +116,30 @@ public class UnsafeCarbonRowPage {
for (int mesCount = 0; mesCount < measureSize; mesCount++) {
Object value = row[mesCount + dimensionSize];
if (null != value) {
- if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
- Double val = (Double) value;
- CarbonUnsafe.unsafe.putDouble(baseObject, address + size, val);
- size += 8;
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
- Long val = (Long) value;
- CarbonUnsafe.unsafe.putLong(baseObject, address + size, val);
- size += 8;
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- BigDecimal val = (BigDecimal) value;
- byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
- CarbonUnsafe.unsafe.putShort(baseObject, address + size,
- (short) bigDecimalInBytes.length);
- size += 2;
- CarbonUnsafe.unsafe
- .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
- address + size, bigDecimalInBytes.length);
- size += bigDecimalInBytes.length;
+ switch (measureDataType[mesCount]) {
+ case SHORT:
+ case INT:
+ case LONG:
+ Long val = (Long) value;
+ CarbonUnsafe.unsafe.putLong(baseObject, address + size, val);
+ size += 8;
+ break;
+ case DOUBLE:
+ Double doubleVal = (Double) value;
+ CarbonUnsafe.unsafe.putDouble(baseObject, address + size, doubleVal);
+ size += 8;
+ break;
+ case DECIMAL:
+ BigDecimal decimalVal = (BigDecimal) value;
+ byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
+ CarbonUnsafe.unsafe.putShort(baseObject, address + size,
+ (short) bigDecimalInBytes.length);
+ size += 2;
+ CarbonUnsafe.unsafe
+ .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+ address + size, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ break;
}
set(nullSetWords, mesCount);
} else {
@@ -187,22 +193,28 @@ public class UnsafeCarbonRowPage {
for (int mesCount = 0; mesCount < measureSize; mesCount++) {
if (isSet(nullSetWords, mesCount)) {
- if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
- Double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
- size += 8;
- rowToFill[dimensionSize + mesCount] = val;
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
- Long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
- size += 8;
- rowToFill[dimensionSize + mesCount] = val;
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
- byte[] bigDecimalInBytes = new byte[aShort];
- size += 2;
- CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
- CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
- size += bigDecimalInBytes.length;
- rowToFill[dimensionSize + mesCount] = bigDecimalInBytes;
+ switch (measureDataType[mesCount]) {
+ case SHORT:
+ case INT:
+ case LONG:
+ Long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
+ size += 8;
+ rowToFill[dimensionSize + mesCount] = val;
+ break;
+ case DOUBLE:
+ Double doubleVal = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
+ size += 8;
+ rowToFill[dimensionSize + mesCount] = doubleVal;
+ break;
+ case DECIMAL:
+ short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
+ byte[] bigDecimalInBytes = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ rowToFill[dimensionSize + mesCount] = bigDecimalInBytes;
+ break;
}
} else {
rowToFill[dimensionSize + mesCount] = null;
@@ -258,33 +270,34 @@ public class UnsafeCarbonRowPage {
for (int mesCount = 0; mesCount < measureSize; mesCount++) {
if (isSet(nullSetWords, mesCount)) {
- if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
- double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
- size += 8;
- stream.writeDouble(val);
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
- long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
- size += 8;
- stream.writeLong(val);
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
- byte[] bigDecimalInBytes = new byte[aShort];
- size += 2;
- CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
- CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
- size += bigDecimalInBytes.length;
- stream.writeShort(aShort);
- stream.write(bigDecimalInBytes);
+ switch (measureDataType[mesCount]) {
+ case SHORT:
+ case INT:
+ case LONG:
+ long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
+ size += 8;
+ stream.writeLong(val);
+ break;
+ case DOUBLE:
+ double doubleVal = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
+ size += 8;
+ stream.writeDouble(doubleVal);
+ break;
+ case DECIMAL:
+ short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
+ byte[] bigDecimalInBytes = new byte[aShort];
+ size += 2;
+ CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+ size += bigDecimalInBytes.length;
+ stream.writeShort(aShort);
+ stream.write(bigDecimalInBytes);
+ break;
}
}
}
}
- private Object[] getRow(long address) {
- Object[] row = new Object[dimensionSize + measureSize];
- return getRow(address, row);
- }
-
public void freeMemory() {
buffer.freeMemory();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 40608fa..a9c0cb7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -115,7 +115,7 @@ public class UnsafeSortDataRows {
this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
parameters.getNoDictionarySortColumn(),
parameters.getDimColCount() + parameters.getComplexDimColCount(),
- parameters.getMeasureColCount(), parameters.getAggType(), baseBlock,
+ parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock,
!UnsafeMemoryManager.INSTANCE.isMemoryAvailable());
// Delete if any older file exists in sort temp folder
deleteSortLocationIfExists();
@@ -178,10 +178,14 @@ public class UnsafeSortDataRows {
dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
- rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+ rowPage = new UnsafeCarbonRowPage(
+ parameters.getNoDictionaryDimnesionColumn(),
parameters.getNoDictionarySortColumn(),
parameters.getDimColCount() + parameters.getComplexDimColCount(),
- parameters.getMeasureColCount(), parameters.getAggType(), memoryBlock, saveToDisk);
+ parameters.getMeasureColCount(),
+ parameters.getMeasureDataType(),
+ memoryBlock,
+ saveToDisk);
bytesAdded += rowPage.addRow(rowBatch[i]);
} catch (Exception e) {
LOGGER.error(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index cfdb69a..aee4e51 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Future;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
@@ -122,7 +123,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
private int noDictionaryCount;
- private char[] aggType;
+ private DataType[] measureDataType;
private int numberOfObjectRead;
/**
@@ -150,7 +151,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
// set mdkey length
this.fileBufferSize = parameters.getFileBufferSize();
this.executorService = Executors.newFixedThreadPool(1);
- this.aggType = parameters.getAggType();
+ this.measureDataType = parameters.getMeasureDataType();
this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
@@ -323,15 +324,21 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
for (int mesCount = 0; mesCount < measureCount; mesCount++) {
if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
- if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
- row[dimensionCount + mesCount] = stream.readDouble();
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
- row[dimensionCount + mesCount] = stream.readLong();
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- short aShort = stream.readShort();
- byte[] bigDecimalInBytes = new byte[aShort];
- stream.readFully(bigDecimalInBytes);
- row[dimensionCount + mesCount] = bigDecimalInBytes;
+ switch (measureDataType[mesCount]) {
+ case SHORT:
+ case INT:
+ case LONG:
+ row[dimensionCount + mesCount] = stream.readLong();
+ break;
+ case DOUBLE:
+ row[dimensionCount + mesCount] = stream.readDouble();
+ break;
+ case DECIMAL:
+ short aShort = stream.readShort();
+ byte[] bigDecimalInBytes = new byte[aShort];
+ stream.readFully(bigDecimalInBytes);
+ row[dimensionCount + mesCount] = bigDecimalInBytes;
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index e52dc8a..90c3b69 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -31,7 +31,7 @@ import java.util.concurrent.Callable;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder;
@@ -278,7 +278,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
int dimCount = 0;
int size = 0;
- char[] aggType = mergerParameters.getAggType();
+ DataType[] type = mergerParameters.getMeasureDataType();
for (; dimCount < noDictionarycolumnMapping.length; dimCount++) {
if (noDictionarycolumnMapping[dimCount]) {
byte[] col = (byte[]) row[dimCount];
@@ -310,21 +310,25 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
for (int mesCount = 0; mesCount < measureSize; mesCount++) {
Object value = row[mesCount + dimensionSize];
if (null != value) {
- if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
- Double val = (Double) value;
- rowData.putDouble(size, val);
- size += 8;
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
- Long val = (Long) value;
- rowData.putLong(size, val);
- size += 8;
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- byte[] bigDecimalInBytes = (byte[]) value;
- rowData.putShort(size, (short)bigDecimalInBytes.length);
- size += 2;
- for (int i = 0; i < bigDecimalInBytes.length; i++) {
- rowData.put(size++, bigDecimalInBytes[i]);
- }
+ switch (type[mesCount]) {
+ case SHORT:
+ case INT:
+ case LONG:
+ rowData.putLong(size, (Long) value);
+ size += 8;
+ break;
+ case DOUBLE:
+ rowData.putDouble(size, (Double) value);
+ size += 8;
+ break;
+ case DECIMAL:
+ byte[] bigDecimalInBytes = (byte[]) value;
+ rowData.putShort(size, (short)bigDecimalInBytes.length);
+ size += 2;
+ for (int i = 0; i < bigDecimalInBytes.length; i++) {
+ rowData.put(size++, bigDecimalInBytes[i]);
+ }
+ break;
}
UnsafeCarbonRowPage.set(nullSetWords, mesCount);
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
index c50f335..0f0a5b0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -26,11 +26,11 @@ import java.util.concurrent.Future;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
@@ -64,7 +64,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
private boolean[] isNoDictionaryDimensionColumn;
- private char[] aggType;
+ private DataType[] measureDataType;
private int dimensionCount;
@@ -115,8 +115,8 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
dimensionCount = configuration.getDimensionCount() - noDictWithComplextCount;
isNoDictionaryDimensionColumn =
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
- aggType = CarbonDataProcessorUtil
- .getAggType(configuration.getMeasureCount(), configuration.getMeasureFields());
+ measureDataType = CarbonDataProcessorUtil
+ .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields());
CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
.createCarbonFactDataHandlerModel(configuration,
@@ -266,7 +266,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
for (; l < this.measureCount; l++) {
Object value = row.getObject(l + this.dimensionWithComplexCount);
if (null != value) {
- if (aggType[l] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+ if (measureDataType[l] == DataType.DECIMAL) {
BigDecimal val = (BigDecimal) value;
outputRow[l] = DataTypeUtil.bigDecimalToByte(val);
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index a9e762d..d20292c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Callable;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
import org.apache.carbondata.processing.util.NonDictionaryUtil;
@@ -251,7 +251,8 @@ public class IntermediateFileMerger implements Callable<Void> {
new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
- mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn(),
+ mergerParameters.getMeasureDataType(),
+ mergerParameters.getNoDictionaryDimnesionColumn(),
mergerParameters.getNoDictionarySortColumn());
// initialize
@@ -319,7 +320,7 @@ public class IntermediateFileMerger implements Callable<Void> {
return;
}
try {
- char[] aggType = mergerParameters.getAggType();
+ DataType[] aggType = mergerParameters.getMeasureDataType();
int[] mdkArray = (int[]) row[0];
byte[][] nonDictArray = (byte[][]) row[1];
int mdkIndex = 0;
@@ -339,27 +340,27 @@ public class IntermediateFileMerger implements Callable<Void> {
for (int counter = 0; counter < mergerParameters.getMeasureColCount(); counter++) {
if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
stream.write((byte) 1);
- if (aggType[counter] == CarbonCommonConstants.BYTE_VALUE_MEASURE) {
- Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
- stream.writeDouble(val);
- } else if (aggType[counter] == CarbonCommonConstants.DOUBLE_MEASURE) {
- Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
- stream.writeDouble(val);
- } else if (aggType[counter] == CarbonCommonConstants.BIG_INT_MEASURE) {
- Long val = (Long) NonDictionaryUtil.getMeasure(fieldIndex, row);
- stream.writeLong(val);
- } else if (aggType[counter] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
- stream.writeInt(bigDecimalInBytes.length);
- stream.write(bigDecimalInBytes);
+ switch (aggType[counter]) {
+ case SHORT:
+ case INT:
+ case LONG:
+ Long val = (Long) NonDictionaryUtil.getMeasure(fieldIndex, row);
+ stream.writeLong(val);
+ break;
+ case DOUBLE:
+ stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
+ break;
+ case DECIMAL:
+ byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
+ stream.writeInt(bigDecimalInBytes.length);
+ stream.write(bigDecimalInBytes);
+ break;
}
} else {
stream.write((byte) 0);
}
-
fieldIndex++;
}
-
} catch (IOException e) {
throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index eba5433..af654e2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -256,7 +257,7 @@ public class SortDataRows {
stream.writeInt(entryCountLocal);
int complexDimColCount = parameters.getComplexDimColCount();
int dimColCount = parameters.getDimColCount() + complexDimColCount;
- char[] aggType = parameters.getAggType();
+ DataType[] type = parameters.getMeasureDataType();
boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn();
Object[] row = null;
for (int i = 0; i < entryCountLocal; i++) {
@@ -285,17 +286,21 @@ public class SortDataRows {
Object value = row[mesCount + dimColCount];
if (null != value) {
stream.write((byte) 1);
- if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
- Double val = (Double) value;
- stream.writeDouble(val);
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
- Long val = (Long) value;
- stream.writeLong(val);
- } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- BigDecimal val = (BigDecimal) value;
- byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
- stream.writeInt(bigDecimalInBytes.length);
- stream.write(bigDecimalInBytes);
+ switch (type[mesCount]) {
+ case SHORT:
+ case INT:
+ case LONG:
+ stream.writeLong((Long) value);
+ break;
+ case DOUBLE:
+ stream.writeDouble((Double) value);
+ break;
+ case DECIMAL:
+ BigDecimal val = (BigDecimal) value;
+ byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+ stream.writeInt(bigDecimalInBytes.length);
+ stream.write(bigDecimalInBytes);
+ break;
}
} else {
stream.write((byte) 0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 7ef8f8e..8ac1491 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.schema.metadata.SortObserver;
@@ -88,7 +89,7 @@ public class SortParameters {
private String tableName;
- private char[] aggType;
+ private DataType[] measureDataType;
/**
* To know how many columns are of high cardinality.
@@ -137,7 +138,7 @@ public class SortParameters {
parameters.bufferSize = bufferSize;
parameters.databaseName = databaseName;
parameters.tableName = tableName;
- parameters.aggType = aggType;
+ parameters.measureDataType = measureDataType;
parameters.noDictionaryCount = noDictionaryCount;
parameters.partitionID = partitionID;
parameters.segmentId = segmentId;
@@ -270,12 +271,12 @@ public class SortParameters {
this.tableName = tableName;
}
- public char[] getAggType() {
- return aggType;
+ public DataType[] getMeasureDataType() {
+ return measureDataType;
}
- public void setAggType(char[] aggType) {
- this.aggType = aggType;
+ public void setMeasureDataType(DataType[] measureDataType) {
+ this.measureDataType = measureDataType;
}
public int getNoDictionaryCount() {
@@ -458,9 +459,9 @@ public class SortParameters {
CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
- char[] aggType = CarbonDataProcessorUtil
- .getAggType(configuration.getMeasureCount(), configuration.getMeasureFields());
- parameters.setAggType(aggType);
+ DataType[] measureDataType = CarbonDataProcessorUtil
+ .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields());
+ parameters.setMeasureDataType(measureDataType);
return parameters;
}
@@ -560,10 +561,10 @@ public class SortParameters {
CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
- char[] aggType = CarbonDataProcessorUtil
- .getAggType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
+ DataType[] type = CarbonDataProcessorUtil
+ .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
parameters.getTableName());
- parameters.setAggType(aggType);
+ parameters.setMeasureDataType(type);
return parameters;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index 6695a5b..a4fdec1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Future;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
@@ -125,7 +126,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
private int noDictionaryCount;
- private char[] aggType;
+ private DataType[] aggType;
/**
* to store whether dimension is of dictionary type or not
@@ -150,7 +151,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
* @param isNoDictionaryDimensionColumn
*/
public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
- int measureCount, int fileBufferSize, int noDictionaryCount, char[] aggType,
+ int measureCount, int fileBufferSize, int noDictionaryCount, DataType[] aggType,
boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn) {
// set temp file
this.tempFile = tempFile;
@@ -338,15 +339,21 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
// read measure values
for (int i = 0; i < this.measureCount; i++) {
if (stream.readByte() == 1) {
- if (aggType[i] == CarbonCommonConstants.DOUBLE_MEASURE) {
- measures[index++] = stream.readDouble();
- } else if (aggType[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
- measures[index++] = stream.readLong();
- } else {
- int len = stream.readInt();
- byte[] buff = new byte[len];
- stream.readFully(buff);
- measures[index++] = buff;
+ switch (aggType[i]) {
+ case SHORT:
+ case INT:
+ case LONG:
+ measures[index++] = stream.readLong();
+ break;
+ case DOUBLE:
+ measures[index++] = stream.readDouble();
+ break;
+ case DECIMAL:
+ int len = stream.readInt();
+ byte[] buff = new byte[len];
+ stream.readFully(buff);
+ measures[index++] = buff;
+ break;
}
} else {
measures[index++] = null;