You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/05/11 08:11:26 UTC
carbondata git commit: [CARBONDATA-2460] [CARBONDATA-2461]
[CARBONDATA-2462] Fixed bug in AvroCarbonWriter
Repository: carbondata
Updated Branches:
refs/heads/master 61afa42da -> 3d8b085a5
[CARBONDATA-2460] [CARBONDATA-2461] [CARBONDATA-2462] Fixed bug in AvroCarbonWriter
Issue1: If a NULL type is present in the Avro schema, an "Unsupported data
type" exception is thrown.
Solution1: Ignore any column that has the NULL data type.
Issue2: Array fields were being cast to ArrayList without any instanceof
check.
Solution2: Check the runtime type of array field values (GenericData.Array
vs. ArrayList) and cast accordingly.
This closes #2291
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3d8b085a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3d8b085a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3d8b085a
Branch: refs/heads/master
Commit: 3d8b085a55f551122c7528b6981f1785a44fef3c
Parents: 61afa42
Author: kunal642 <ku...@gmail.com>
Authored: Wed May 9 18:32:23 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Fri May 11 13:38:53 2018 +0530
----------------------------------------------------------------------
.../TestNonTransactionalCarbonTable.scala | 47 ++++++++-
.../carbondata/sdk/file/AvroCarbonWriter.java | 103 ++++++++++++++-----
2 files changed, 122 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d8b085a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 376501b..86fda21 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -32,8 +32,6 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.sdk.file.AvroCarbonWriter
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -43,7 +41,7 @@ import org.apache.commons.lang.CharEncoding
import tech.allegro.schema.json2avro.converter.JsonAvroConverter
import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
-import org.apache.carbondata.sdk.file.{CarbonWriter, CarbonWriterBuilder, Field, Schema}
+import org.apache.carbondata.sdk.file.{AvroCarbonWriter, CarbonWriter, CarbonWriterBuilder, Field, Schema}
class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
@@ -51,7 +49,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
var writerPath = new File(this.getClass.getResource("/").getPath
+
"../." +
- "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+ "./target/SparkCarbonFileFormat/WriterOutput/")
.getCanonicalPath
//getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
writerPath = writerPath.replace("\\", "/")
@@ -1795,6 +1793,47 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
}.getMessage.toLowerCase.contains("column: name specified in sort columns"))
}
+ test("test if load is passing with NULL type") {
+ val schema1 =
+ """{
+ | "namespace": "com.apache.schema",
+ | "type": "record",
+ | "name": "StudentActivity",
+ | "fields": [
+ | {
+ | "name": "id",
+ | "type": "null"
+ | },
+ | {
+ | "name": "course_details",
+ | "type": {
+ | "name": "course_details",
+ | "type": "record",
+ | "fields": [
+ | {
+ | "name": "course_struct_course_time",
+ | "type": "string"
+ | }
+ | ]
+ | }
+ | }
+ | ]
+ |}""".stripMargin
+
+ val json1 =
+ """{"id": 101,"course_details": { "course_struct_course_time":"2014-01-05" }}""".stripMargin
+
+ val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+ val converter = new JsonAvroConverter
+ val record = converter
+ .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+ val writer = CarbonWriter.builder.withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(schema1))
+ .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+ writer.write(record)
+ writer.close()
+ }
+
test("test if data load is success with a struct having timestamp column ") {
val schema1 =
"""{
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d8b085a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 55fd211..137e3f4 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -75,15 +75,18 @@ public class AvroCarbonWriter extends CarbonWriter {
avroSchema = avroRecord.getSchema();
}
List<Schema.Field> fields = avroSchema.getFields();
- Object[] csvField = new Object[fields.size()];
+ List<Object> csvFields = new ArrayList<>();
for (int i = 0; i < fields.size(); i++) {
- csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i));
+ Object field = avroFieldToObject(fields.get(i), avroRecord.get(i));
+ if (field != null) {
+ csvFields.add(field);
+ }
}
- return csvField;
+ return csvFields.toArray();
}
private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
- Object out = new Object();
+ Object out;
Schema.Type type = avroField.schema().getType();
switch (type) {
case BOOLEAN:
@@ -102,24 +105,45 @@ public class AvroCarbonWriter extends CarbonWriter {
Object[] structChildObjects = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
- structChildObjects[i] =
+ Object childObject =
avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i));
+ if (childObject != null) {
+ structChildObjects[i] = childObject;
+ }
}
StructObject structObject = new StructObject(structChildObjects);
out = structObject;
break;
case ARRAY:
- int size = ((ArrayList) fieldValue).size();
- Object[] arrayChildObjects = new Object[size];
- for (int i = 0; i < size; i++) {
- arrayChildObjects[i] = (avroFieldToObject(
- new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
- ((ArrayList) fieldValue).get(i)));
+ Object[] arrayChildObjects;
+ if (fieldValue instanceof GenericData.Array) {
+ int size = ((GenericData.Array) fieldValue).size();
+ arrayChildObjects = new Object[size];
+ for (int i = 0; i < size; i++) {
+ Object childObject = avroFieldToObject(
+ new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+ ((GenericData.Array) fieldValue).get(i));
+ if (childObject != null) {
+ arrayChildObjects[i] = childObject;
+ }
+ }
+ } else {
+ int size = ((ArrayList) fieldValue).size();
+ arrayChildObjects = new Object[size];
+ for (int i = 0; i < size; i++) {
+ Object childObject = avroFieldToObject(
+ new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+ ((ArrayList) fieldValue).get(i));
+ if (childObject != null) {
+ arrayChildObjects[i] = childObject;
+ }
+ }
}
- ArrayObject arrayObject = new ArrayObject(arrayChildObjects);
- out = arrayObject;
+ out = new ArrayObject(arrayChildObjects);
+ break;
+ case NULL:
+ out = null;
break;
-
default:
throw new UnsupportedOperationException(
"carbon not support " + type.toString() + " avro type yet");
@@ -142,7 +166,10 @@ public class AvroCarbonWriter extends CarbonWriter {
Field[] carbonField = new Field[avroSchema.getFields().size()];
int i = 0;
for (Schema.Field avroField : avroSchema.getFields()) {
- carbonField[i] = prepareFields(avroField);
+ Field field = prepareFields(avroField);
+ if (field != null) {
+ carbonField[i] = field;
+ }
i++;
}
return new org.apache.carbondata.sdk.file.Schema(carbonField);
@@ -169,15 +196,25 @@ public class AvroCarbonWriter extends CarbonWriter {
// recursively get the sub fields
ArrayList<StructField> structSubFields = new ArrayList<>();
for (Schema.Field avroSubField : childSchema.getFields()) {
- structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema()));
+ StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+ if (structField != null) {
+ structSubFields.add(structField);
+ }
}
return new Field(FieldName, "struct", structSubFields);
case ARRAY:
// recursively get the sub fields
ArrayList<StructField> arraySubField = new ArrayList<>();
// array will have only one sub field.
- arraySubField.add(prepareSubFields("val", childSchema.getElementType()));
- return new Field(FieldName, "array", arraySubField);
+ StructField structField = prepareSubFields("val", childSchema.getElementType());
+ if (structField != null) {
+ arraySubField.add(structField);
+ return new Field(FieldName, "array", arraySubField);
+ } else {
+ return null;
+ }
+ case NULL:
+ return null;
default:
throw new UnsupportedOperationException(
"carbon not support " + type.toString() + " avro type yet");
@@ -203,14 +240,23 @@ public class AvroCarbonWriter extends CarbonWriter {
// recursively get the sub fields
ArrayList<StructField> structSubFields = new ArrayList<>();
for (Schema.Field avroSubField : childSchema.getFields()) {
- structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema()));
+ StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+ if (structField != null) {
+ structSubFields.add(structField);
+ }
}
return (new StructField(FieldName, DataTypes.createStructType(structSubFields)));
case ARRAY:
// recursively get the sub fields
// array will have only one sub field.
- return (new StructField(FieldName, DataTypes.createArrayType(
- getMappingDataTypeForArrayRecord(childSchema.getElementType()))));
+ DataType subType = getMappingDataTypeForArrayRecord(childSchema.getElementType());
+ if (subType != null) {
+ return (new StructField(FieldName, DataTypes.createArrayType(subType)));
+ } else {
+ return null;
+ }
+ case NULL:
+ return null;
default:
throw new UnsupportedOperationException(
"carbon not support " + type.toString() + " avro type yet");
@@ -235,13 +281,22 @@ public class AvroCarbonWriter extends CarbonWriter {
// recursively get the sub fields
ArrayList<StructField> structSubFields = new ArrayList<>();
for (Schema.Field avroSubField : childSchema.getFields()) {
- structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema()));
+ StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+ if (structField != null) {
+ structSubFields.add(structField);
+ }
}
return DataTypes.createStructType(structSubFields);
case ARRAY:
// array will have only one sub field.
- return DataTypes.createArrayType(
- getMappingDataTypeForArrayRecord(childSchema.getElementType()));
+ DataType subType = getMappingDataTypeForArrayRecord(childSchema.getElementType());
+ if (subType != null) {
+ return DataTypes.createArrayType(subType);
+ } else {
+ return null;
+ }
+ case NULL:
+ return null;
default:
throw new UnsupportedOperationException(
"carbon not support " + childSchema.getType().toString() + " avro type yet");