Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/05/17 14:30:24 UTC

[11/50] [abbrv] carbondata git commit: [CARBONDATA-2457] Add converter to get Carbon SDK Schema from Avro schema directly.

[CARBONDATA-2457] Add converter to get Carbon SDK Schema from Avro schema directly.

In the current implementation, SDK users have to manually create the Carbon schema fields from the Avro schema.
This is time-consuming and error-prone, and users should not have to worry about this logic.
So, abstract the Carbon schema creation from the Avro schema by exposing a converter method to the user.
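
A minimal usage sketch of the new method (the schema string, JSON row, output path, and class name below are illustrative; the builder calls follow the updated tests in this commit):

    import org.apache.avro.generic.GenericData;
    import org.apache.commons.lang.CharEncoding;
    import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

    import org.apache.carbondata.sdk.file.AvroCarbonWriter;
    import org.apache.carbondata.sdk.file.CarbonWriter;

    public class AvroWriterSketch {
      public static void main(String[] args) throws Exception {
        // Illustrative Avro schema; previously the user had to hand-build
        // the matching Field[] array for this.
        String avroSchema = "{\"name\":\"person\",\"type\":\"record\",\"fields\":"
            + "[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}";
        String json = "{\"name\":\"bob\",\"age\":10}";

        // The Carbon schema is now derived from the Avro schema directly.
        CarbonWriter writer = CarbonWriter.builder()
            .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema))
            .outputPath("./AvroWriterOutput")   // illustrative path
            .isTransactionalTable(false)
            .buildWriterForAvroInput();

        // Convert a JSON row to an Avro GenericData.Record, as the tests do.
        JsonAvroConverter converter = new JsonAvroConverter();
        GenericData.Record record = converter.convertToGenericDataRecord(
            json.getBytes(CharEncoding.UTF_8),
            new org.apache.avro.Schema.Parser().parse(avroSchema));

        writer.write(record);
        writer.close();
      }
    }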

This closes #2283


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cf3e9196
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cf3e9196
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cf3e9196

Branch: refs/heads/spark-2.3
Commit: cf3e919651f3b84c3045d1be8fb89a8a8cfd8242
Parents: 747be9b
Author: ajantha-bhat <aj...@gmail.com>
Authored: Tue May 8 12:12:37 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed May 9 18:07:05 2018 +0530

----------------------------------------------------------------------
 .../TestNonTransactionalCarbonTable.scala       | 172 ++-----------------
 .../carbondata/sdk/file/AvroCarbonWriter.java   | 129 +++++++++++++-
 .../sdk/file/AvroCarbonWriterTest.java          |  63 +------
 3 files changed, 145 insertions(+), 219 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf3e9196/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index c641ed3..2f88c40 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
+import org.apache.carbondata.sdk.file.{AvroCarbonWriter, CarbonWriter, Field, Schema}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -39,8 +39,6 @@ import org.apache.avro
 import org.apache.commons.lang.CharEncoding
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter
 
-import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
-
 class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
   var writerPath = new File(this.getClass.getResource("/").getPath
@@ -351,8 +349,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
-
-
   test("read non transactional table, files written from sdk Writer Output)") {
     buildTestDataSingleFile()
     assert(new File(writerPath).exists())
@@ -531,7 +527,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
-
   test("Read sdk writer output file without any file should fail") {
     buildTestDataSingleFile()
     deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
@@ -685,11 +680,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
   }
 
-
-  private def WriteFilesWithAvroWriter(rows: Int,
-      mySchema: String,
-      json: String,
-      fields: Array[Field]) = {
+  private def WriteFilesWithAvroWriter(rows: Int, mySchema: String, json: String): Unit = {
     // conversion to GenericData.Record
     val nn = new avro.Schema.Parser().parse(mySchema)
     val converter = new JsonAvroConverter
@@ -697,7 +688,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn)
 
     try {
-      val writer = CarbonWriter.builder.withSchema(new Schema(fields))
+      val writer = CarbonWriter.builder
+        .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema))
         .outputPath(writerPath).isTransactionalTable(false)
         .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput
       var i = 0
@@ -734,16 +726,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       """.stripMargin
 
     val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """
-
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fld.add(new StructField("city", DataTypes.STRING))
-    fields(2) = new Field("address", "struct", fld)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataStructType(): Any = {
@@ -782,17 +765,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
                    """.stripMargin
 
     val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """
-
-
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-    // fields[1] = new Field("age", DataTypes.INT);
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fields(2) = new Field("address", "array", fld)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataSingleFileArrayType(): Any = {
@@ -836,18 +809,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       """ {"name":"bob", "age":10,
         |"address" : {"street":"abc", "city":"bang"},
         |"doorNum" : [1,2,3,4]}""".stripMargin
-
-    val fields = new Array[Field](4)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fld.add(new StructField("city", DataTypes.STRING))
-    fields(2) = new Field("address", "struct", fld)
-    val fld1 = new util.ArrayList[StructField]
-    fld1.add(new StructField("eachDoorNum", DataTypes.INT))
-    fields(3) = new Field("doorNum", "array", fld1)
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataBothStructArrayType(): Any = {
@@ -855,7 +817,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     buildAvroTestDataStructWithArrayType(3, null)
   }
 
-
   // ArrayOfStruct test
   def buildAvroTestDataArrayOfStruct(rows: Int, options: util.Map[String, String]): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
@@ -900,20 +861,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |{"street":"def","city":"city2"},
         |{"street":"ghi","city":"city3"},
         |{"street":"jkl","city":"city4"}]} """.stripMargin
-
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fld.add(new StructField("city", DataTypes.STRING))
-
-    val fld2 = new util.ArrayList[StructField]
-    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
-    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataArrayOfStructType(): Any = {
@@ -921,7 +869,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     buildAvroTestDataArrayOfStruct(3, null)
   }
 
-
   // StructOfArray test
   def buildAvroTestDataStructOfArray(rows: Int, options: util.Map[String, String]): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
@@ -983,21 +930,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
                  |		]
                  |	}
                  |} """.stripMargin
-
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val fld2 = new util.ArrayList[StructField]
-    fld2.add(new StructField("street", DataTypes.STRING))
-    fld2.add(new StructField("city", DataTypes.STRING))
-
-    val fld1 = new util.ArrayList[StructField]
-    fld1.add(new StructField("eachDoorNum", DataTypes.INT))
-    fld2.add(new StructField("doorNum", DataTypes.createArrayType(DataTypes.INT), fld1))
-
-    fields(2) = new Field("address","struct",fld2)
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataStructOfArrayType(): Any = {
@@ -1005,7 +938,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     buildAvroTestDataStructOfArray(3, null)
   }
 
-
   test("Read sdk writer Avro output Record Type") {
     buildAvroTestDataStructType()
     assert(new File(writerPath).exists())
@@ -1014,7 +946,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
          |'$writerPath' """.stripMargin)
 
-
     checkAnswer(sql("select * from sdkOutputTable"), Seq(
       Row("bob", 10, Row("abc","bang")),
       Row("bob", 10, Row("abc","bang")),
@@ -1073,7 +1004,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
-
   test("Read sdk writer Avro output with Array of struct") {
     buildAvroTestDataArrayOfStructType()
     assert(new File(writerPath).exists())
@@ -1099,7 +1029,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
-
   // Struct of array
   test("Read sdk writer Avro output with struct of Array") {
     buildAvroTestDataStructOfArrayType()
@@ -1201,21 +1130,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |		}
         |	]
         |} """.stripMargin
-
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fld.add(new StructField("city", DataTypes.STRING))
-    fld.add(new StructField("FloorNum", DataTypes.createArrayType(DataTypes.INT)))
-
-    val fld2 = new util.ArrayList[StructField]
-    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
-    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataMultiLevel3Type(): Any = {
@@ -1253,7 +1168,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
-
   // test multi level -- 3 levels [array of struct of struct of string, int]
   def buildAvroTestDataMultiLevel3_1(rows: Int, options: util.Map[String, String]): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
@@ -1333,26 +1247,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |		}
         |	]
         |}  """.stripMargin
-
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fld.add(new StructField("city", DataTypes.STRING))
-
-    val subFld = new util.ArrayList[StructField]
-    subFld.add(new StructField("wing", DataTypes.STRING))
-    subFld.add(new StructField("number", DataTypes.INT))
-    fld.add(new StructField("FloorNum", DataTypes.createStructType(subFld)))
-
-    // array of struct of struct
-    val fld2 = new util.ArrayList[StructField]
-    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
-    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataMultiLevel3_1Type(): Any = {
@@ -1432,22 +1327,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |        	"BuildNum": [[[1,2,3],[4,5,6]],[[10,20,30],[40,50,60]]]
         |        }   """.stripMargin
 
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val subFld = new util.ArrayList[StructField]
-    subFld.add(new StructField("EachDoorNum", DataTypes.INT))
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("DoorNum", DataTypes.createArrayType(DataTypes.INT), subFld))
-    // array of struct of struct
-    val doorNum = new util.ArrayList[StructField]
-    doorNum.add(new StructField("FloorNum",
-      DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.INT)), fld))
-    fields(2) = new Field("BuildNum", "array", doorNum)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataMultiLevel3_2Type(): Any = {
@@ -1486,8 +1366,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
-
-
   // test multi level -- 4 levels [array of array of array of struct]
   def buildAvroTestDataMultiLevel4(rows: Int, options: util.Map[String, String]): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
@@ -1566,30 +1444,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |		]
         |	]
         |} """.stripMargin
-
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val subFld = new util.ArrayList[StructField]
-    subFld.add(new StructField("EachDoorNum", DataTypes.INT))
-
-    val address = new util.ArrayList[StructField]
-    address.add(new StructField("street", DataTypes.STRING))
-    address.add(new StructField("city", DataTypes.STRING))
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("DoorNum",
-        DataTypes.createArrayType(DataTypes.createStructType(address)),
-        subFld))
-    // array of struct of struct
-    val doorNum = new util.ArrayList[StructField]
-    doorNum.add(new StructField("FloorNum",
-      DataTypes.createArrayType(
-        DataTypes.createArrayType(DataTypes.createStructType(address))), fld))
-    fields(2) = new Field("BuildNum", "array", doorNum)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataMultiLevel4Type(): Any = {
@@ -1615,5 +1470,4 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf3e9196/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 946040f..55fd211 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -24,6 +24,9 @@ import java.util.Random;
 import java.util.UUID;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
@@ -46,7 +49,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
  * Writer Implementation to write Avro Record to carbondata file.
  */
 @InterfaceAudience.Internal
-class AvroCarbonWriter extends CarbonWriter {
+public class AvroCarbonWriter extends CarbonWriter {
 
   private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
   private TaskAttemptContext context;
@@ -118,12 +121,134 @@ class AvroCarbonWriter extends CarbonWriter {
         break;
 
       default:
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException(
+            "carbon not support " + type.toString() + " avro type yet");
     }
     return out;
   }
 
   /**
+   * Converts an Avro schema to the Carbon schema required by CarbonWriter.
+   *
+   * @param avroSchemaString JSON-formatted Avro schema as a string
+   * @return Carbon SDK schema
+   */
+  public static org.apache.carbondata.sdk.file.Schema getCarbonSchemaFromAvroSchema(
+      String avroSchemaString) {
+    if (avroSchemaString == null) {
+      throw new UnsupportedOperationException("avro schema string cannot be null");
+    }
+    Schema avroSchema = new Schema.Parser().parse(avroSchemaString);
+    Field[] carbonField = new Field[avroSchema.getFields().size()];
+    int i = 0;
+    for (Schema.Field avroField : avroSchema.getFields()) {
+      carbonField[i] = prepareFields(avroField);
+      i++;
+    }
+    return new org.apache.carbondata.sdk.file.Schema(carbonField);
+  }
+
+  private static Field prepareFields(Schema.Field avroField) {
+    String FieldName = avroField.name();
+    Schema childSchema = avroField.schema();
+    Schema.Type type = childSchema.getType();
+    switch (type) {
+      case BOOLEAN:
+        return new Field(FieldName, DataTypes.BOOLEAN);
+      case INT:
+        return new Field(FieldName, DataTypes.INT);
+      case LONG:
+        return new Field(FieldName, DataTypes.LONG);
+      case DOUBLE:
+        return new Field(FieldName, DataTypes.DOUBLE);
+      case STRING:
+        return new Field(FieldName, DataTypes.STRING);
+      case FLOAT:
+        return new Field(FieldName, DataTypes.DOUBLE);
+      case RECORD:
+        // recursively get the sub fields
+        ArrayList<StructField> structSubFields = new ArrayList<>();
+        for (Schema.Field avroSubField : childSchema.getFields()) {
+          structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema()));
+        }
+        return new Field(FieldName, "struct", structSubFields);
+      case ARRAY:
+        // recursively get the sub fields
+        ArrayList<StructField> arraySubField = new ArrayList<>();
+        // array will have only one sub field.
+        arraySubField.add(prepareSubFields("val", childSchema.getElementType()));
+        return new Field(FieldName, "array", arraySubField);
+      default:
+        throw new UnsupportedOperationException(
+            "carbon not support " + type.toString() + " avro type yet");
+    }
+  }
+
+  private static StructField prepareSubFields(String FieldName, Schema childSchema) {
+    Schema.Type type = childSchema.getType();
+    switch (type) {
+      case BOOLEAN:
+        return new StructField(FieldName, DataTypes.BOOLEAN);
+      case INT:
+        return new StructField(FieldName, DataTypes.INT);
+      case LONG:
+        return new StructField(FieldName, DataTypes.LONG);
+      case DOUBLE:
+        return new StructField(FieldName, DataTypes.DOUBLE);
+      case STRING:
+        return new StructField(FieldName, DataTypes.STRING);
+      case FLOAT:
+        return new StructField(FieldName, DataTypes.DOUBLE);
+      case RECORD:
+        // recursively get the sub fields
+        ArrayList<StructField> structSubFields = new ArrayList<>();
+        for (Schema.Field avroSubField : childSchema.getFields()) {
+          structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema()));
+        }
+        return (new StructField(FieldName, DataTypes.createStructType(structSubFields)));
+      case ARRAY:
+        // recursively get the sub fields
+        // array will have only one sub field.
+        return (new StructField(FieldName, DataTypes.createArrayType(
+            getMappingDataTypeForArrayRecord(childSchema.getElementType()))));
+      default:
+        throw new UnsupportedOperationException(
+            "carbon not support " + type.toString() + " avro type yet");
+    }
+  }
+
+  private static DataType getMappingDataTypeForArrayRecord(Schema childSchema) {
+    switch (childSchema.getType()) {
+      case BOOLEAN:
+        return DataTypes.BOOLEAN;
+      case INT:
+        return DataTypes.INT;
+      case LONG:
+        return DataTypes.LONG;
+      case DOUBLE:
+        return DataTypes.DOUBLE;
+      case STRING:
+        return DataTypes.STRING;
+      case FLOAT:
+        return DataTypes.DOUBLE;
+      case RECORD:
+        // recursively get the sub fields
+        ArrayList<StructField> structSubFields = new ArrayList<>();
+        for (Schema.Field avroSubField : childSchema.getFields()) {
+          structSubFields.add(prepareSubFields(avroSubField.name(), avroSubField.schema()));
+        }
+        return DataTypes.createStructType(structSubFields);
+      case ARRAY:
+        // array will have only one sub field.
+        return DataTypes.createArrayType(
+            getMappingDataTypeForArrayRecord(childSchema.getElementType()));
+      default:
+        throw new UnsupportedOperationException(
+            "carbon not support " + childSchema.getType().toString() + " avro type yet");
+    }
+  }
+
+  /**
    * Write single row data, input row is Avro Record
    */
   @Override
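
Taken together, the helpers above define the converter's type mapping: Avro BOOLEAN, INT, LONG, DOUBLE, and STRING map one-to-one to Carbon types; FLOAT is widened to Carbon DOUBLE; RECORD becomes a Carbon struct; ARRAY becomes a Carbon array whose single child field is named "val"; any other Avro type (MAP, UNION, ENUM, and so on) falls through to the UnsupportedOperationException. A minimal sketch of the conversion (the nested schema and class name are illustrative):

    import org.apache.carbondata.sdk.file.AvroCarbonWriter;

    public class AvroTypeMappingSketch {
      public static void main(String[] args) {
        // Illustrative nested Avro schema: one struct field, one array field.
        String avroSchema = "{\"name\":\"person\",\"type\":\"record\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"salary\",\"type\":\"float\"},"
            + "{\"name\":\"address\",\"type\":{\"type\":\"record\",\"name\":\"addr\","
            + "\"fields\":[{\"name\":\"street\",\"type\":\"string\"},"
            + "{\"name\":\"city\",\"type\":\"string\"}]}},"
            + "{\"name\":\"doorNum\",\"type\":{\"type\":\"array\",\"items\":\"int\"}}]}";

        org.apache.carbondata.sdk.file.Schema carbonSchema =
            AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);

        // Per the switch statements above, the resulting Carbon fields are:
        //   name    -> STRING
        //   salary  -> DOUBLE  (Avro FLOAT is widened to Carbon DOUBLE)
        //   address -> struct<street: STRING, city: STRING>
        //   doorNum -> array<INT>  (single child field named "val")
      }
    }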

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf3e9196/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index 105fb6d..163512a 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -21,30 +21,20 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.ArrayType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.datatype.StructField;
-import org.apache.carbondata.core.metadata.datatype.StructType;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.CharEncoding;
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.Test;
 
-import scala.Array;
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
 import org.apache.avro.Schema;
 
-import static org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.InputType.file;
 
 public class AvroCarbonWriterTest {
   private String path = "./AvroCarbonWriterSuiteWriteFiles";
@@ -70,13 +60,9 @@ public class AvroCarbonWriterTest {
     GenericData.Record record = converter.convertToGenericDataRecord(
         json.getBytes(CharEncoding.UTF_8), new Schema.Parser().parse(avroSchema));
 
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.STRING);
-
     try {
       CarbonWriter writer = CarbonWriter.builder()
-          .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+          .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema))
           .outputPath(path)
           .isTransactionalTable(true)
           .buildWriterForAvroInput();
@@ -145,19 +131,9 @@ public class AvroCarbonWriterTest {
     GenericData.Record record = converter.convertToGenericDataRecord(
         json.getBytes(CharEncoding.UTF_8), new Schema.Parser().parse(avroSchema));
 
-    Field[] fields = new Field[6];
-    // fields[0] = new Field("mynull", DataTypes.NULL);
-    fields[0] = new Field("myboolean", DataTypes.BOOLEAN);
-    fields[1] = new Field("myint", DataTypes.INT);
-    fields[2] = new Field("mylong", DataTypes.LONG);
-    fields[3] = new Field("myfloat", DataTypes.DOUBLE);
-    fields[4] = new Field("mydouble", DataTypes.DOUBLE);
-    fields[5] = new Field("mystring", DataTypes.STRING);
-
-
     try {
       CarbonWriter writer = CarbonWriter.builder()
-          .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+          .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema))
           .outputPath(path)
           .isTransactionalTable(true)
           .buildWriterForAvroInput();
@@ -250,18 +226,9 @@ public class AvroCarbonWriterTest {
     GenericData.Record record = converter.convertToGenericDataRecord(
         json.getBytes(CharEncoding.UTF_8), nn);
 
-    Field[] fields = new Field[3];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("name1", DataTypes.STRING);
-    // fields[1] = new Field("age", DataTypes.INT);
-    List fld = new ArrayList<StructField>();
-    fld.add(new StructField("street", DataTypes.STRING));
-    fld.add(new StructField("city", DataTypes.STRING));
-    fields[2] = new Field("address", "struct", fld);
-
     try {
       CarbonWriter writer = CarbonWriter.builder()
-          .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+          .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema))
           .outputPath(path)
           .isTransactionalTable(true)
           .buildWriterForAvroInput();
@@ -323,18 +290,9 @@ public class AvroCarbonWriterTest {
     GenericData.Record record = converter.convertToGenericDataRecord(
         json.getBytes(CharEncoding.UTF_8), nn);
 
-    Field[] fields = new Field[3];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("name1", DataTypes.STRING);
-    // fields[1] = new Field("age", DataTypes.INT);
-    List fld = new ArrayList<StructField>();
-    fld.add(new StructField("street", DataTypes.STRING));
-    fld.add(new StructField("city", DataTypes.STRING));
-    fields[2] = new Field("address", "struct", fld);
-
     try {
       CarbonWriter writer = CarbonWriter.builder()
-          .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+          .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema))
           .outputPath(path)
           .isTransactionalTable(true)
           .buildWriterForAvroInput();
@@ -365,17 +323,6 @@ public class AvroCarbonWriterTest {
 
   private void WriteAvroComplexData(String mySchema, String json, String[] sortColumns)
       throws UnsupportedEncodingException, IOException, InvalidLoadOptionException {
-    Field[] fields = new Field[4];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("name1", DataTypes.STRING);
-    // fields[1] = new Field("age", DataTypes.INT);
-    List fld = new ArrayList<StructField>();
-    fld.add(new StructField("street", DataTypes.STRING));
-    fld.add(new StructField("city", DataTypes.STRING));
-    fields[2] = new Field("address", "struct", fld);
-    List fld1 = new ArrayList<StructField>();
-    fld1.add(new StructField("eachDoorNum", DataTypes.INT));
-    fields[3] = new Field("doorNum","array",fld1);
 
     // conversion to GenericData.Record
     Schema nn = new Schema.Parser().parse(mySchema);
@@ -385,7 +332,7 @@ public class AvroCarbonWriterTest {
 
     try {
       CarbonWriter writer = CarbonWriter.builder()
-          .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+          .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema))
           .outputPath(path)
           .isTransactionalTable(true).sortBy(sortColumns)
           .buildWriterForAvroInput();