You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/05/11 08:11:26 UTC

carbondata git commit: [CARBONDATA-2460] [CARBONDATA-2461] [CARBONDATA-2462] Fixed bug in AvroCarbonWriter

Repository: carbondata
Updated Branches:
  refs/heads/master 61afa42da -> 3d8b085a5


[CARBONDATA-2460] [CARBONDATA-2461] [CARBONDATA-2462] Fixed bug in AvroCarbonWriter

Issue1: If a NULL type is passed from the Avro schema, then an Unsupported
data type exception is thrown.
Solution1: Ignore columns that have the NULL data type.

Issue2: Array fields were being cast to ArrayList without any runtime
type check.
Solution2: Check the runtime type of Array field values and cast to
GenericData.Array or ArrayList accordingly.

This closes #2291


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3d8b085a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3d8b085a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3d8b085a

Branch: refs/heads/master
Commit: 3d8b085a55f551122c7528b6981f1785a44fef3c
Parents: 61afa42
Author: kunal642 <ku...@gmail.com>
Authored: Wed May 9 18:32:23 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Fri May 11 13:38:53 2018 +0530

----------------------------------------------------------------------
 .../TestNonTransactionalCarbonTable.scala       |  47 ++++++++-
 .../carbondata/sdk/file/AvroCarbonWriter.java   | 103 ++++++++++++++-----
 2 files changed, 122 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d8b085a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 376501b..86fda21 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -32,8 +32,6 @@ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandExcepti
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.sdk.file.AvroCarbonWriter
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -43,7 +41,7 @@ import org.apache.commons.lang.CharEncoding
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter
 
 import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
-import org.apache.carbondata.sdk.file.{CarbonWriter, CarbonWriterBuilder, Field, Schema}
+import org.apache.carbondata.sdk.file.{AvroCarbonWriter, CarbonWriter, CarbonWriterBuilder, Field, Schema}
 
 
 class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
@@ -51,7 +49,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
   var writerPath = new File(this.getClass.getResource("/").getPath
                             +
                             "../." +
-                            "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+                            "./target/SparkCarbonFileFormat/WriterOutput/")
     .getCanonicalPath
   //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
   writerPath = writerPath.replace("\\", "/")
@@ -1795,6 +1793,47 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     }.getMessage.toLowerCase.contains("column: name specified in sort columns"))
   }
 
+  test("test if load is passing with NULL type") {
+    val schema1 =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |		{
+        |			"name": "id",
+        |			"type": "null"
+        |		},
+        |		{
+        |			"name": "course_details",
+        |			"type": {
+        |				"name": "course_details",
+        |				"type": "record",
+        |				"fields": [
+        |					{
+        |						"name": "course_struct_course_time",
+        |						"type": "string"
+        |					}
+        |				]
+        |			}
+        |		}
+        |	]
+        |}""".stripMargin
+
+    val json1 =
+      """{"id": 101,"course_details": { "course_struct_course_time":"2014-01-05"  }}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+    val writer = CarbonWriter.builder.withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(schema1))
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+    writer.write(record)
+    writer.close()
+  }
+
   test("test if data load is success with a struct having timestamp column  ") {
     val schema1 =
       """{

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3d8b085a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 55fd211..137e3f4 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -75,15 +75,18 @@ public class AvroCarbonWriter extends CarbonWriter {
       avroSchema = avroRecord.getSchema();
     }
     List<Schema.Field> fields = avroSchema.getFields();
-    Object[] csvField = new Object[fields.size()];
+    List<Object> csvFields = new ArrayList<>();
     for (int i = 0; i < fields.size(); i++) {
-      csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i));
+      Object field = avroFieldToObject(fields.get(i), avroRecord.get(i));
+      if (field != null) {
+        csvFields.add(field);
+      }
     }
-    return csvField;
+    return csvFields.toArray();
   }
 
   private Object avroFieldToObject(Schema.Field avroField, Object fieldValue) {
-    Object out = new Object();
+    Object out;
     Schema.Type type = avroField.schema().getType();
     switch (type) {
       case BOOLEAN:
@@ -102,24 +105,45 @@ public class AvroCarbonWriter extends CarbonWriter {
 
         Object[] structChildObjects = new Object[fields.size()];
         for (int i = 0; i < fields.size(); i++) {
-          structChildObjects[i] =
+          Object childObject =
               avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i));
+          if (childObject != null) {
+            structChildObjects[i] = childObject;
+          }
         }
         StructObject structObject = new StructObject(structChildObjects);
         out = structObject;
         break;
       case ARRAY:
-        int size = ((ArrayList) fieldValue).size();
-        Object[] arrayChildObjects = new Object[size];
-        for (int i = 0; i < size; i++) {
-          arrayChildObjects[i] = (avroFieldToObject(
-              new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
-              ((ArrayList) fieldValue).get(i)));
+        Object[] arrayChildObjects;
+        if (fieldValue instanceof GenericData.Array) {
+          int size = ((GenericData.Array) fieldValue).size();
+          arrayChildObjects = new Object[size];
+          for (int i = 0; i < size; i++) {
+            Object childObject = avroFieldToObject(
+                new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+                ((GenericData.Array) fieldValue).get(i));
+            if (childObject != null) {
+              arrayChildObjects[i] = childObject;
+            }
+          }
+        } else {
+          int size = ((ArrayList) fieldValue).size();
+          arrayChildObjects = new Object[size];
+          for (int i = 0; i < size; i++) {
+            Object childObject = avroFieldToObject(
+                new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+                ((ArrayList) fieldValue).get(i));
+            if (childObject != null) {
+              arrayChildObjects[i] = childObject;
+            }
+          }
         }
-        ArrayObject arrayObject = new ArrayObject(arrayChildObjects);
-        out = arrayObject;
+        out = new ArrayObject(arrayChildObjects);
+        break;
+      case NULL:
+        out = null;
         break;
-
       default:
         throw new UnsupportedOperationException(
             "carbon not support " + type.toString() + " avro type yet");
@@ -142,7 +166,10 @@ public class AvroCarbonWriter extends CarbonWriter {
     Field[] carbonField = new Field[avroSchema.getFields().size()];
     int i = 0;
     for (Schema.Field avroField : avroSchema.getFields()) {
-      carbonField[i] = prepareFields(avroField);
+      Field field = prepareFields(avroField);
+      if (field != null) {
+        carbonField[i] = field;
+      }
       i++;
     }
     return new org.apache.carbondata.sdk.file.Schema(carbonField);
@@ -169,15 +196,25 @@ public class AvroCarbonWriter extends CarbonWriter {
         // recursively get the sub fields
         ArrayList<StructField> structSubFields = new ArrayList<>();
         for (Schema.Field avroSubField : childSchema.getFields()) {
-          structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema()));
+          StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+          if (structField != null) {
+            structSubFields.add(structField);
+          }
         }
         return new Field(FieldName, "struct", structSubFields);
       case ARRAY:
         // recursively get the sub fields
         ArrayList<StructField> arraySubField = new ArrayList<>();
         // array will have only one sub field.
-        arraySubField.add(prepareSubFields("val", childSchema.getElementType()));
-        return new Field(FieldName, "array", arraySubField);
+        StructField structField = prepareSubFields("val", childSchema.getElementType());
+        if (structField != null) {
+          arraySubField.add(structField);
+          return new Field(FieldName, "array", arraySubField);
+        } else {
+          return null;
+        }
+      case NULL:
+        return null;
       default:
         throw new UnsupportedOperationException(
             "carbon not support " + type.toString() + " avro type yet");
@@ -203,14 +240,23 @@ public class AvroCarbonWriter extends CarbonWriter {
         // recursively get the sub fields
         ArrayList<StructField> structSubFields = new ArrayList<>();
         for (Schema.Field avroSubField : childSchema.getFields()) {
-          structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema()));
+          StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+          if (structField != null) {
+            structSubFields.add(structField);
+          }
         }
         return (new StructField(FieldName, DataTypes.createStructType(structSubFields)));
       case ARRAY:
         // recursively get the sub fields
         // array will have only one sub field.
-        return (new StructField(FieldName, DataTypes.createArrayType(
-            getMappingDataTypeForArrayRecord(childSchema.getElementType()))));
+        DataType subType = getMappingDataTypeForArrayRecord(childSchema.getElementType());
+        if (subType != null) {
+          return (new StructField(FieldName, DataTypes.createArrayType(subType)));
+        } else {
+          return null;
+        }
+      case NULL:
+        return null;
       default:
         throw new UnsupportedOperationException(
             "carbon not support " + type.toString() + " avro type yet");
@@ -235,13 +281,22 @@ public class AvroCarbonWriter extends CarbonWriter {
         // recursively get the sub fields
         ArrayList<StructField> structSubFields = new ArrayList<>();
         for (Schema.Field avroSubField : childSchema.getFields()) {
-          structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema()));
+          StructField structField = prepareSubFields(avroSubField.name(), avroSubField.schema());
+          if (structField != null) {
+            structSubFields.add(structField);
+          }
         }
         return DataTypes.createStructType(structSubFields);
       case ARRAY:
         // array will have only one sub field.
-        return DataTypes.createArrayType(
-            getMappingDataTypeForArrayRecord(childSchema.getElementType()));
+        DataType subType = getMappingDataTypeForArrayRecord(childSchema.getElementType());
+        if (subType != null) {
+          return DataTypes.createArrayType(subType);
+        } else {
+          return null;
+        }
+      case NULL:
+        return null;
       default:
         throw new UnsupportedOperationException(
             "carbon not support " + childSchema.getType().toString() + " avro type yet");