You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/09/18 13:38:16 UTC
[2/4] carbondata git commit: [CARBONDATA-2896][Refactor] Adaptive
Encoding for Primitive data types
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
index 29e3060..29a4098 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
/**
* Implementation for BloomFilter DataMap to rebuild the datamap for main table with existing data
@@ -61,8 +62,12 @@ public class BloomDataMapBuilder extends AbstractBloomDataMapWriter implements D
}
@Override
- protected byte[] convertNonDictionaryValue(int indexColIdx, byte[] value) {
- return value;
+ protected byte[] convertNonDictionaryValue(int indexColIdx, Object value) {
+ // no dictionary measure columns will hold the original data, so convert them to bytes
+ if (DataTypeUtil.isPrimitiveColumn(indexColumns.get(indexColIdx).getDataType())) {
+ return CarbonUtil.getValueAsBytes(indexColumns.get(indexColIdx).getDataType(), value);
+ }
+ return (byte[]) value;
}
@Override
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
----------------------------------------------------------------------
diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
index cad9787..61bd036 100644
--- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
+++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Predicate;
@@ -73,11 +74,14 @@ public class BloomDataMapWriter extends AbstractBloomDataMapWriter {
}
}
- protected byte[] convertNonDictionaryValue(int indexColIdx, byte[] value) {
+ protected byte[] convertNonDictionaryValue(int indexColIdx, Object value) {
if (DataTypes.VARCHAR == indexColumns.get(indexColIdx).getDataType()) {
- return DataConvertUtil.getRawBytesForVarchar(value);
+ return DataConvertUtil.getRawBytesForVarchar((byte[]) value);
+ } else if (DataTypeUtil.isPrimitiveColumn(indexColumns.get(indexColIdx).getDataType())) {
+ // get bytes for the original value of the no dictionary column
+ return CarbonUtil.getValueAsBytes(indexColumns.get(indexColIdx).getDataType(), value);
} else {
- return DataConvertUtil.getRawBytes(value);
+ return DataConvertUtil.getRawBytes((byte[]) value);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index 7cd241a..5525941 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -245,7 +245,7 @@ public class StoreCreator {
date.setEncodingList(encodings);
date.setColumnUniqueId(UUID.randomUUID().toString());
date.setDimensionColumn(true);
- date.setColumnReferenceId(id.getColumnUniqueId());
+ date.setColumnReferenceId(date.getColumnUniqueId());
date.setSchemaOrdinal(schemaOrdinal++);
if (sortColumns.contains(date.getColumnName())) {
date.setSortColumn(true);
@@ -263,7 +263,7 @@ public class StoreCreator {
if (sortColumns.contains(country.getColumnName())) {
country.setSortColumn(true);
}
- country.setColumnReferenceId(id.getColumnUniqueId());
+ country.setColumnReferenceId(country.getColumnUniqueId());
columnSchemas.add(country);
ColumnSchema name = new ColumnSchema();
@@ -276,7 +276,7 @@ public class StoreCreator {
if (sortColumns.contains(name.getColumnName())) {
name.setSortColumn(true);
}
- name.setColumnReferenceId(id.getColumnUniqueId());
+ name.setColumnReferenceId(name.getColumnUniqueId());
columnSchemas.add(name);
ColumnSchema phonetype = new ColumnSchema();
@@ -289,7 +289,7 @@ public class StoreCreator {
if (sortColumns.contains(phonetype.getColumnName())) {
phonetype.setSortColumn(true);
}
- phonetype.setColumnReferenceId(id.getColumnUniqueId());
+ phonetype.setColumnReferenceId(phonetype.getColumnUniqueId());
columnSchemas.add(phonetype);
ColumnSchema serialname = new ColumnSchema();
@@ -302,7 +302,7 @@ public class StoreCreator {
if (sortColumns.contains(serialname.getColumnName())) {
serialname.setSortColumn(true);
}
- serialname.setColumnReferenceId(id.getColumnUniqueId());
+ serialname.setColumnReferenceId(serialname.getColumnUniqueId());
columnSchemas.add(serialname);
ColumnSchema salary = new ColumnSchema();
salary.setColumnName("salary");
@@ -310,11 +310,13 @@ public class StoreCreator {
salary.setEncodingList(new ArrayList<Encoding>());
salary.setColumnUniqueId(UUID.randomUUID().toString());
salary.setDimensionColumn(false);
- salary.setColumnReferenceId(id.getColumnUniqueId());
+ salary.setColumnReferenceId(salary.getColumnUniqueId());
salary.setSchemaOrdinal(schemaOrdinal++);
columnSchemas.add(salary);
- tableSchema.setListOfColumns(columnSchemas);
+ // rearrange the column schema based on the sort order, if sort columns exist
+ List<ColumnSchema> columnSchemas1 = reArrangeColumnSchema(columnSchemas);
+ tableSchema.setListOfColumns(columnSchemas1);
SchemaEvolution schemaEvol = new SchemaEvolution();
schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
tableSchema.setSchemaEvolution(schemaEvol);
@@ -352,6 +354,29 @@ public class StoreCreator {
return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
}
+ private List<ColumnSchema> reArrangeColumnSchema(List<ColumnSchema> columnSchemas) {
+ List<ColumnSchema> newColumnSchema = new ArrayList<>(columnSchemas.size());
+ // add sort columns first
+ for (ColumnSchema columnSchema : columnSchemas) {
+ if (columnSchema.isSortColumn()) {
+ newColumnSchema.add(columnSchema);
+ }
+ }
+ // add other dimension columns
+ for (ColumnSchema columnSchema : columnSchemas) {
+ if (!columnSchema.isSortColumn() && columnSchema.isDimensionColumn()) {
+ newColumnSchema.add(columnSchema);
+ }
+ }
+ // add measure columns
+ for (ColumnSchema columnSchema : columnSchemas) {
+ if (!columnSchema.isDimensionColumn()) {
+ newColumnSchema.add(columnSchema);
+ }
+ }
+ return newColumnSchema;
+ }
+
private void writeDictionary(String factFilePath, CarbonTable table) throws Exception {
BufferedReader reader = new BufferedReader(new InputStreamReader(
new FileInputStream(factFilePath), "UTF-8"));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index be40b13..e810829 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -86,7 +86,9 @@ object DataLoadProcessBuilderOnSpark {
val sortParameters = SortParameters.createSortParameters(configuration)
val rowComparator: Comparator[Array[AnyRef]] =
if (sortParameters.getNoDictionaryCount > 0) {
- new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn)
+ new NewRowComparator(sortParameters.getNoDictionaryDimnesionColumn,
+ sortParameters.getNoDictionarySortColumn,
+ sortParameters.getNoDictDataType)
} else {
new NewRowComparatorForNormalDims(sortParameters.getDimColCount)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index c97732a..727191c 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -805,7 +805,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
model.setSegmentId("0")
store.createCarbonStore(model)
FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_0/0"))
- store.setSortColumns(new util.ArrayList[String](Seq("country,phonetype").asJava))
+ store.setSortColumns(new util.ArrayList[String](Seq("country","phonetype").asJava))
model = store.createTableAndLoadModel(false)
model.setSegmentId("1")
store.createCarbonStore(model)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 0584fb1..1897c87 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -52,7 +52,7 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSch
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonUtil, TaskMetricsMap}
+import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil, TaskMetricsMap}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.datamap.bloom.DataConvertUtil
import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
@@ -264,8 +264,17 @@ class RawBytesReadSupport(segmentProperties: SegmentProperties, indexColumns: Ar
rtn(i) = if (indexCol2IdxInDictArray.contains(col.getColName)) {
surrogatKeys(indexCol2IdxInDictArray(col.getColName)).toInt.asInstanceOf[Integer]
} else if (indexCol2IdxInNoDictArray.contains(col.getColName)) {
- data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex(
+ val bytes = data(0).asInstanceOf[ByteArrayWrapper].getNoDictionaryKeyByIndex(
indexCol2IdxInNoDictArray(col.getColName))
+ // no dictionary primitive columns are expected to hold the original data while loading,
+ // so convert the bytes back to the original data
+ if (DataTypeUtil.isPrimitiveColumn(col.getDataType)) {
+ val dataFromBytes = DataTypeUtil
+ .getDataBasedOnDataTypeForNoDictionaryColumn(bytes, col.getDataType)
+ dataFromBytes
+ } else {
+ bytes
+ }
} else {
// measures start from 1
val value = data(1 + indexCol2IdxInMeasureArray(col.getColName))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index ad6823d..fcb6110 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -42,10 +42,10 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
assertResult(2)(result.length)
assertResult("table_info1")(result(0).getString(0))
- // 2087 is the size of carbon table. Note that since 1.5.0, we add additional compressor name in metadata
- assertResult(2216)(result(0).getLong(1))
+ // 2220 is the size of carbon table. Note that since 1.5.0, we add additional compressor name in metadata
+ assertResult(2220)(result(0).getLong(1))
assertResult("table_info2")(result(1).getString(0))
- assertResult(2216)(result(1).getLong(1))
+ assertResult(2220)(result(1).getLong(1))
}
override def afterAll: Unit = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 4d85296..616edeb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.BucketingInfo;
import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
@@ -339,6 +340,45 @@ public class CarbonDataLoadConfiguration {
return type;
}
+ /**
+ * Get the no dictionary and complex dimension columns of the table
+ *
+ * @return
+ */
+ public CarbonColumn[] getNoDictAndComplexDimensions() {
+ List<Integer> noDicOrCompIndexes = new ArrayList<>(dataFields.length);
+ int noDicCount = 0;
+ for (int i = 0; i < dataFields.length; i++) {
+ if (dataFields[i].getColumn().isDimension() && (
+ !(dataFields[i].getColumn().hasEncoding(Encoding.DICTIONARY)) || dataFields[i].getColumn()
+ .isComplex())) {
+ noDicOrCompIndexes.add(i);
+ noDicCount++;
+ }
+ }
+
+ CarbonColumn[] dims = new CarbonColumn[noDicCount];
+ for (int i = 0; i < dims.length; i++) {
+ dims[i] = dataFields[noDicOrCompIndexes.get(i)].getColumn();
+ }
+ return dims;
+ }
+
+ /**
+ * Get the sort column mapping of the table
+ *
+ * @return
+ */
+ public boolean[] getSortColumnMapping() {
+ boolean[] sortColumnMapping = new boolean[dataFields.length];
+ for (int i = 0; i < sortColumnMapping.length; i++) {
+ if (dataFields[i].getColumn().getColumnSchema().isSortColumn()) {
+ sortColumnMapping[i] = true;
+ }
+ }
+ return sortColumnMapping;
+ }
+
public int[] calcDimensionLengths() {
int[] dimLensWithComplex = getCardinalityFinder().getCardinality();
if (!isSortTable()) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 86f273d..7dfe95f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.datatypes.ArrayDataType;
import org.apache.carbondata.processing.datatypes.GenericDataType;
import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
@@ -111,6 +112,11 @@ public class FieldEncoderFactory {
createComplexDataType(dataField, absoluteTableIdentifier,
client, useOnePass, localCache, index, nullFormat, isEmptyBadRecord), index);
} else {
+ // if the no dictionary column is a numeric column then treat it as a measure col
+ // so that the adaptive encoding can be applied on it easily
+ if (DataTypeUtil.isPrimitiveColumn(dataField.getColumn().getDataType())) {
+ return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
+ }
return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
}
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 9cbd607..20278e4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -20,8 +20,6 @@ import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.loading.DataField;
import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
@@ -39,10 +37,6 @@ public class MeasureFieldConverterImpl implements FieldConverter {
private int index;
- private DataType dataType;
-
- private CarbonMeasure measure;
-
private String nullformat;
private boolean isEmptyBadRecord;
@@ -51,8 +45,6 @@ public class MeasureFieldConverterImpl implements FieldConverter {
public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index,
boolean isEmptyBadRecord) {
- this.dataType = dataField.getColumn().getDataType();
- this.measure = (CarbonMeasure) dataField.getColumn();
this.nullformat = nullformat;
this.index = index;
this.isEmptyBadRecord = isEmptyBadRecord;
@@ -73,20 +65,20 @@ public class MeasureFieldConverterImpl implements FieldConverter {
Object output;
boolean isNull = CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(literalValue);
if (literalValue == null || isNull) {
- String message = logHolder.getColumnMessageMap().get(measure.getColName());
+ String message = logHolder.getColumnMessageMap().get(dataField.getColumn().getColName());
if (null == message) {
- message = CarbonDataProcessorUtil
- .prepareFailureReason(measure.getColName(), measure.getDataType());
- logHolder.getColumnMessageMap().put(measure.getColName(), message);
+ message = CarbonDataProcessorUtil.prepareFailureReason(dataField.getColumn().getColName(),
+ dataField.getColumn().getDataType());
+ logHolder.getColumnMessageMap().put(dataField.getColumn().getColName(), message);
}
return null;
} else if (literalValue.length() == 0) {
if (isEmptyBadRecord) {
- String message = logHolder.getColumnMessageMap().get(measure.getColName());
+ String message = logHolder.getColumnMessageMap().get(dataField.getColumn().getColName());
if (null == message) {
- message = CarbonDataProcessorUtil
- .prepareFailureReason(measure.getColName(), measure.getDataType());
- logHolder.getColumnMessageMap().put(measure.getColName(), message);
+ message = CarbonDataProcessorUtil.prepareFailureReason(dataField.getColumn().getColName(),
+ dataField.getColumn().getDataType());
+ logHolder.getColumnMessageMap().put(dataField.getColumn().getColName(), message);
}
logHolder.setReason(message);
}
@@ -96,18 +88,24 @@ public class MeasureFieldConverterImpl implements FieldConverter {
} else {
try {
if (dataField.isUseActualData()) {
- output =
- DataTypeUtil.getMeasureValueBasedOnDataType(literalValue, dataType, measure, true);
+ output = DataTypeUtil
+ .getMeasureValueBasedOnDataType(literalValue, dataField.getColumn().getDataType(),
+ dataField.getColumn().getColumnSchema().getScale(),
+ dataField.getColumn().getColumnSchema().getPrecision(), true);
} else {
- output = DataTypeUtil.getMeasureValueBasedOnDataType(literalValue, dataType, measure);
+ output = DataTypeUtil
+ .getMeasureValueBasedOnDataType(literalValue, dataField.getColumn().getDataType(),
+ dataField.getColumn().getColumnSchema().getScale(),
+ dataField.getColumn().getColumnSchema().getPrecision());
}
return output;
} catch (NumberFormatException e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Can not convert value to Numeric type value. Value considered as null.");
}
- logHolder.setReason(
- CarbonDataProcessorUtil.prepareFailureReason(measure.getColName(), dataType));
+ logHolder.setReason(CarbonDataProcessorUtil
+ .prepareFailureReason(dataField.getColumn().getColName(),
+ dataField.getColumn().getDataType()));
return null;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
index 64b64f5..3a325a4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/partition/impl/RawRowComparator.java
@@ -21,7 +21,10 @@ import java.util.Comparator;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
/**
* comparator for the converted row. The row has not been rearranged as 3-parted yet.
@@ -30,23 +33,38 @@ import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
public class RawRowComparator implements Comparator<CarbonRow> {
private int[] sortColumnIndices;
private boolean[] isSortColumnNoDict;
+ private DataType[] noDicDataTypes;
- public RawRowComparator(int[] sortColumnIndices, boolean[] isSortColumnNoDict) {
+ public RawRowComparator(int[] sortColumnIndices, boolean[] isSortColumnNoDict,
+ DataType[] noDicDataTypes) {
this.sortColumnIndices = sortColumnIndices;
this.isSortColumnNoDict = isSortColumnNoDict;
+ this.noDicDataTypes = noDicDataTypes;
}
@Override
public int compare(CarbonRow o1, CarbonRow o2) {
int diff = 0;
int i = 0;
+ int noDicIdx = 0;
for (int colIdx : sortColumnIndices) {
if (isSortColumnNoDict[i]) {
- byte[] colA = (byte[]) o1.getObject(colIdx);
- byte[] colB = (byte[]) o2.getObject(colIdx);
- diff = UnsafeComparer.INSTANCE.compareTo(colA, colB);
- if (diff != 0) {
- return diff;
+ if (DataTypeUtil.isPrimitiveColumn(noDicDataTypes[noDicIdx])) {
+ // for no dictionary numeric column get comparator based on the data type
+ SerializableComparator comparator = org.apache.carbondata.core.util.comparator.Comparator
+ .getComparator(noDicDataTypes[noDicIdx]);
+ int difference = comparator.compare(o1.getObject(colIdx), o2.getObject(colIdx));
+ if (difference != 0) {
+ return difference;
+ }
+ noDicIdx++;
+ } else {
+ byte[] colA = (byte[]) o1.getObject(colIdx);
+ byte[] colB = (byte[]) o2.getObject(colIdx);
+ diff = UnsafeComparer.INSTANCE.compareTo(colA, colB);
+ if (diff != 0) {
+ return diff;
+ }
}
} else {
int colA = (int) o1.getObject(colIdx);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
index 1ad7879..844e45e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
@@ -23,7 +23,7 @@ package org.apache.carbondata.processing.loading.row;
*/
public class IntermediateSortTempRow {
private int[] dictSortDims;
- private byte[][] noDictSortDims;
+ private Object[] noDictSortDims;
/**
* this will be used for intermediate merger when
* no sort field and measure field will not be
@@ -35,14 +35,14 @@ public class IntermediateSortTempRow {
*/
private Object[] measures;
- public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
+ public IntermediateSortTempRow(int[] dictSortDims, Object[] noDictSortDims,
byte[] noSortDimsAndMeasures) {
this.dictSortDims = dictSortDims;
this.noDictSortDims = noDictSortDims;
this.noSortDimsAndMeasures = noSortDimsAndMeasures;
}
- public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
+ public IntermediateSortTempRow(int[] dictSortDims, Object[] noDictSortDims,
Object[] measures) {
this.dictSortDims = dictSortDims;
this.noDictSortDims = noDictSortDims;
@@ -57,7 +57,7 @@ public class IntermediateSortTempRow {
return measures;
}
- public byte[][] getNoDictSortDims() {
+ public Object[] getNoDictSortDims() {
return noDictSortDims;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index 697f590..edfd317 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -23,10 +23,13 @@ import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.memory.CarbonUnsafe;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.CarbonUnsafeUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
@@ -65,6 +68,14 @@ public class SortStepRowHandler implements Serializable {
private DataType[] dataTypes;
+ private DataType[] noDictSortDataTypes;
+
+ private boolean[] noDictSortColMapping;
+
+ private DataType[] noDictNoSortDataTypes;
+
+ private boolean[] noDictNoSortColMapping;
+
/**
* constructor
* @param tableFieldStat table field stat
@@ -85,6 +96,16 @@ public class SortStepRowHandler implements Serializable {
this.complexDimIdx = tableFieldStat.getComplexDimIdx();
this.measureIdx = tableFieldStat.getMeasureIdx();
this.dataTypes = tableFieldStat.getMeasureDataType();
+ this.noDictSortDataTypes = tableFieldStat.getNoDictSortDataType();
+ noDictSortColMapping = new boolean[noDictSortDataTypes.length];
+ for (int i = 0; i < noDictSortDataTypes.length; i++) {
+ noDictSortColMapping[i] = DataTypeUtil.isPrimitiveColumn(noDictSortDataTypes[i]);
+ }
+ this.noDictNoSortDataTypes = tableFieldStat.getNoDictNoSortDataType();
+ noDictNoSortColMapping = new boolean[noDictNoSortDataTypes.length];
+ for (int i = 0; i < noDictNoSortDataTypes.length; i++) {
+ noDictNoSortColMapping[i] = DataTypeUtil.isPrimitiveColumn(noDictNoSortDataTypes[i]);
+ }
}
/**
@@ -108,8 +129,8 @@ public class SortStepRowHandler implements Serializable {
try {
int[] dictDims
= new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
- byte[][] nonDictArray = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt
- + this.varcharDimCnt + this.complexDimCnt ][];
+ Object[] nonDictArray = new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt
+ + this.varcharDimCnt + this.complexDimCnt];
Object[] measures = new Object[this.measureCnt];
// convert dict & data
@@ -125,19 +146,19 @@ public class SortStepRowHandler implements Serializable {
// convert no-dict & sort
idxAcc = 0;
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
- nonDictArray[idxAcc++] = (byte[]) row[this.noDictSortDimIdx[idx]];
+ nonDictArray[idxAcc++] = row[this.noDictSortDimIdx[idx]];
}
// convert no-dict & no-sort
for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
- nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]];
+ nonDictArray[idxAcc++] = row[this.noDictNoSortDimIdx[idx]];
}
// convert varchar dims
for (int idx = 0; idx < this.varcharDimCnt; idx++) {
- nonDictArray[idxAcc++] = (byte[]) row[this.varcharDimIdx[idx]];
+ nonDictArray[idxAcc++] = row[this.varcharDimIdx[idx]];
}
// convert complex dims
for (int idx = 0; idx < this.complexDimCnt; idx++) {
- nonDictArray[idxAcc++] = (byte[]) row[this.complexDimIdx[idx]];
+ nonDictArray[idxAcc++] = row[this.complexDimIdx[idx]];
}
// convert measure data
@@ -178,7 +199,7 @@ public class SortStepRowHandler implements Serializable {
public IntermediateSortTempRow readWithoutNoSortFieldConvert(
DataInputStream inputStream) throws IOException {
int[] dictSortDims = new int[this.dictSortDimCnt];
- byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
+ Object[] noDictSortDims = new Object[this.noDictSortDimCnt];
// read dict & sort dim data
for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
@@ -187,10 +208,8 @@ public class SortStepRowHandler implements Serializable {
// read no-dict & sort data
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
- short len = inputStream.readShort();
- byte[] bytes = new byte[len];
- inputStream.readFully(bytes);
- noDictSortDims[idx] = bytes;
+ // for no dict measure column get the original data
+ noDictSortDims[idx] = getDataForNoDictSortColumn(inputStream, idx);
}
// read no-dict dims & measures
@@ -213,9 +232,9 @@ public class SortStepRowHandler implements Serializable {
public IntermediateSortTempRow readWithNoSortFieldConvert(
DataInputStream inputStream) throws IOException {
int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
- byte[][] noDictSortDims =
- new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
- + this.complexDimCnt][];
+ Object[] noDictSortDims =
+ new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
+ + this.complexDimCnt];
// read dict & sort dim data
for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
@@ -224,10 +243,8 @@ public class SortStepRowHandler implements Serializable {
// read no-dict & sort data
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
- short len = inputStream.readShort();
- byte[] bytes = new byte[len];
- inputStream.readFully(bytes);
- noDictSortDims[idx] = bytes;
+ // for no dict measure column get the original data
+ noDictSortDims[idx] = getDataForNoDictSortColumn(inputStream, idx);
}
// read no-dict dims & measures
@@ -240,8 +257,63 @@ public class SortStepRowHandler implements Serializable {
return new IntermediateSortTempRow(dictSortDims, noDictSortDims,measure);
}
+ /**
+ * Return the data from the stream according to the column type
+ *
+ * @param inputStream
+ * @param idx
+ * @throws IOException
+ */
+ private Object getDataForNoDictSortColumn(DataInputStream inputStream, int idx)
+ throws IOException {
+ if (this.noDictSortColMapping[idx]) {
+ return readDataFromStream(inputStream, idx);
+ } else {
+ short len = inputStream.readShort();
+ byte[] bytes = new byte[len];
+ inputStream.readFully(bytes);
+ return bytes;
+ }
+ }
+
+ /**
+ * Read the data from the stream
+ *
+ * @param inputStream
+ * @param idx
+ * @return
+ * @throws IOException
+ */
+ private Object readDataFromStream(DataInputStream inputStream, int idx) throws IOException {
+ DataType dataType = noDictSortDataTypes[idx];
+ Object data = null;
+ if (!inputStream.readBoolean()) {
+ return null;
+ }
+ if (dataType == DataTypes.BOOLEAN) {
+ data = inputStream.readBoolean();
+ } else if (dataType == DataTypes.BYTE) {
+ data = inputStream.readByte();
+ } else if (dataType == DataTypes.SHORT) {
+ data = inputStream.readShort();
+ } else if (dataType == DataTypes.INT) {
+ data = inputStream.readInt();
+ } else if (dataType == DataTypes.LONG) {
+ data = inputStream.readLong();
+ } else if (dataType == DataTypes.DOUBLE) {
+ data = inputStream.readDouble();
+ } else if (dataType == DataTypes.FLOAT) {
+ data = inputStream.readFloat();
+ } else if (dataType == DataTypes.BYTE_ARRAY || DataTypes.isDecimal(dataType)) {
+ byte[] bytes =
+ inputStream.readUTF().getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+ data = bytes;
+ }
+ return data;
+ }
+
private void unpackNoSortFromBytes(byte[] noSortDimsAndMeasures, int[] dictDims,
- byte[][] noDictDims, Object[] measures) {
+ Object[] noDictDims, Object[] measures) {
ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
// read dict_no_sort
for (int i = dictSortDimCnt; i < dictDims.length; i++) {
@@ -251,10 +323,15 @@ public class SortStepRowHandler implements Serializable {
int noDictIndex = noDictSortDimCnt;
// read no_dict_no_sort
for (int i = 0; i < noDictNoSortDimCnt; i++) {
- short len = rowBuffer.getShort();
- byte[] bytes = new byte[len];
- rowBuffer.get(bytes);
- noDictDims[noDictIndex++] = bytes;
+ // for no dict measure column get the original data
+ if (this.noDictNoSortColMapping[i]) {
+ noDictDims[noDictIndex++] = getDataFromRowBuffer(noDictNoSortDataTypes[i], rowBuffer);
+ } else {
+ short len = rowBuffer.getShort();
+ byte[] bytes = new byte[len];
+ rowBuffer.get(bytes);
+ noDictDims[noDictIndex++] = bytes;
+ }
}
// read varchar dims
@@ -275,39 +352,49 @@ public class SortStepRowHandler implements Serializable {
// read measure
int measureCnt = measures.length;
- DataType tmpDataType;
Object tmpContent;
for (short idx = 0 ; idx < measureCnt; idx++) {
- if ((byte) 0 == rowBuffer.get()) {
- measures[idx] = null;
- continue;
- }
+ tmpContent = getDataFromRowBuffer(dataTypes[idx], rowBuffer);
+ measures[idx] = tmpContent;
+ }
+ }
- tmpDataType = dataTypes[idx];
- if (DataTypes.BOOLEAN == tmpDataType) {
- if ((byte) 1 == rowBuffer.get()) {
- tmpContent = true;
- } else {
- tmpContent = false;
- }
- } else if (DataTypes.SHORT == tmpDataType) {
- tmpContent = rowBuffer.getShort();
- } else if (DataTypes.INT == tmpDataType) {
- tmpContent = rowBuffer.getInt();
- } else if (DataTypes.LONG == tmpDataType) {
- tmpContent = rowBuffer.getLong();
- } else if (DataTypes.DOUBLE == tmpDataType) {
- tmpContent = rowBuffer.getDouble();
- } else if (DataTypes.isDecimal(tmpDataType)) {
- short len = rowBuffer.getShort();
- byte[] decimalBytes = new byte[len];
- rowBuffer.get(decimalBytes);
- tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
+ /**
+ * Retrieve/Get the data from the row buffer.
+ *
+ * @param tmpDataType
+ * @param rowBuffer
+ * @return
+ */
+ private Object getDataFromRowBuffer(DataType tmpDataType, ByteBuffer rowBuffer) {
+ Object tmpContent;
+ if ((byte) 0 == rowBuffer.get()) {
+ return null;
+ }
+
+ if (DataTypes.BOOLEAN == tmpDataType) {
+ if ((byte) 1 == rowBuffer.get()) {
+ tmpContent = true;
} else {
- throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
+ tmpContent = false;
}
- measures[idx] = tmpContent;
+ } else if (DataTypes.SHORT == tmpDataType) {
+ tmpContent = rowBuffer.getShort();
+ } else if (DataTypes.INT == tmpDataType) {
+ tmpContent = rowBuffer.getInt();
+ } else if (DataTypes.LONG == tmpDataType) {
+ tmpContent = rowBuffer.getLong();
+ } else if (DataTypes.DOUBLE == tmpDataType) {
+ tmpContent = rowBuffer.getDouble();
+ } else if (DataTypes.isDecimal(tmpDataType)) {
+ short len = rowBuffer.getShort();
+ byte[] decimalBytes = new byte[len];
+ rowBuffer.get(decimalBytes);
+ tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
+ } else {
+ throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
}
+ return tmpContent;
}
/**
@@ -327,9 +414,14 @@ public class SortStepRowHandler implements Serializable {
// write no-dict & sort dim
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
- byte[] bytes = sortTempRow.getNoDictSortDims()[idx];
- outputStream.writeShort(bytes.length);
- outputStream.write(bytes);
+ if (this.noDictSortColMapping[idx]) {
+ // write the original data to the stream
+ writeDataToStream(sortTempRow.getNoDictSortDims()[idx], outputStream, idx);
+ } else {
+ byte[] bytes = (byte[]) sortTempRow.getNoDictSortDims()[idx];
+ outputStream.writeShort(bytes.length);
+ outputStream.write(bytes);
+ }
}
// write packed no-sort dim & measure
@@ -359,9 +451,14 @@ public class SortStepRowHandler implements Serializable {
// write no-dict & sort
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
- byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
- outputStream.writeShort(bytes.length);
- outputStream.write(bytes);
+ if (this.noDictSortColMapping[idx]) {
+ // write the original data to the stream
+ writeDataToStream(row[this.noDictSortDimIdx[idx]], outputStream, idx);
+ } else {
+ byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
+ outputStream.writeShort(bytes.length);
+ outputStream.write(bytes);
+ }
}
// pack no-sort
@@ -376,6 +473,46 @@ public class SortStepRowHandler implements Serializable {
}
/**
+ * Write the data to stream
+ *
+ * @param data
+ * @param outputStream
+ * @param idx
+ * @throws IOException
+ */
+ private void writeDataToStream(Object data, DataOutputStream outputStream, int idx)
+ throws IOException {
+ DataType dataType = noDictSortDataTypes[idx];
+ if (null == data) {
+ outputStream.writeBoolean(false);
+ } else {
+ outputStream.writeBoolean(true);
+ if (dataType == DataTypes.BOOLEAN) {
+ outputStream.writeBoolean((boolean) data);
+ } else if (dataType == DataTypes.BYTE) {
+ outputStream.writeByte((byte) data);
+ } else if (dataType == DataTypes.SHORT) {
+ outputStream.writeShort((short) data);
+ } else if (dataType == DataTypes.INT) {
+ outputStream.writeInt((int) data);
+ } else if (dataType == DataTypes.LONG) {
+ outputStream.writeLong((long) data);
+ } else if (dataType == DataTypes.DOUBLE) {
+ outputStream.writeDouble((double) data);
+ } else if (DataTypes.isDecimal(dataType)) {
+ BigDecimal val = (BigDecimal) data;
+ byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+ outputStream.writeShort(bigDecimalInBytes.length);
+ outputStream.write(bigDecimalInBytes);
+ } else if (dataType == DataTypes.FLOAT) {
+ outputStream.writeFloat((float) data);
+ } else if (dataType == DataTypes.BYTE_ARRAY) {
+ outputStream.writeUTF(data.toString());
+ }
+ }
+ }
+
+ /**
* Read intermediate sort temp row from unsafe memory.
* This method is used during merge sort phase for off-heap sort.
*
@@ -430,9 +567,9 @@ public class SortStepRowHandler implements Serializable {
int size = 0;
int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
- byte[][] noDictSortDims =
- new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
- + this.complexDimCnt][];
+ Object[] noDictSortDims =
+ new Object[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
+ + this.complexDimCnt];
// read dict & sort dim
for (int idx = 0; idx < dictSortDimCnt; idx++) {
@@ -444,11 +581,24 @@ public class SortStepRowHandler implements Serializable {
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
size += 2;
- byte[] bytes = new byte[length];
- CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
- bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
- size += length;
- noDictSortDims[idx] = bytes;
+ if (this.noDictSortColMapping[idx]) {
+ // get the original data from the unsafe memory
+ if (0 == length) {
+ // if the length is 0, then the data is null
+ noDictSortDims[idx] = null;
+ } else {
+ Object data = CarbonUnsafeUtil
+ .getDataFromUnsafe(noDictSortDataTypes[idx], baseObject, address, size, length);
+ size += length;
+ noDictSortDims[idx] = data;
+ }
+ } else {
+ byte[] bytes = new byte[length];
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+ size += length;
+ noDictSortDims[idx] = bytes;
+ }
}
// read no-sort dims & measures
@@ -487,13 +637,26 @@ public class SortStepRowHandler implements Serializable {
for (int idx = 0; idx < noDictSortDimCnt; idx++) {
short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
size += 2;
- byte[] bytes = new byte[length];
- CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
- bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
- size += length;
+ if (this.noDictSortColMapping[idx]) {
+ // get the original data from unsafe memory
+ if (0 == length) {
+ // if the length is 0, then the data is null
+ writeDataToStream(null, outputStream, idx);
+ } else {
+ Object data = CarbonUnsafeUtil
+ .getDataFromUnsafe(noDictSortDataTypes[idx], baseObject, address, size, length);
+ size += length;
+ writeDataToStream(data, outputStream, idx);
+ }
+ } else {
+ byte[] bytes = new byte[length];
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObject, address + size, bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+ size += length;
- outputStream.writeShort(length);
- outputStream.write(bytes);
+ outputStream.writeShort(length);
+ outputStream.write(bytes);
+ }
}
// packed no-sort & measure
@@ -534,13 +697,31 @@ public class SortStepRowHandler implements Serializable {
// write no-dict & sort
for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
- byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
- CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length);
- size += 2;
- CarbonUnsafe.getUnsafe()
- .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
- bytes.length);
- size += bytes.length;
+ if (this.noDictSortColMapping[idx]) {
+ Object data = row[this.noDictSortDimIdx[idx]];
+ if (null == data) {
+ // if the data is null, then write only the length as 0.
+ CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) 0);
+ size += 2;
+ } else {
+ int sizeInBytes = this.noDictSortDataTypes[idx].getSizeInBytes();
+ CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) sizeInBytes);
+ size += 2;
+ // put data to unsafe according to the data types
+ CarbonUnsafeUtil
+ .putDataToUnsafe(noDictSortDataTypes[idx], data, baseObject, address, size,
+ sizeInBytes);
+ size += sizeInBytes;
+ }
+ } else {
+ byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
+ CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length);
+ size += 2;
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
+ bytes.length);
+ size += bytes.length;
+ }
}
// convert pack no-sort
@@ -574,9 +755,15 @@ public class SortStepRowHandler implements Serializable {
}
// convert no-dict & no-sort
for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
- byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
- rowBuffer.putShort((short) bytes.length);
- rowBuffer.put(bytes);
+ if (this.noDictNoSortColMapping[idx]) {
+ // put the original data to buffer
+ putDataToRowBuffer(this.noDictNoSortDataTypes[idx], row[this.noDictNoSortDimIdx[idx]],
+ rowBuffer);
+ } else {
+ byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
+ rowBuffer.putShort((short) bytes.length);
+ rowBuffer.put(bytes);
+ }
}
// convert varchar dims
for (int idx = 0; idx < this.varcharDimCnt; idx++) {
@@ -592,37 +779,45 @@ public class SortStepRowHandler implements Serializable {
}
// convert measure
- Object tmpValue;
- DataType tmpDataType;
for (int idx = 0; idx < this.measureCnt; idx++) {
- tmpValue = row[this.measureIdx[idx]];
- tmpDataType = this.dataTypes[idx];
- if (null == tmpValue) {
- rowBuffer.put((byte) 0);
- continue;
- }
- rowBuffer.put((byte) 1);
- if (DataTypes.BOOLEAN == tmpDataType) {
- if ((boolean) tmpValue) {
- rowBuffer.put((byte) 1);
- } else {
- rowBuffer.put((byte) 0);
- }
- } else if (DataTypes.SHORT == tmpDataType) {
- rowBuffer.putShort((Short) tmpValue);
- } else if (DataTypes.INT == tmpDataType) {
- rowBuffer.putInt((Integer) tmpValue);
- } else if (DataTypes.LONG == tmpDataType) {
- rowBuffer.putLong((Long) tmpValue);
- } else if (DataTypes.DOUBLE == tmpDataType) {
- rowBuffer.putDouble((Double) tmpValue);
- } else if (DataTypes.isDecimal(tmpDataType)) {
- byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue);
- rowBuffer.putShort((short) decimalBytes.length);
- rowBuffer.put(decimalBytes);
+ putDataToRowBuffer(this.dataTypes[idx], row[this.measureIdx[idx]], rowBuffer);
+ }
+ }
+
+ /**
+ * Put the data to the row buffer
+ *
+ * @param tmpDataType
+ * @param tmpValue
+ * @param rowBuffer
+ */
+ private void putDataToRowBuffer(DataType tmpDataType, Object tmpValue, ByteBuffer rowBuffer) {
+ if (null == tmpValue) {
+ rowBuffer.put((byte) 0);
+ return;
+ }
+ rowBuffer.put((byte) 1);
+ if (DataTypes.BOOLEAN == tmpDataType) {
+ if ((boolean) tmpValue) {
+ rowBuffer.put((byte) 1);
} else {
- throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
+ rowBuffer.put((byte) 0);
}
+ } else if (DataTypes.SHORT == tmpDataType) {
+ rowBuffer.putShort((Short) tmpValue);
+ } else if (DataTypes.INT == tmpDataType) {
+ rowBuffer.putInt((Integer) tmpValue);
+ } else if (DataTypes.LONG == tmpDataType) {
+ rowBuffer.putLong((Long) tmpValue);
+ } else if (DataTypes.DOUBLE == tmpDataType) {
+ rowBuffer.putDouble((Double) tmpValue);
+ } else if (DataTypes.isDecimal(tmpDataType)) {
+ byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue);
+ rowBuffer.putShort((short) decimalBytes.length);
+ rowBuffer.put(decimalBytes);
+ } else {
+ throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
index 8f29cee..b0109fa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -20,7 +20,11 @@ package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
import java.util.Comparator;
import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.CarbonUnsafeUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
@@ -52,6 +56,7 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
long rowA = rowL.address;
long rowB = rowR.address;
int sizeInDictPartA = 0;
+ int noDicSortIdx = 0;
int sizeInNonDictPartA = 0;
int sizeInDictPartB = 0;
@@ -60,25 +65,50 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
if (isNoDictionary) {
short lengthA = CarbonUnsafe.getUnsafe().getShort(baseObjectL,
rowA + dictSizeInMemory + sizeInNonDictPartA);
- byte[] byteArr1 = new byte[lengthA];
sizeInNonDictPartA += 2;
- CarbonUnsafe.getUnsafe()
- .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA,
- byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA);
- sizeInNonDictPartA += lengthA;
-
short lengthB = CarbonUnsafe.getUnsafe().getShort(baseObjectR,
rowB + dictSizeInMemory + sizeInNonDictPartB);
- byte[] byteArr2 = new byte[lengthB];
sizeInNonDictPartB += 2;
- CarbonUnsafe.getUnsafe()
- .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB,
- byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB);
- sizeInNonDictPartB += lengthB;
+ DataType dataType = tableFieldStat.getNoDictDataType()[noDicSortIdx++];
+ if (DataTypeUtil.isPrimitiveColumn(dataType)) {
+ Object data1 = null;
+ if (0 != lengthA) {
+ data1 = CarbonUnsafeUtil
+ .getDataFromUnsafe(dataType, baseObjectL, rowA + dictSizeInMemory,
+ sizeInNonDictPartA, lengthA);
+ sizeInNonDictPartA += lengthA;
+ }
+ Object data2 = null;
+ if (0 != lengthB) {
+ data2 = CarbonUnsafeUtil
+ .getDataFromUnsafe(dataType, baseObjectR, rowB + dictSizeInMemory,
+ sizeInNonDictPartB, lengthB);
+ sizeInNonDictPartB += lengthB;
+ }
+ // use the data type based comparator for the no dictionary encoded columns
+ SerializableComparator comparator =
+ org.apache.carbondata.core.util.comparator.Comparator.getComparator(dataType);
+ int difference = comparator.compare(data1, data2);
+ if (difference != 0) {
+ return difference;
+ }
+ } else {
+ byte[] byteArr1 = new byte[lengthA];
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA, byteArr1,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA);
+ sizeInNonDictPartA += lengthA;
+
+ byte[] byteArr2 = new byte[lengthB];
+ CarbonUnsafe.getUnsafe()
+ .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB, byteArr2,
+ CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB);
+ sizeInNonDictPartB += lengthB;
- int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
- if (difference != 0) {
- return difference;
+ int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+ if (difference != 0) {
+ return difference;
+ }
}
} else {
int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeInDictPartA);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index 102b057..b805d37 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
@@ -43,6 +44,8 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
private IntermediateSortTempRow currentRow;
+ private DataType[] noDictDataType;
+
public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
boolean[] noDictSortColumnMapping) {
this.actualSize = merger.getEntryCount();
@@ -52,8 +55,10 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
for (UnsafeCarbonRowPage rowPage: rowPages) {
rowPage.setReadConvertedNoSortField();
}
+ this.noDictDataType = rowPages[0].getTableFieldStat().getNoDictDataType();
LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
- this.comparator = new IntermediateSortTempRowComparator(noDictSortColumnMapping);
+ this.comparator =
+ new IntermediateSortTempRowComparator(noDictSortColumnMapping, noDictDataType);
}
public boolean hasNext() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index 02ffd68..baa9e71 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -45,7 +45,8 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
this.rowPage = rowPage;
LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
this.comparator = new IntermediateSortTempRowComparator(
- rowPage.getTableFieldStat().getIsSortColNoDictFlags());
+ rowPage.getTableFieldStat().getIsSortColNoDictFlags(),
+ rowPage.getTableFieldStat().getNoDictDataType());
this.rowPage.setReadConvertedNoSortField();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 7c3c056..a991d4c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -109,7 +109,8 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
this.tableFieldStat = new TableFieldStat(parameters);
this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
this.executorService = Executors.newFixedThreadPool(1);
- comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn());
+ comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn(),
+ parameters.getNoDictDataType());
this.convertNoSortFields = convertNoSortFields;
initialize();
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index ac13d24..7683bbc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -273,19 +273,19 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
int dictIndex = 0;
int nonDicIndex = 0;
int[] dim = new int[this.dimensionCount];
- byte[][] nonDicArray = new byte[this.noDictWithComplextCount][];
+ Object[] nonDicArray = new Object[this.noDictWithComplextCount];
// read dimension values
int dimCount = 0;
for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
if (isNoDictionaryDimensionColumn[dimCount]) {
- nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount);
+ nonDicArray[nonDicIndex++] = row.getObject(dimCount);
} else {
dim[dictIndex++] = (int) row.getObject(dimCount);
}
}
for (; dimCount < this.dimensionWithComplexCount; dimCount++) {
- nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount);
+ nonDicArray[nonDicIndex++] = row.getObject(dimCount);
}
Object[] measures = new Object[measureCount];
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index e3bc97f..ae9ec3d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.processing.loading.partition.impl.RangePartitionerI
import org.apache.carbondata.processing.loading.partition.impl.RawRowComparator;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.commons.lang3.StringUtils;
@@ -134,12 +135,16 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
// sort the range bounds (sort in carbon is a little different from what we think)
Arrays.sort(convertedSortColumnRanges,
new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
- sortColumnRangeInfo.getIsSortColumnNoDict()));
+ sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil
+ .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
+ configuration.getTableIdentifier().getTableName())));
// range partitioner to dispatch rows by sort columns
this.partitioner = new RangePartitionerImpl(convertedSortColumnRanges,
new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
- sortColumnRangeInfo.getIsSortColumnNoDict()));
+ sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil
+ .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
+ configuration.getTableIdentifier().getTableName())));
}
// only convert sort column fields
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index ce8b62f..b921675 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -266,8 +266,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
Object[] newData = new Object[data.length];
for (int i = 0; i < data.length; i++) {
if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
- newData[i] = DataTypeUtil
- .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
+ if (DataTypeUtil.isPrimitiveColumn(dataTypes[i])) {
+ // keep the no dictionary measure column as original data
+ newData[i] = data[orderOfData[i]];
+ } else {
+ newData[i] = DataTypeUtil
+ .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
+ }
} else {
// if this is a complex column then recursively convert the data into Byte Array.
if (dataTypes[i].isComplexType()) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 0fc229a..1aa6da8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -91,6 +91,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
* boolean mapping for no dictionary columns in schema
*/
private boolean[] noDictionaryColMapping;
+
+ private boolean[] sortColumnMapping;
/**
* boolean mapping for long string dimension
*/
@@ -275,7 +277,15 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
preparedRow[i] = dictionaryValues[dictionaryIndex++];
} else {
// no dictionary dims
- preparedRow[i] = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
+ byte[] noDictionaryKeyByIndex = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
+ if (DataTypeUtil.isPrimitiveColumn(dims.getDataType())) {
+ // no dictionary measure columns are expected as original data
+ preparedRow[i] = DataTypeUtil
+ .getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeyByIndex,
+ dims.getDataType());
+ } else {
+ preparedRow[i] = noDictionaryKeyByIndex;
+ }
}
}
// fill all the measures
@@ -357,6 +367,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
measureCount = carbonTable.getMeasureByTableName(tableName).size();
List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
noDictionaryColMapping = new boolean[dimensions.size()];
+ sortColumnMapping = new boolean[dimensions.size()];
isVarcharDimMapping = new boolean[dimensions.size()];
int i = 0;
for (CarbonDimension dimension : dimensions) {
@@ -364,6 +375,9 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
i++;
continue;
}
+ if (dimension.isSortColumn()) {
+ sortColumnMapping[i] = true;
+ }
noDictionaryColMapping[i] = true;
if (dimension.getColumnSchema().getDataType() == DataTypes.VARCHAR) {
isVarcharDimMapping[i] = true;
@@ -395,8 +409,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
return SortParameters
.createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName,
dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount,
- noDictionaryCount, segmentId,
- carbonLoadModel.getTaskNo(), noDictionaryColMapping, isVarcharDimMapping, true);
+ noDictionaryCount, segmentId, carbonLoadModel.getTaskNo(), noDictionaryColMapping,
+ sortColumnMapping, isVarcharDimMapping, true);
}
/**
@@ -404,14 +418,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
* sort temp files
*/
private void initializeFinalThreadMergerForMergeSort() {
- boolean[] noDictionarySortColumnMapping = null;
- if (noDictionaryColMapping.length == this.segmentProperties.getNumberOfSortColumns()) {
- noDictionarySortColumnMapping = noDictionaryColMapping;
- } else {
- noDictionarySortColumnMapping = new boolean[this.segmentProperties.getNumberOfSortColumns()];
- System.arraycopy(noDictionaryColMapping, 0,
- noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length);
- }
+ boolean[] noDictionarySortColumnMapping = CarbonDataProcessorUtil
+ .getNoDictSortColMapping(carbonTable.getDatabaseName(), carbonTable.getTableName());
sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index b877d52..2911c05 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.util.ByteUtil;
@@ -53,6 +54,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
private SegmentProperties segprop;
private CarbonLoadModel loadModel;
private PartitionSpec partitionSpec;
+
+ CarbonColumn[] noDicAndComplexColumns;
/**
* record holder heap
*/
@@ -86,6 +89,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
carbonFactDataHandlerModel.setCompactionFlow(true);
carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
+ this.noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns();
dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
}
@@ -200,7 +204,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
* @throws SliceMergerException
*/
private void addRow(Object[] carbonTuple) throws SliceMergerException {
- CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segprop);
+ CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segprop, noDicAndComplexColumns);
try {
this.dataHandler.addDataToStore(row);
} catch (CarbonDataWriterException e) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 2dc79a3..00fbc7a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
@@ -37,6 +38,8 @@ public class RowResultProcessor {
private CarbonFactHandler dataHandler;
private SegmentProperties segmentProperties;
+ private CarbonColumn[] noDicAndComplexColumns;
+
private static final LogService LOGGER =
LogServiceFactory.getLogService(RowResultProcessor.class.getName());
@@ -59,6 +62,7 @@ public class RowResultProcessor {
//Note: set compaction flow just to convert decimal type
carbonFactDataHandlerModel.setCompactionFlow(true);
carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
+ noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns();
dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
}
@@ -97,7 +101,8 @@ public class RowResultProcessor {
}
private void addRow(Object[] carbonTuple) throws CarbonDataWriterException {
- CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
+ CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties,
+ noDicAndComplexColumns);
try {
this.dataHandler.addDataToStore(row);
} catch (CarbonDataWriterException e) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
index 9b6d1e8..54fa99e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
@@ -19,7 +19,10 @@ package org.apache.carbondata.processing.sort.sortdata;
import java.util.Comparator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
/**
@@ -31,11 +34,15 @@ public class IntermediateSortTempRowComparator implements Comparator<Intermediat
*/
private boolean[] isSortColumnNoDictionary;
+ private DataType[] noDicSortDataTypes;
+
/**
* @param isSortColumnNoDictionary isSortColumnNoDictionary
*/
- public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary) {
+ public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary,
+ DataType[] noDicSortDataTypes) {
this.isSortColumnNoDictionary = isSortColumnNoDictionary;
+ this.noDicSortDataTypes = noDicSortDataTypes;
}
/**
@@ -45,18 +52,31 @@ public class IntermediateSortTempRowComparator implements Comparator<Intermediat
int diff = 0;
int dictIndex = 0;
int nonDictIndex = 0;
+ int noDicTypeIdx = 0;
for (boolean isNoDictionary : isSortColumnNoDictionary) {
if (isNoDictionary) {
- byte[] byteArr1 = rowA.getNoDictSortDims()[nonDictIndex];
- byte[] byteArr2 = rowB.getNoDictSortDims()[nonDictIndex];
- nonDictIndex++;
+ if (DataTypeUtil.isPrimitiveColumn(noDicSortDataTypes[noDicTypeIdx])) {
+ // use data types based comparator for the no dictionary measure columns
+ SerializableComparator comparator = org.apache.carbondata.core.util.comparator.Comparator
+ .getComparator(noDicSortDataTypes[noDicTypeIdx]);
+ int difference = comparator.compare(rowA.getNoDictSortDims()[nonDictIndex],
+ rowB.getNoDictSortDims()[nonDictIndex]);
+ if (difference != 0) {
+ return difference;
+ }
+ noDicTypeIdx++;
+ } else {
+ byte[] byteArr1 = (byte[]) rowA.getNoDictSortDims()[nonDictIndex];
+ byte[] byteArr2 = (byte[]) rowB.getNoDictSortDims()[nonDictIndex];
- int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
- if (difference != 0) {
- return difference;
+ int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+ if (difference != 0) {
+ return difference;
+ }
}
+ nonDictIndex++;
} else {
int dimFieldA = rowA.getDictSortDims()[dictIndex];
int dimFieldB = rowB.getDictSortDims()[dictIndex];
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
index f47ecc7..4dff644 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
@@ -20,7 +20,10 @@ package org.apache.carbondata.processing.sort.sortdata;
import java.io.Serializable;
import java.util.Comparator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.comparator.SerializableComparator;
public class NewRowComparator implements Comparator<Object[]>, Serializable {
private static final long serialVersionUID = -1739874611112709436L;
@@ -28,13 +31,20 @@ public class NewRowComparator implements Comparator<Object[]>, Serializable {
/**
* mapping of dictionary dimensions and no dictionary of sort_column.
*/
- private boolean[] noDictionarySortColumnMaping;
+ private boolean[] noDicDimColMapping;
+
+ private DataType[] noDicDataTypes;
+
+ private boolean[] noDicSortColumnMapping;
/**
- * @param noDictionarySortColumnMaping
+ * @param noDicDimColMapping
*/
- public NewRowComparator(boolean[] noDictionarySortColumnMaping) {
- this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
+ public NewRowComparator(boolean[] noDicDimColMapping, boolean[] noDicSortColumnMapping,
+ DataType[] noDicDataTypes) {
+ this.noDicDimColMapping = noDicDimColMapping;
+ this.noDicSortColumnMapping = noDicSortColumnMapping;
+ this.noDicDataTypes = noDicDataTypes;
}
/**
@@ -43,15 +53,31 @@ public class NewRowComparator implements Comparator<Object[]>, Serializable {
public int compare(Object[] rowA, Object[] rowB) {
int diff = 0;
int index = 0;
+ int dataTypeIdx = 0;
+ int noDicSortIdx = 0;
- for (boolean isNoDictionary : noDictionarySortColumnMaping) {
- if (isNoDictionary) {
- byte[] byteArr1 = (byte[]) rowA[index];
- byte[] byteArr2 = (byte[]) rowB[index];
+ for (int i = 0; i < noDicDimColMapping.length; i++) {
+ if (noDicDimColMapping[i]) {
+ if (noDicSortColumnMapping[noDicSortIdx++]) {
+ if (DataTypeUtil.isPrimitiveColumn(noDicDataTypes[dataTypeIdx])) {
+ // use data types based comparator for the no dictionary measure columns
+ SerializableComparator comparator =
+ org.apache.carbondata.core.util.comparator.Comparator
+ .getComparator(noDicDataTypes[dataTypeIdx]);
+ int difference = comparator.compare(rowA[index], rowB[index]);
+ if (difference != 0) {
+ return difference;
+ }
+ dataTypeIdx++;
+ } else {
+ byte[] byteArr1 = (byte[]) rowA[index];
+ byte[] byteArr2 = (byte[]) rowB[index];
- int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
- if (difference != 0) {
- return difference;
+ int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+ if (difference != 0) {
+ return difference;
+ }
+ }
}
} else {
int dimFieldA = (int) rowA[index];
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c8f70630/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index a5caf7b..730c729 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -203,7 +203,9 @@ public class SortDataRows {
toSort = new Object[entryCount][];
System.arraycopy(recordHolderList, 0, toSort, 0, entryCount);
if (parameters.getNumberOfNoDictSortColumns() > 0) {
- Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionarySortColumn()));
+ Arrays.sort(toSort, new NewRowComparator(parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getNoDictionarySortColumn(),
+ parameters.getNoDictDataType()));
} else {
Arrays.sort(toSort, new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));
}
@@ -315,7 +317,8 @@ public class SortDataRows {
long startTime = System.currentTimeMillis();
if (parameters.getNumberOfNoDictSortColumns() > 0) {
Arrays.sort(recordHolderArray,
- new NewRowComparator(parameters.getNoDictionarySortColumn()));
+ new NewRowComparator(parameters.getNoDictionaryDimnesionColumn(),
+ parameters.getNoDictionarySortColumn(), parameters.getNoDictDataType()));
} else {
Arrays.sort(recordHolderArray,
new NewRowComparatorForNormalDims(parameters.getNumberOfSortColumns()));