You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/01 17:59:07 UTC
[2/2] carbondata git commit: [CARBONDATA-2388][SDK] Avro Record Complex Type Implementation
[CARBONDATA-2388][SDK] Avro Record Complex Type Implementation
Avro complex data type support.
Supported Avro complex types: ARRAY and RECORD.
Corresponding Carbon data types: ARRAY and STRUCT.
SDK support for handling complex data types.
Carbon complex type support:
- Support for no-dictionary fields.
- Bug fixes in existing complex type handling.
This closes #2209
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3202cf51
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3202cf51
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3202cf51
Branch: refs/heads/master
Commit: 3202cf517ab1a6805deab27baeb5f9d44094ee87
Parents: 7edef8f
Author: sounakr <so...@gmail.com>
Authored: Mon Apr 23 10:18:10 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue May 1 23:28:53 2018 +0530
----------------------------------------------------------------------
.../core/metadata/datatype/DataTypes.java | 6 +-
.../core/metadata/schema/SchemaReader.java | 4 +-
.../schema/table/TableSchemaBuilder.java | 17 +-
.../schema/table/column/CarbonColumn.java | 39 ++++
.../impl/DictionaryBasedResultCollector.java | 6 +-
.../core/scan/complextypes/ArrayQueryType.java | 6 +-
.../scan/complextypes/PrimitiveQueryType.java | 38 +++-
.../core/scan/complextypes/StructQueryType.java | 7 +-
.../core/scan/executor/util/QueryUtil.java | 4 +
.../core/scan/filter/GenericQueryType.java | 3 +-
.../executer/RowLevelFilterExecuterImpl.java | 2 +-
.../core/scan/result/BlockletScannedResult.java | 2 +-
.../apache/carbondata/core/util/CarbonUtil.java | 1 +
.../scan/complextypes/ArrayQueryTypeTest.java | 4 +-
.../complextypes/PrimitiveQueryTypeTest.java | 6 +-
.../scan/complextypes/StructQueryTypeTest.java | 2 +-
integration/spark-common-test/pom.xml | 6 +
.../TestNonTransactionalCarbonTable.scala | 100 ++++++++++-
.../spark/sql/hive/CarbonFileMetastore.scala | 28 ++-
.../apache/spark/util/SparkTypeConverter.scala | 58 ++++--
.../processing/datatypes/ArrayDataType.java | 19 +-
.../processing/datatypes/GenericDataType.java | 14 +-
.../processing/datatypes/PrimitiveDataType.java | 179 ++++++++++++++++---
.../processing/datatypes/StructDataType.java | 29 ++-
.../loading/CarbonDataLoadConfiguration.java | 25 ++-
.../processing/loading/DataField.java | 1 +
.../loading/DataLoadProcessBuilder.java | 2 +
.../converter/impl/FieldEncoderFactory.java | 25 +--
.../CarbonRowDataWriterProcessorStepImpl.java | 3 +-
.../sort/sortdata/SortParameters.java | 6 +-
.../store/CarbonFactDataHandlerModel.java | 8 +-
.../carbondata/processing/store/TablePage.java | 4 +-
.../util/CarbonDataProcessorUtil.java | 28 ++-
.../carbondata/sdk/file/AvroCarbonWriter.java | 27 ++-
.../sdk/file/CarbonWriterBuilder.java | 18 +-
.../org/apache/carbondata/sdk/file/Field.java | 122 ++++++++++++-
.../sdk/file/AvroCarbonWriterTest.java | 107 ++++++++++-
.../streaming/CarbonStreamInputFormat.java | 3 +
.../streaming/CarbonStreamRecordReader.java | 2 +-
39 files changed, 816 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
index ad21eaa..dc89a41 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
@@ -63,9 +63,9 @@ public class DataTypes {
static final int SHORT_INT_TYPE_ID = 16;
static final int LEGACY_LONG_TYPE_ID = 17;
static final int DECIMAL_TYPE_ID = 10;
- static final int ARRAY_TYPE_ID = 11;
- static final int STRUCT_TYPE_ID = 12;
- static final int MAP_TYPE_ID = 13;
+ public static final int ARRAY_TYPE_ID = 11;
+ public static final int STRUCT_TYPE_ID = 12;
+ public static final int MAP_TYPE_ID = 13;
/**
* create a DataType instance from uniqueId of the DataType
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
index be3906b..57370f6 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -82,9 +82,7 @@ public class SchemaReader {
public static TableInfo inferSchema(AbsoluteTableIdentifier identifier,
boolean isCarbonFileProvider) throws IOException {
- // This routine is going to infer schema from the carbondata file footer
- // Convert the ColumnSchema -> TableSchema -> TableInfo.
- // Return the TableInfo.
+
org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
.inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider);
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 617d58f..2c29be0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.DecimalType;
import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.datatype.StructType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
@@ -111,7 +112,8 @@ public class TableSchemaBuilder {
if (isSortColumn ||
field.getDataType() == DataTypes.STRING ||
field.getDataType() == DataTypes.DATE ||
- field.getDataType() == DataTypes.TIMESTAMP) {
+ field.getDataType() == DataTypes.TIMESTAMP ||
+ DataTypes.isStructType(field.getDataType())) {
newColumn.setDimensionColumn(true);
} else {
newColumn.setDimensionColumn(false);
@@ -128,6 +130,9 @@ public class TableSchemaBuilder {
newColumn.setColumnUniqueId(field.getFieldName());
newColumn.setColumnReferenceId(newColumn.getColumnUniqueId());
newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn));
+ if (field.getDataType().isComplexType()) {
+ newColumn.setNumberOfChild(((StructType) field.getDataType()).getFields().size());
+ }
if (DataTypes.isDecimal(field.getDataType())) {
DecimalType decimalType = (DecimalType) field.getDataType();
newColumn.setPrecision(decimalType.getPrecision());
@@ -139,10 +144,18 @@ public class TableSchemaBuilder {
} else {
otherColumns.add(newColumn);
}
-
if (newColumn.isDimensionColumn()) {
newColumn.setUseInvertedIndex(true);
}
+ if (field.getDataType().isComplexType()) {
+ if (((StructType) field.getDataType()).getFields().size() > 0) {
+ // This field has children.
+ List<StructField> fields = ((StructType) field.getDataType()).getFields();
+ for (int i = 0; i < fields.size(); i ++) {
+ addColumn(fields.get(i), false);
+ }
+ }
+ }
return this;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
index c888418..e19e329 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
@@ -52,6 +52,21 @@ public class CarbonColumn implements Serializable {
*/
protected ColumnIdentifier columnIdentifier;
+ /**
+ * Date Format
+ */
+ private String dateFormat;
+
+ /**
+ * TimeStamp Format.
+ */
+ private String timestampFormat;
+
+ /**
+ * useActualData
+ */
+ private boolean useActualData;
+
public CarbonColumn(ColumnSchema columnSchema, int ordinal, int schemaOrdinal) {
this.columnSchema = columnSchema;
this.ordinal = ordinal;
@@ -180,4 +195,28 @@ public class CarbonColumn implements Serializable {
public int getSchemaOrdinal() {
return this.schemaOrdinal;
}
+
+ public String getDateFormat() {
+ return dateFormat;
+ }
+
+ public void setDateFormat(String dateFormat) {
+ this.dateFormat = dateFormat;
+ }
+
+ public String getTimestampFormat() {
+ return timestampFormat;
+ }
+
+ public void setTimestampFormat(String timestampFormat) {
+ this.timestampFormat = timestampFormat;
+ }
+
+ public boolean getUseActualData() {
+ return useActualData;
+ }
+
+ public void setUseActualData(boolean useActualData) {
+ this.useActualData = useActualData;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index bb048aa..60f14a4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -134,6 +134,10 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
row[order[i]] =
DataTypeUtil.getDataBasedOnDataType(scannedResult.getBlockletId(), DataTypes.STRING);
}
+ } else if (complexDataTypeArray[i]) {
+ // Complex Type With No Dictionary Encoding.
+ row[order[i]] = comlexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
+ .getDataBasedOnDataType(ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
} else {
row[order[i]] = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(
noDictionaryKeys[noDictionaryColumnIndex++],
@@ -146,7 +150,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
}
} else if (complexDataTypeArray[i]) {
row[order[i]] = comlexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
- .getDataBasedOnDataTypeFromSurrogates(
+ .getDataBasedOnDataType(
ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
dictionaryColumnIndex++;
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
index 24c1c9b..81e9651 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
@@ -85,14 +85,14 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType
children.fillRequiredBlockData(blockChunkHolder);
}
- @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
- int dataLength = surrogateData.getInt();
+ @Override public Object getDataBasedOnDataType(ByteBuffer dataBuffer) {
+ int dataLength = dataBuffer.getInt();
if (dataLength == -1) {
return null;
}
Object[] data = new Object[dataLength];
for (int i = 0; i < dataLength; i++) {
- data[i] = children.getDataBasedOnDataTypeFromSurrogates(surrogateData);
+ data[i] = children.getDataBasedOnDataType(dataBuffer);
}
return DataTypeUtil.getDataTypeConverter().wrapWithGenericArrayData(data);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index 8c75caf..2db590b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.keygenerator.mdkey.Bits;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.scan.filter.GenericQueryType;
import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
import org.apache.carbondata.core.util.DataTypeUtil;
@@ -43,8 +44,10 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
private boolean isDirectDictionary;
+ private boolean isDictionary;
+
public PrimitiveQueryType(String name, String parentname, int blockIndex,
- org.apache.carbondata.core.metadata.datatype.DataType dataType, int keySize,
+ DataType dataType, int keySize,
Dictionary dictionary, boolean isDirectDictionary) {
super(name, parentname, blockIndex);
this.dataType = dataType;
@@ -53,6 +56,7 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
this.name = name;
this.parentname = parentname;
this.isDirectDictionary = isDirectDictionary;
+ this.isDictionary = (dictionary != null && isDirectDictionary == false);
}
@Override public void addChildren(GenericQueryType children) {
@@ -84,6 +88,9 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
DimensionRawColumnChunk[] rawColumnChunks, int rowNumber,
int pageNumber, DataOutputStream dataOutputStream) throws IOException {
byte[] currentVal = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber);
+ if (!this.isDictionary) {
+ dataOutputStream.writeInt(currentVal.length);
+ }
dataOutputStream.write(currentVal);
}
@@ -92,20 +99,35 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
readBlockDataChunk(blockChunkHolder);
}
- @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
- byte[] data = new byte[keySize];
- surrogateData.get(data);
- Bits bit = new Bits(new int[]{keySize * 8});
- int surrgateValue = (int)bit.getKeyArray(data, 0)[0];
+ @Override public Object getDataBasedOnDataType(ByteBuffer dataBuffer) {
Object actualData = null;
+
if (isDirectDictionary) {
- DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
- .getDirectDictionaryGenerator(dataType);
+ // Direct Dictionary Column
+ byte[] data = new byte[keySize];
+ dataBuffer.get(data);
+ Bits bit = new Bits(new int[] { keySize * 8 });
+ int surrgateValue = (int) bit.getKeyArray(data, 0)[0];
+ DirectDictionaryGenerator directDictionaryGenerator =
+ DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(dataType);
actualData = directDictionaryGenerator.getValueFromSurrogate(surrgateValue);
+ } else if (!isDictionary) {
+ // No Dictionary Columns
+ int size = dataBuffer.getInt();
+ byte[] value = new byte[size];
+ dataBuffer.get(value, 0, size);
+ actualData = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(value, this.dataType);
} else {
+ // Dictionary Column
+ byte[] data = new byte[keySize];
+ dataBuffer.get(data);
+ Bits bit = new Bits(new int[] { keySize * 8 });
+ int surrgateValue = (int) bit.getKeyArray(data, 0)[0];
String dictionaryValueForKey = dictionary.getDictionaryValueForKey(surrgateValue);
actualData = DataTypeUtil.getDataBasedOnDataType(dictionaryValueForKey, this.dataType);
}
+
return actualData;
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index 1064694..9ff8252 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -101,13 +101,14 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
}
}
- @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
- int childLength = surrogateData.getInt();
+ @Override public Object getDataBasedOnDataType(ByteBuffer dataBuffer) {
+ int childLength = dataBuffer.getInt();
Object[] fields = new Object[childLength];
for (int i = 0; i < childLength; i++) {
- fields[i] = children.get(i).getDataBasedOnDataTypeFromSurrogates(surrogateData);
+ fields[i] = children.get(i).getDataBasedOnDataType(dataBuffer);
}
return DataTypeUtil.getDataTypeConverter().wrapWithGenericRow(fields);
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 1765efa..cc31efc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -777,6 +777,10 @@ public class QueryUtil {
boolean isDirectDictionary = CarbonUtil
.hasEncoding(dimension.getListOfChildDimensions().get(i).getEncoder(),
Encoding.DIRECT_DICTIONARY);
+ boolean isDictionary = CarbonUtil
+ .hasEncoding(dimension.getListOfChildDimensions().get(i).getEncoder(),
+ Encoding.DICTIONARY);
+
parentQueryType.addChildren(
new PrimitiveQueryType(dimension.getListOfChildDimensions().get(i).getColName(),
dimension.getColName(), ++parentBlockIndex,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
index b5d8d82..fe65669 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
@@ -43,5 +43,6 @@ public interface GenericQueryType {
void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder) throws IOException;
- Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData);
+ Object getDataBasedOnDataType(ByteBuffer dataBuffer);
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 2bd49ed..7e92aef 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -440,7 +440,7 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
complexType.parseBlocksAndReturnComplexColumnByteArray(
blockChunkHolder.getDimensionRawColumnChunks(), index, pageIndex, dataOutputStream);
record[dimColumnEvaluatorInfo.getRowIndex()] = complexType
- .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
+ .getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray()));
byteStream.close();
} catch (IOException e) {
LOGGER.info(e.getMessage());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index 35d4f51..b85945f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -289,7 +289,7 @@ public abstract class BlockletScannedResult {
pageFilteredRowId == null ? j : pageFilteredRowId[pageCounter][j], pageCounter,
dataOutput);
Object data = vectorInfos[i].genericQueryType
- .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
+ .getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray()));
vector.putObject(vectorOffset++, data);
} catch (IOException e) {
LOGGER.error(e);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index a5351a0..b42167d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2348,6 +2348,7 @@ public final class CarbonUtil {
try {
fistFilePath = filePaths.get(0);
} catch (Exception e) {
+ // Check if we can infer the schema from the hive metastore.
LOGGER.error("CarbonData file is not present in the table location");
throw new IOException("CarbonData file is not present in the table location");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/test/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryTypeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryTypeTest.java b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryTypeTest.java
index e882f4e..3c2c374 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryTypeTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryTypeTest.java
@@ -41,7 +41,7 @@ public class ArrayQueryTypeTest {
surrogateData.put(1, (byte) 0xFF);
surrogateData.put(2, (byte) 0xFF);
surrogateData.put(3, (byte) 0xFF);
- assertNull(arrayQueryType.getDataBasedOnDataTypeFromSurrogates(surrogateData));
+ assertNull(arrayQueryType.getDataBasedOnDataType(surrogateData));
}
@Test public void testGetDataBasedOnDataTypeFromSurrogates() {
@@ -50,6 +50,6 @@ public class ArrayQueryTypeTest {
arrayQueryType.setName("testName");
arrayQueryType.setParentname("testName");
arrayQueryType.addChildren(arrayQueryType);
- assertNotNull(arrayQueryType.getDataBasedOnDataTypeFromSurrogates(surrogateData));
+ assertNotNull(arrayQueryType.getDataBasedOnDataType(surrogateData));
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
index 3236f16..757f2b9 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
@@ -84,7 +84,7 @@ public class PrimitiveQueryTypeTest {
Object expectedValue = 1313043000000L;
Object actualValue =
- primitiveQueryTypeForTimeStamp.getDataBasedOnDataTypeFromSurrogates(surrogateData);
+ primitiveQueryTypeForTimeStamp.getDataBasedOnDataType(surrogateData);
assertEquals(expectedValue, actualValue);
}
@@ -97,9 +97,9 @@ public class PrimitiveQueryTypeTest {
}
};
Object expectedValue = primitiveQueryTypeForTimeStampForIsDictionaryFalse
- .getDataBasedOnDataTypeFromSurrogates(surrogateData);
+ .getDataBasedOnDataType(surrogateData);
Object actualValue = primitiveQueryTypeForTimeStampForIsDictionaryFalse
- .getDataBasedOnDataTypeFromSurrogates(surrogateData);
+ .getDataBasedOnDataType(surrogateData);
assertEquals(expectedValue, actualValue);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
index b09d9dd..18b85c6 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
@@ -47,7 +47,7 @@ public class StructQueryTypeTest {
structQueryType.addChildren(arrayQueryType);
List children = new ArrayList();
children.add(arrayQueryType);
- assertNotNull(structQueryType.getDataBasedOnDataTypeFromSurrogates(surrogateData));
+ assertNotNull(structQueryType.getDataBasedOnDataType(surrogateData));
}
@Test public void testGetColsCount() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index d70fa2e..9f184e6 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -136,6 +136,12 @@
<artifactId>jmockit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>tech.allegro.schema.json2avro</groupId>
+ <artifactId>converter</artifactId>
+ <version>0.2.5</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index f1bda31..a6af4a6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -17,13 +17,13 @@
package org.apache.carbondata.spark.testsuite.createTable
-import java.io.{File, FileFilter}
+import java.io.{File, FileFilter, IOException}
import java.util
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
-import org.junit.Assert
+import org.junit.{Assert, Test}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -31,9 +31,14 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
-
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.commons.lang.CharEncoding
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter
+
+import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
@@ -637,7 +642,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
cleanTestData()
}
-
test("test huge data write with one batch having bad record") {
val exception =
@@ -648,4 +652,88 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
.contains("Data load failed due to bad record"))
}
-}
+
+
+ def buildAvroTestData(rows: Int, options: util.Map[String, String]): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ val newAvroSchema = "{" + " \"type\" : \"record\", " + " \"name\" : \"userInfo\", " +
+ " \"namespace\" : \"my.example\", " +
+ " \"fields\" : [{\"name\" : \"username\", " +
+ " \"type\" : \"string\", " + " \"default\" : \"NONE\"}, " +
+ " {\"name\" : \"age\", " + " \"type\" : \"int\", " +
+ " \"default\" : -1}, " + "{\"name\" : \"address\", " +
+ " \"type\" : { " + " \"type\" : \"record\", " +
+ " \"name\" : \"mailing_address\", " + " \"fields\" : [ {" +
+ " \"name\" : \"street\", " +
+ " \"type\" : \"string\", " +
+ " \"default\" : \"NONE\"}, { " + " \"name\" : \"city\", " +
+ " \"type\" : \"string\", " + " \"default\" : \"NONE\"}, " +
+ " ]}, " + " \"default\" : {} " + " } " + "}"
+ val mySchema = "{" + " \"name\": \"address\", " + " \"type\": \"record\", " +
+ " \"fields\": [ " +
+ " { \"name\": \"name\", \"type\": \"string\"}, " +
+ " { \"name\": \"age\", \"type\": \"int\"}, " + " { " +
+ " \"name\": \"address\", " + " \"type\": { " +
+ " \"type\" : \"record\", " + " \"name\" : \"my_address\", " +
+ " \"fields\" : [ " +
+ " {\"name\": \"street\", \"type\": \"string\"}, " +
+ " {\"name\": \"city\", \"type\": \"string\"} " + " ]} " + " } " +
+ "] " + "}"
+ val json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", " +
+ "\"city\":\"bang\"}}"
+ // conversion to GenericData.Record
+ val nn = new org.apache.avro.Schema.Parser().parse(mySchema)
+ val converter = new JsonAvroConverter
+ val record = converter
+ .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn)
+ val fields = new Array[Field](3)
+ fields(0) = new Field("name", DataTypes.STRING)
+ fields(1) = new Field("age", DataTypes.STRING)
+ // fields[1] = new Field("age", DataTypes.INT);
+ val fld = new util.ArrayList[StructField]
+ fld.add(new StructField("street", DataTypes.STRING))
+ fld.add(new StructField("city", DataTypes.STRING))
+ fields(2) = new Field("address", "struct", fld)
+ try {
+ val writer = CarbonWriter.builder.withSchema(new Schema(fields))
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+ var i = 0
+ while (i < rows) {
+ writer.write(record)
+ i = i + 1
+ }
+ writer.close()
+ }
+ catch {
+ case e: Exception => {
+ e.printStackTrace()
+ Assert.fail(e.getMessage)
+ }
+ }
+ }
+
+ def buildAvroTestDataSingleFile(): Any = {
+ FileUtils.deleteDirectory(new File(writerPath))
+ buildAvroTestData(3, null)
+ }
+
+ test("Read sdk writer Avro output ") {
+ buildAvroTestDataSingleFile()
+ assert(new File(writerPath).exists())
+ sql("DROP TABLE IF EXISTS sdkOutputTable")
+ sql(
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+ |'$writerPath' """.stripMargin)
+
+ sql("select * from sdkOutputTable").show(false)
+
+ checkAnswer(sql("select * from sdkOutputTable"), Seq(
+ Row("bob", "10", Row("abc","bang")),
+ Row("bob", "10", Row("abc","bang")),
+ Row("bob", "10", Row("abc","bang"))))
+
+ sql("DROP TABLE sdkOutputTable")
+ // drop table should not delete the files
+ assert(new File(writerPath).exists())
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index c61471a..2d24abf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -42,8 +42,9 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.{table, SchemaReader}
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable}
+import org.apache.carbondata.core.metadata.schema.table
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.SchemaReader
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.ThriftWriter
@@ -223,9 +224,26 @@ class CarbonFileMetastore extends CarbonMetaStore {
val tablePath = identifier.getTablePath
val wrapperTableInfo =
if (inferSchema) {
- val thriftTableInfo = schemaConverter
- .fromWrapperToExternalTableInfo(SchemaReader.inferSchema(identifier, false),
- dbName, tableName)
+ val carbonTbl = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName)
+ val tblInfoFromCache = if (carbonTbl != null) {
+ carbonTbl.getTableInfo
+ } else {
+ null
+ }
+
+ val thriftTableInfo : TableInfo = if (tblInfoFromCache != null) {
+ // If the TableInfo is present in the Carbon metadata cache,
+ // take it from the cache rather than inferring it from
+ // the CarbonData file.
+ schemaConverter
+ .fromWrapperToExternalTableInfo(tblInfoFromCache, dbName, tableName)
+ } else {
+ schemaConverter
+ .fromWrapperToExternalTableInfo(SchemaReader
+ .inferSchema(identifier, false),
+ dbName, tableName)
+ }
+
val wrapperTableInfo =
schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
index 65210b8..fe11b98 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
@@ -97,16 +97,32 @@ private[spark] object SparkTypeConverter {
def getStructChildren(table: CarbonTable, dimName: String): String = {
table.getChildren(dimName).asScala.map(childDim => {
childDim.getDataType.getName.toLowerCase match {
- case "array" => s"${
+ case "array" => if (table.isTransactionalTable) {s"${
childDim.getColName.substring(dimName.length + 1)
- }:array<${ getArrayChildren(table, childDim.getColName) }>"
- case "struct" => s"${
+ }:array<${ getArrayChildren(table, childDim.getColName) }>" } else {
+ // For non-transactional tables, the names of a struct's child columns
+ // are not prefixed with their parent column name.
+ s"${
+ childDim.getColName
+ }:array<${ getArrayChildren(table, childDim.getColName) }>"
+ }
+ case "struct" => if (table.isTransactionalTable) { s"${
childDim.getColName.substring(dimName.length + 1)
}:struct<${ table.getChildren(childDim.getColName)
.asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",")
- }>"
- case dType => s"${ childDim.getColName
+ }>"} else {
+ s"${
+ childDim.getColName
+ }:struct<${ table.getChildren(childDim.getColName)
+ .asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",")
+ }>"
+ }
+ case dType => if (table.isTransactionalTable) {
+ s"${ childDim.getColName
.substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
+ } else {
+ s"${ childDim.getColName}:${ addDecimalScaleAndPrecision(childDim, dType) }"
+ }
}
}).mkString(",")
}
@@ -123,13 +139,31 @@ private[spark] object SparkTypeConverter {
private def recursiveMethod(
table: CarbonTable, dimName: String, childDim: CarbonDimension) = {
childDim.getDataType.getName.toLowerCase match {
- case "array" => s"${
- childDim.getColName.substring(dimName.length + 1)
- }:array<${ getArrayChildren(table, childDim.getColName) }>"
- case "struct" => s"${
- childDim.getColName.substring(dimName.length + 1)
- }:struct<${ getStructChildren(table, childDim.getColName) }>"
- case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
+ case "array" => if (table.isTransactionalTable) {
+ s"${
+ childDim.getColName.substring(dimName.length + 1)
+ }:array<${ getArrayChildren(table, childDim.getColName) }>"
+ } else {
+ // For non-transactional tables, the names of a struct's child columns
+ // are not prefixed with their parent column name.
+ s"${
+ childDim.getColName
+ }:array<${ getArrayChildren(table, childDim.getColName) }>"
+ }
+ case "struct" => if (table.isTransactionalTable) {
+ s"${
+ childDim.getColName.substring(dimName.length + 1)
+ }:struct<${ getStructChildren(table, childDim.getColName) }>"
+ } else {
+ s"${
+ childDim.getColName
+ }:struct<${ getStructChildren(table, childDim.getColName) }>"
+ }
+ case dType => if (table.isTransactionalTable) {
+ s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
+ } else {
+ s"${ childDim.getColName }:${ dType }"
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 86a6744..fb198ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -147,6 +147,10 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
}
+ @Override public boolean getIsColumnDictionary() {
+ return true;
+ }
+
@Override
public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream)
throws IOException, DictionaryGenerationException {
@@ -168,22 +172,21 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
children.fillCardinality(dimCardWithComplex);
}
- /**
- * parse byte array and bit pack
- */
@Override
- public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
- KeyGenerator[] generator) throws IOException, KeyGenException {
+ public void parseComplexValue(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
+ KeyGenerator[] generator)
+ throws IOException, KeyGenException {
int dataLength = byteArrayInput.getInt();
dataOutputStream.writeInt(dataLength);
if (children instanceof PrimitiveDataType) {
- dataOutputStream.writeInt(generator[children.getSurrogateIndex()].getKeySizeInBytes());
+ if (children.getIsColumnDictionary()) {
+ dataOutputStream.writeInt(generator[children.getSurrogateIndex()].getKeySizeInBytes());
+ }
}
for (int i = 0; i < dataLength; i++) {
- children.parseAndBitPack(byteArrayInput, dataOutputStream, generator);
+ children.parseComplexValue(byteArrayInput, dataOutputStream, generator);
}
-
}
/*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index 77c00d9..21ad83d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -72,15 +72,23 @@ public interface GenericDataType<T> {
void setSurrogateIndex(int surrIndex);
/**
- * converts integer surrogate to bit packed surrogate value
+ * Returns true in case the column has Dictionary Encoding.
+ * @return
+ */
+ boolean getIsColumnDictionary();
+
+ /**
+ * Parse the Complex Datatype from the ByteBuffer.
* @param byteArrayInput
* @param dataOutputStream
* @param generator
+ * @return
* @throws IOException
* @throws KeyGenException
*/
- void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
- KeyGenerator[] generator) throws IOException, KeyGenException;
+ void parseComplexValue(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
+ KeyGenerator[] generator)
+ throws IOException, KeyGenException;
/**
* @return columns count of each complex type
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 55fa23b..fa60bf6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.processing.datatypes;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -39,13 +40,16 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.keygenerator.KeyGenerator;
import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.processing.loading.dictionary.DictionaryServerClientDictionary;
import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
import org.apache.carbondata.processing.loading.dictionary.PreCreatedDictionary;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
/**
* Primitive DataType stateless object used in data loading
@@ -94,6 +98,13 @@ public class PrimitiveDataType implements GenericDataType<Object> {
private CarbonDimension carbonDimension;
+ private boolean isDictionary;
+
+ private boolean isEmptyBadRecord;
+
+ private String nullformat;
+
+
private PrimitiveDataType(int outputArrayIndex, int dataCounter) {
this.outputArrayIndex = outputArrayIndex;
this.dataCounter = dataCounter;
@@ -105,28 +116,45 @@ public class PrimitiveDataType implements GenericDataType<Object> {
* @param name
* @param parentname
* @param columnId
+ * @param dimensionOrdinal
+ * @param isDictionary
*/
- public PrimitiveDataType(String name, String parentname, String columnId, int dimensionOrdinal) {
+ public PrimitiveDataType(String name, String parentname, String columnId, int dimensionOrdinal,
+ boolean isDictionary, String nullformat, boolean isEmptyBadRecord) {
this.name = name;
this.parentname = parentname;
this.columnId = columnId;
+ this.isDictionary = isDictionary;
+ this.nullformat = nullformat;
+ this.isEmptyBadRecord = isEmptyBadRecord;
}
/**
- * constructor
- *
- * @param name
+ * Constructor
+ * @param carbonColumn
* @param parentname
* @param columnId
+ * @param carbonDimension
+ * @param cache
+ * @param absoluteTableIdentifier
+ * @param client
+ * @param useOnePass
+ * @param localCache
+ * @param nullFormat
+ * @param isEmptyBadRecords
*/
- public PrimitiveDataType(String name, String parentname, String columnId,
+ public PrimitiveDataType(CarbonColumn carbonColumn, String parentname, String columnId,
CarbonDimension carbonDimension, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
- Map<Object, Integer> localCache) {
- this.name = name;
+ Map<Object, Integer> localCache, String nullFormat, Boolean isEmptyBadRecords) {
+ this.name = carbonColumn.getColName();
this.parentname = parentname;
this.columnId = columnId;
this.carbonDimension = carbonDimension;
+ this.isDictionary = isDictionaryDimension(carbonDimension);
+ this.nullformat = nullFormat;
+ this.isEmptyBadRecord = isEmptyBadRecords;
+
DictionaryColumnUniqueIdentifier identifier =
new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
carbonDimension.getColumnIdentifier(), carbonDimension.getDataType());
@@ -134,7 +162,7 @@ public class PrimitiveDataType implements GenericDataType<Object> {
if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory
.getDirectDictionaryGenerator(carbonDimension.getDataType()));
- } else {
+ } else if (carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
Dictionary dictionary = null;
if (useOnePass) {
if (CarbonUtil.isFileExistsForGivenColumn(identifier)) {
@@ -160,6 +188,14 @@ public class PrimitiveDataType implements GenericDataType<Object> {
}
}
+ private boolean isDictionaryDimension(CarbonDimension carbonDimension) {
+ if (carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
/*
* primitive column will not have any child column
*/
@@ -211,38 +247,123 @@ public class PrimitiveDataType implements GenericDataType<Object> {
/*
* set surrogate index
*/
- @Override
- public void setSurrogateIndex(int surrIndex) {
- index = surrIndex;
+ @Override public void setSurrogateIndex(int surrIndex) {
+ if (this.carbonDimension != null && !this.carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
+ index = 0;
+ } else if (this.carbonDimension == null && isDictionary == false) {
+ index = 0;
+ } else {
+ index = surrIndex;
+ }
+ }
+
+ @Override public boolean getIsColumnDictionary() {
+ return isDictionary;
}
@Override public void writeByteArray(Object input, DataOutputStream dataOutputStream)
throws IOException, DictionaryGenerationException {
+
String parsedValue =
input == null ? null : DataTypeUtil.parseValue(input.toString(), carbonDimension);
- Integer surrogateKey;
- if (null == parsedValue) {
- surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
- } else {
- surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
- if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+ if (this.isDictionary) {
+ Integer surrogateKey;
+ if (null == parsedValue) {
surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+ } else {
+ surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
+ if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+ surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+ }
+ }
+ dataOutputStream.writeInt(surrogateKey);
+ } else {
+ // Transform into ByteArray for No Dictionary.
+ // TODO: refactor to handle all the cases covered by NonDictionaryFieldConverterImpl
+ if (null == parsedValue && this.carbonDimension.getDataType() != DataTypes.STRING) {
+ updateNullValue(dataOutputStream);
+ } else if (null == parsedValue || parsedValue.equals(nullformat)) {
+ updateNullValue(dataOutputStream);
+ } else {
+ String dateFormat = null;
+ if (this.carbonDimension.getDataType() == DataTypes.DATE) {
+ dateFormat = this.carbonDimension.getDateFormat();
+ } else if (this.carbonDimension.getDataType() == DataTypes.TIMESTAMP) {
+ dateFormat = this.carbonDimension.getTimestampFormat();
+ }
+
+ try {
+ if (!this.carbonDimension.getUseActualData()) {
+ byte[] value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue,
+ this.carbonDimension.getDataType(), dateFormat);
+ if (this.carbonDimension.getDataType() == DataTypes.STRING
+ && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+ throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
+ + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+ }
+ updateValueToByteStream(dataOutputStream, value);
+ } else {
+ Object value = DataTypeUtil.getDataDataTypeForNoDictionaryColumn(parsedValue,
+ this.carbonDimension.getDataType(), dateFormat);
+ if (this.carbonDimension.getDataType() == DataTypes.STRING
+ && value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+ throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
+ + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+ }
+ if (parsedValue.length() > 0) {
+ updateValueToByteStream(dataOutputStream,
+ parsedValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+ } else {
+ updateNullValue(dataOutputStream);
+ }
+ }
+ } catch (CarbonDataLoadingException e) {
+ throw e;
+ } catch (Throwable ex) {
+ // TODO: implement bad-records log holder handling,
+ // the same way NonDictionaryFieldConverterImpl does.
+ throw ex;
+ }
}
}
- dataOutputStream.writeInt(surrogateKey);
}
- @Override
- public void fillCardinality(List<Integer> dimCardWithComplex) {
+ private void updateValueToByteStream(DataOutputStream dataOutputStream, byte[] value)
+ throws IOException {
+ dataOutputStream.writeInt(value.length);
+ dataOutputStream.write(value);
+ }
+
+ private void updateNullValue(DataOutputStream dataOutputStream) throws IOException {
+ // Length-prefixed like non-null values: parseComplexValue reads an int size first.
+ byte[] nullValue = this.carbonDimension.getDataType() == DataTypes.STRING
+ ? CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY
+ : CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+ updateValueToByteStream(dataOutputStream, nullValue);
+ }
+
+ @Override public void fillCardinality(List<Integer> dimCardWithComplex) {
+ if (!this.isDictionary) { // no dictionary => no cardinality; also safe when carbonDimension is null
+ return;
+ }
dimCardWithComplex.add(dictionaryGenerator.size());
}
@Override
- public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
- KeyGenerator[] generator) throws IOException, KeyGenException {
- int data = byteArrayInput.getInt();
- byte[] v = generator[index].generateKey(new int[] { data });
- dataOutputStream.write(v);
+ public void parseComplexValue(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
+ KeyGenerator[] generator)
+ throws IOException, KeyGenException {
+ if (!this.isDictionary) {
+ int sizeOfData = byteArrayInput.getInt();
+ dataOutputStream.writeInt(sizeOfData);
+ byte[] bb = new byte[sizeOfData];
+ byteArrayInput.get(bb, 0, sizeOfData);
+ dataOutputStream.write(bb);
+ } else {
+ int data = byteArrayInput.getInt();
+ byte[] v = generator[index].generateKey(new int[] { data });
+ dataOutputStream.write(v);
+ }
}
/*
@@ -326,8 +447,16 @@ public class PrimitiveDataType implements GenericDataType<Object> {
@Override
public GenericDataType<Object> deepCopy() {
PrimitiveDataType dataType = new PrimitiveDataType(this.outputArrayIndex, 0);
+ dataType.carbonDimension = this.carbonDimension;
+ dataType.isDictionary = this.isDictionary;
+ dataType.parentname = this.parentname;
+ dataType.columnId = this.columnId;
+ dataType.dictionaryGenerator = this.dictionaryGenerator;
+ dataType.isEmptyBadRecord = this.isEmptyBadRecord;
+ dataType.nullformat = this.nullformat;
dataType.setKeySize(this.keySize);
dataType.setSurrogateIndex(this.index);
+ dataType.name = this.name; // otherwise the copy's column name is left null
return dataType;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index a340ab1..36899a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -146,6 +146,10 @@ public class StructDataType implements GenericDataType<StructObject> {
}
+ @Override public boolean getIsColumnDictionary() {
+ return true;
+ }
+
@Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream)
throws IOException, DictionaryGenerationException {
dataOutputStream.writeInt(children.size());
@@ -175,20 +179,29 @@ public class StructDataType implements GenericDataType<StructObject> {
}
}
- /*
- * parse bytearray and bit pack
+ /**
+ *
+ * @param byteArrayInput
+ * @param dataOutputStream
+ * @param generator
+ * @return
+ * @throws IOException
+ * @throws KeyGenException
*/
- @Override
- public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
- KeyGenerator[] generator) throws IOException, KeyGenException {
+ @Override public void parseComplexValue(ByteBuffer byteArrayInput,
+ DataOutputStream dataOutputStream, KeyGenerator[] generator)
+ throws IOException, KeyGenException {
int childElement = byteArrayInput.getInt();
dataOutputStream.writeInt(childElement);
+
for (int i = 0; i < childElement; i++) {
if (children.get(i) instanceof PrimitiveDataType) {
- dataOutputStream.writeInt(generator[children.get(i).getSurrogateIndex()]
- .getKeySizeInBytes());
+ if (children.get(i).getIsColumnDictionary()) {
+ dataOutputStream
+ .writeInt(generator[children.get(i).getSurrogateIndex()].getKeySizeInBytes());
+ }
}
- children.get(i).parseAndBitPack(byteArrayInput, dataOutputStream, generator);
+ children.get(i).parseComplexValue(byteArrayInput, dataOutputStream, generator);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index ad1c84c..9418efb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -87,7 +87,9 @@ public class CarbonDataLoadConfiguration {
private int noDictionaryCount;
- private int complexColumnCount;
+ private int complexDictionaryColumnCount;
+
+ private int complexNonDictionaryColumnCount;
/**
* schema updated time stamp to be used for restructure scenarios
@@ -128,13 +130,17 @@ public class CarbonDataLoadConfiguration {
CarbonColumn column = dataField.getColumn();
if (column.isDimension()) {
dimensionCount++;
- if (!dataField.hasDictionaryEncoding()) {
+ if (column.isComplex()) {
+ if (!dataField.hasDictionaryEncoding()) {
+ complexNonDictionaryColumnCount++;
+ } else {
+ complexDictionaryColumnCount++;
+ }
+ } else if (!dataField.hasDictionaryEncoding()) {
noDictionaryCount++;
}
}
- if (column.isComplex()) {
- complexColumnCount++;
- }
+
if (column.isMeasure()) {
measureCount++;
}
@@ -153,8 +159,8 @@ public class CarbonDataLoadConfiguration {
return noDictionaryCount;
}
- public int getComplexColumnCount() {
- return complexColumnCount;
+ public int getComplexDictionaryColumnCount() {
+ return complexDictionaryColumnCount;
}
public int getMeasureCount() {
@@ -387,4 +393,9 @@ public class CarbonDataLoadConfiguration {
public void setCarbonTransactionalTable(boolean carbonTransactionalTable) {
this.carbonTransactionalTable = carbonTransactionalTable;
}
+
+ public int getComplexNonDictionaryColumnCount() {
+ return complexNonDictionaryColumnCount;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
index dc2fbbb..4018f75 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
@@ -69,5 +69,6 @@ public class DataField implements Serializable {
public void setUseActualData(boolean useActualData) {
this.useActualData = useActualData;
+ this.column.setUseActualData(useActualData);
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 9c1d113..028c404 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -230,8 +230,10 @@ public final class DataLoadProcessBuilder {
DataField dataField = new DataField(column);
if (column.getDataType() == DataTypes.DATE) {
dataField.setDateFormat(loadModel.getDateFormat());
+ column.setDateFormat(loadModel.getDateFormat());
} else if (column.getDataType() == DataTypes.TIMESTAMP) {
dataField.setTimestampFormat(loadModel.getTimestampformat());
+ column.setTimestampFormat(loadModel.getTimestampformat());
}
if (column.isComplex()) {
complexDataFields.add(dataField);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 193d192..567a8b5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -115,8 +115,8 @@ public class FieldEncoderFactory {
}
} else if (dataField.getColumn().isComplex()) {
return new ComplexFieldConverterImpl(
- createComplexType(dataField, cache, absoluteTableIdentifier,
- client, useOnePass, localCache), index);
+ createComplexDataType(dataField, cache, absoluteTableIdentifier,
+ client, useOnePass, localCache, index, nullFormat, isEmptyBadRecord), index);
} else {
return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
}
@@ -128,12 +128,13 @@ public class FieldEncoderFactory {
/**
* Create parser for the carbon column.
*/
- private static GenericDataType createComplexType(DataField dataField,
+ private static GenericDataType createComplexDataType(DataField dataField,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
- Map<Object, Integer> localCache) {
+ Map<Object, Integer> localCache, int index, String nullFormat, Boolean isEmptyBadRecords) {
return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
- absoluteTableIdentifier, client, useOnePass, localCache);
+ absoluteTableIdentifier, client, useOnePass, localCache, index, nullFormat,
+ isEmptyBadRecords);
}
/**
@@ -141,10 +142,11 @@ public class FieldEncoderFactory {
*
* @return GenericDataType
*/
+
private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
- Map<Object, Integer> localCache) {
+ Map<Object, Integer> localCache, int index, String nullFormat, Boolean isEmptyBadRecords) {
DataType dataType = carbonColumn.getDataType();
if (DataTypes.isArrayType(dataType)) {
List<CarbonDimension> listOfChildDimensions =
@@ -155,7 +157,7 @@ public class FieldEncoderFactory {
for (CarbonDimension dimension : listOfChildDimensions) {
arrayDataType.addChildren(
createComplexType(dimension, carbonColumn.getColName(), cache, absoluteTableIdentifier,
- client, useOnePass, localCache));
+ client, useOnePass, localCache, index, nullFormat, isEmptyBadRecords));
}
return arrayDataType;
} else if (DataTypes.isStructType(dataType)) {
@@ -167,15 +169,16 @@ public class FieldEncoderFactory {
for (CarbonDimension dimension : dimensions) {
structDataType.addChildren(
createComplexType(dimension, carbonColumn.getColName(), cache, absoluteTableIdentifier,
- client, useOnePass, localCache));
+ client, useOnePass, localCache, index, nullFormat, isEmptyBadRecords));
}
return structDataType;
} else if (DataTypes.isMapType(dataType)) {
throw new UnsupportedOperationException("Complex type Map is not supported yet");
} else {
- return new PrimitiveDataType(carbonColumn.getColName(), parentName,
- carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
- absoluteTableIdentifier, client, useOnePass, localCache);
+ return new PrimitiveDataType(carbonColumn, parentName, carbonColumn.getColumnId(),
+ (CarbonDimension) carbonColumn, cache, absoluteTableIdentifier, client, useOnePass,
+ localCache, nullFormat, isEmptyBadRecords);
}
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 6cf1dcd..ea75cd2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -110,7 +110,8 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
writeCounter = new long[iterators.length];
dimensionWithComplexCount = configuration.getDimensionCount();
noDictWithComplextCount =
- configuration.getNoDictionaryCount() + configuration.getComplexColumnCount();
+ configuration.getNoDictionaryCount() + configuration.getComplexDictionaryColumnCount()
+ + configuration.getComplexNonDictionaryColumnCount();
dimensionCount = configuration.getDimensionCount() - noDictWithComplextCount;
isNoDictionaryDimensionColumn =
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 4d333ed..705350c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -364,9 +364,11 @@ public class SortParameters implements Serializable {
parameters.setTaskNo(configuration.getTaskNo());
parameters.setMeasureColCount(configuration.getMeasureCount());
parameters.setDimColCount(
- configuration.getDimensionCount() - configuration.getComplexColumnCount());
+ configuration.getDimensionCount() - (configuration.getComplexDictionaryColumnCount()
+ + configuration.getComplexNonDictionaryColumnCount()));
parameters.setNoDictionaryCount(configuration.getNoDictionaryCount());
- parameters.setComplexDimColCount(configuration.getComplexColumnCount());
+ parameters.setComplexDimColCount(configuration.getComplexDictionaryColumnCount() + configuration
+ .getComplexNonDictionaryColumnCount());
parameters.setNoDictionaryDimnesionColumn(
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index aaf20c7..2ec85fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -199,7 +199,8 @@ public class CarbonFactDataHandlerModel {
int dimensionCount = configuration.getDimensionCount();
int noDictionaryCount = configuration.getNoDictionaryCount();
- int complexDimensionCount = configuration.getComplexColumnCount();
+ int complexDimensionCount = configuration.getComplexDictionaryColumnCount() + configuration
+ .getComplexNonDictionaryColumnCount();
int measureCount = configuration.getMeasureCount();
int simpleDimsCount = dimensionCount - noDictionaryCount - complexDimensionCount;
@@ -210,8 +211,8 @@ public class CarbonFactDataHandlerModel {
//To Set MDKey Index of each primitive type in complex type
int surrIndex = simpleDimsCount;
Iterator<Map.Entry<String, GenericDataType>> complexMap =
- CarbonDataProcessorUtil.getComplexTypesMap(configuration.getDataFields()).entrySet()
- .iterator();
+ CarbonDataProcessorUtil.getComplexTypesMap(configuration.getDataFields(), configuration)
+ .entrySet().iterator();
Map<Integer, GenericDataType> complexIndexMap = new HashMap<>(complexDimensionCount);
while (complexMap.hasNext()) {
Map.Entry<String, GenericDataType> complexDataType = complexMap.next();
@@ -609,5 +610,6 @@ public class CarbonFactDataHandlerModel {
public DataMapWriterListener getDataMapWriterlistener() {
return dataMapWriterlistener;
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index f22d1c1..26a634b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -191,7 +191,7 @@ public class TablePage {
}
/**
- * add a complex column into internal member compleDimensionPage
+ * add a complex column into internal member complexDimensionPage
*
* @param index index of the complexDimensionPage
* @param rowId Id of the input row
@@ -222,7 +222,7 @@ public class TablePage {
ByteBuffer byteArrayInput = ByteBuffer.wrap(complexColumns);
ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
- complexDataType.parseAndBitPack(byteArrayInput, dataOutputStream,
+ complexDataType.parseComplexValue(byteArrayInput, dataOutputStream,
model.getComplexDimensionKeyGenerator());
complexDataType.getColumnarDataForComplexType(encodedComplexColumnar,
ByteBuffer.wrap(byteArrayOutput.toByteArray()));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 392ad59..6ba05a2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -267,14 +267,24 @@ public final class CarbonDataProcessorUtil {
return dimString.toString();
}
+ private static String isDictionaryType(CarbonDimension dimension) {
+ Boolean isDictionary = true;
+ if (!(dimension.hasEncoding(Encoding.DICTIONARY))) {
+ isDictionary = false;
+ }
+ return isDictionary.toString();
+ }
+
/**
* This method will return all the child dimensions under complex dimension
*/
private static void addAllComplexTypeChildren(CarbonDimension dimension, StringBuilder dimString,
String parent) {
+
dimString.append(dimension.getColName()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(dimension.getDataType()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(parent).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
+ .append(isDictionaryType(dimension)).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(dimension.getColumnId()).append(CarbonCommonConstants.HASH_SPC_CHARACTER);
for (int i = 0; i < dimension.getNumberOfChild(); i++) {
CarbonDimension childDim = dimension.getListOfChildDimensions().get(i);
@@ -284,6 +294,7 @@ public final class CarbonDataProcessorUtil {
dimString.append(childDim.getColName()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(childDim.getDataType()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(dimension.getColName()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
+ .append(isDictionaryType(dimension)).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(childDim.getColumnId()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
.append(childDim.getOrdinal()).append(CarbonCommonConstants.HASH_SPC_CHARACTER);
}
@@ -291,11 +302,21 @@ public final class CarbonDataProcessorUtil {
}
// TODO: need to simplify it. Not required create string first.
- public static Map<String, GenericDataType> getComplexTypesMap(DataField[] dataFields) {
+ public static Map<String, GenericDataType> getComplexTypesMap(DataField[] dataFields,
+ CarbonDataLoadConfiguration configuration) {
String complexTypeString = getComplexTypeString(dataFields);
+
if (null == complexTypeString || complexTypeString.equals("")) {
return new LinkedHashMap<>();
}
+
+ String nullFormat =
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+ .toString();
+ boolean isEmptyBadRecord = Boolean.parseBoolean(
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+ .toString());
+
Map<String, GenericDataType> complexTypesMap = new LinkedHashMap<String, GenericDataType>();
String[] hierarchies = complexTypeString.split(CarbonCommonConstants.SEMICOLON_SPC_CHARACTER);
for (int i = 0; i < hierarchies.length; i++) {
@@ -312,8 +333,9 @@ public final class CarbonDataProcessorUtil {
} else if (levelInfo[1].toLowerCase().contains(CarbonCommonConstants.STRUCT)) {
g.addChildren(new StructDataType(levelInfo[0], levelInfo[2], levelInfo[3]));
} else {
- g.addChildren(new PrimitiveDataType(levelInfo[0], levelInfo[2], levelInfo[3],
- Integer.parseInt(levelInfo[4])));
+ g.addChildren(new PrimitiveDataType(levelInfo[0], levelInfo[2], levelInfo[4],
+ Integer.parseInt(levelInfo[5]), levelInfo[3].contains("true"), nullFormat,
+ isEmptyBadRecord));
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 458dea8..8f1994a 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -64,12 +64,12 @@ class AvroCarbonWriter extends CarbonWriter {
this.writable = new ObjectArrayWritable();
}
- private String[] avroToCsv(GenericData.Record avroRecord) {
+ private Object[] avroToCsv(GenericData.Record avroRecord) {
if (avroSchema == null) {
avroSchema = avroRecord.getSchema();
}
List<Schema.Field> fields = avroSchema.getFields();
- String[] csvField = new String[fields.size()];
+ Object [] csvField = new String[fields.size()];
for (int i = 0; i < fields.size(); i++) {
csvField[i] = avroFieldToString(fields.get(i), avroRecord.get(i));
}
@@ -88,9 +88,25 @@ class AvroCarbonWriter extends CarbonWriter {
case FLOAT:
out.append(fieldValue.toString());
break;
+ case RECORD:
+ List<Schema.Field> fields = fieldType.schema().getFields();
+ String delimiter = null;
+ for (int i = 0; i < fields.size(); i ++) {
+ if (i == 0) {
+ delimiter = "$";
+ } else {
+ delimiter = ":";
+ }
+ if (i != (fields.size() - 1)) {
+ out.append(avroFieldToString(fields.get(i), ((GenericData.Record) fieldValue).get(i)))
+ .append(delimiter);
+ } else {
+ out.append(avroFieldToString(fields.get(i), ((GenericData.Record) fieldValue).get(i)));
+ }
+ }
+ break;
default:
throw new UnsupportedOperationException();
- // TODO: convert complex type
}
return out.toString();
}
@@ -104,7 +120,7 @@ class AvroCarbonWriter extends CarbonWriter {
GenericData.Record record = (GenericData.Record) object;
// convert Avro record to CSV String[]
- String[] csvRecord = avroToCsv(record);
+ Object[] csvRecord = avroToCsv(record);
writable.set(csvRecord);
recordWriter.write(NullWritable.get(), writable);
} catch (Exception e) {
@@ -116,8 +132,7 @@ class AvroCarbonWriter extends CarbonWriter {
/**
* Flush and close the writer
*/
- @Override
- public void close() throws IOException {
+ @Override public void close() throws IOException {
try {
recordWriter.close(context);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 7ee22ed..3d5c77c 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -379,9 +380,20 @@ public class CarbonWriterBuilder {
}
for (Field field : schema.getFields()) {
if (null != field) {
- tableSchemaBuilder.addColumn(
- new StructField(field.getFieldName(), field.getDataType()),
- sortColumnsList.contains(field.getFieldName()));
+ if (field.getChildren() != null && field.getChildren().size() > 0) {
+ // Loop through the inner columns and for a StructData
+ List<StructField> structFieldsArray =
+ new ArrayList<StructField>(field.getChildren().size());
+ String parentName = field.getFieldName();
+ for (StructField childFld : field.getChildren()) {
+ structFieldsArray.add(new StructField(childFld.getFieldName(), childFld.getDataType()));
+ }
+ DataType complexType = DataTypes.createStructType(structFieldsArray);
+ tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false);
+ } else {
+ tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), field.getDataType()),
+ sortColumnsList.contains(field.getFieldName()));
+ }
}
}
String tableName;