Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/07 11:06:58 UTC

carbondata git commit: [CARBONDATA-2430][SDK] Reshuffling of Columns given by user in SDK.

Repository: carbondata
Updated Branches:
  refs/heads/master 531ecdf3f -> b1c85fa55


[CARBONDATA-2430][SDK] Reshuffling of Columns given by user in SDK.

Reshuffle the columns given by the user in the SDK. The order should be: Sort Columns -> Dimensions -> Complex -> Measures (a short sketch of this ordering follows below).

This closes #2261
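
For context, the reordering boils down to how TableSchemaBuilder.build() now
concatenates four column groups (see the TableSchemaBuilder diff below). A
minimal sketch of that step, using the field names from the patch:

    // Sort columns first, then primitive dimensions, then complex
    // (struct/array) columns and their children, then measures.
    List<ColumnSchema> allColumns = new LinkedList<>(sortColumns);
    allColumns.addAll(dimension);  // e.g. STRING/DATE/TIMESTAMP columns
    allColumns.addAll(complex);    // struct and array columns
    allColumns.addAll(measures);   // numeric, non-dimension columns
    schema.setListOfColumns(allColumns);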


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b1c85fa5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b1c85fa5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b1c85fa5

Branch: refs/heads/master
Commit: b1c85fa55eeca1a98b61117c6b46df9a28d60bca
Parents: 531ecdf
Author: sounakr <so...@gmail.com>
Authored: Wed May 2 21:29:57 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon May 7 16:36:46 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/datatype/ArrayType.java       |   4 +
 .../core/metadata/datatype/StructField.java     |  15 +
 .../core/metadata/schema/table/CarbonTable.java |  26 --
 .../schema/table/TableSchemaBuilder.java        |  72 ++-
 .../examples/DataFrameComplexTypeExample.scala  | 147 +++++-
 .../TestNonTransactionalCarbonTable.scala       | 459 +++++++++++++++++--
 .../apache/spark/util/SparkTypeConverter.scala  |  58 +--
 .../carbondata/sdk/file/AvroCarbonWriter.java   |  48 +-
 .../sdk/file/CarbonWriterBuilder.java           | 104 +++--
 .../org/apache/carbondata/sdk/file/Field.java   |   4 +-
 .../sdk/file/AvroCarbonWriterTest.java          | 214 +++++++++
 11 files changed, 963 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java
index c30e21c..c327d7f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/ArrayType.java
@@ -30,4 +30,8 @@ public class ArrayType extends DataType {
   public boolean isComplexType() {
     return true;
   }
+
+  public DataType getElementType() {
+    return elementType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java
index efdc8e2..bfca057 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/StructField.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.metadata.datatype;
 
 import java.io.Serializable;
+import java.util.List;
 
 public class StructField implements Serializable {
 
@@ -27,9 +28,19 @@ public class StructField implements Serializable {
 
   private DataType dataType;
 
+  private List<StructField> children;
+
   public StructField(String fieldName, DataType dataType) {
     this.fieldName = fieldName;
     this.dataType = dataType;
+    this.children = null;
+  }
+
+
+  public StructField(String fieldName, DataType dataType, List<StructField> children) {
+    this.fieldName = fieldName;
+    this.dataType = dataType;
+    this.children = children;
   }
 
   public DataType getDataType() {
@@ -39,4 +50,8 @@ public class StructField implements Serializable {
   public String getFieldName() {
     return fieldName;
   }
+
+  public List<StructField> getChildren() {
+    return children;
+  }
 }
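
The new children-aware constructor lets callers describe a nested column
directly. A hypothetical example (the column names here are illustrative, not
part of the patch):

    import java.util.ArrayList;
    import java.util.List;

    // Describe address: struct<street: string, city: string>
    List<StructField> children = new ArrayList<>();
    children.add(new StructField("street", DataTypes.STRING));
    children.add(new StructField("city", DataTypes.STRING));
    StructField address =
        new StructField("address", DataTypes.createStructType(children), children);
    // address.getChildren() now exposes street and city to schema builders.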

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 4178d8a..cf5660f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -37,8 +37,6 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.features.TableOperation;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
-import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.metadata.schema.PartitionInfo;
@@ -49,7 +47,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.reader.CarbonHeaderReader;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.TableProvider;
@@ -60,7 +57,6 @@ import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.format.FileHeader;
 
 /**
  * Mapping class for Carbon actual table
@@ -223,28 +219,6 @@ public class CarbonTable implements Serializable {
     }
   }
 
-  public static CarbonTable buildFromDataFile(
-      String tableName, String tablePath, String filePath) throws IOException {
-    CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(filePath);
-    FileHeader fileHeader = carbonHeaderReader.readHeader();
-    TableSchemaBuilder builder = TableSchema.builder();
-    ThriftWrapperSchemaConverterImpl schemaConverter = new ThriftWrapperSchemaConverterImpl();
-    for (org.apache.carbondata.format.ColumnSchema column : fileHeader.getColumn_schema()) {
-      ColumnSchema columnSchema = schemaConverter.fromExternalToWrapperColumnSchema(column);
-      builder.addColumn(
-          new StructField(columnSchema.getColumnName(), columnSchema.getDataType()), false);
-    }
-
-    TableSchema tableSchema = builder.tableName(tableName).build();
-    TableInfo tableInfo = new TableInfo();
-    tableInfo.setFactTable(tableSchema);
-    tableInfo.setTablePath(tablePath);
-    tableInfo.setDatabaseName("default");
-    tableInfo.setTableUniqueName(
-        CarbonTable.buildUniqueName("default", tableSchema.getTableName()));
-    return buildFromTableInfo(tableInfo);
-  }
-
   public static CarbonTable buildFromTablePath(String tableName, String tablePath,
       boolean isTransactionalTable) throws IOException {
     if (isTransactionalTable) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 731fea8..42bb958 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -27,6 +27,7 @@ import java.util.UUID;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.metadata.datatype.ArrayType;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalType;
@@ -46,7 +47,11 @@ public class TableSchemaBuilder {
 
   private List<ColumnSchema> sortColumns = new LinkedList<>();
 
-  private List<ColumnSchema> otherColumns = new LinkedList<>();
+  private List<ColumnSchema> dimension = new LinkedList<>();
+
+  private List<ColumnSchema> complex = new LinkedList<>();
+
+  private List<ColumnSchema> measures = new LinkedList<>();
 
   private int blockSize;
 
@@ -86,7 +91,9 @@ public class TableSchemaBuilder {
     schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
     schema.setSchemaEvalution(schemaEvol);
     List<ColumnSchema> allColumns = new LinkedList<>(sortColumns);
-    allColumns.addAll(otherColumns);
+    allColumns.addAll(dimension);
+    allColumns.addAll(complex);
+    allColumns.addAll(measures);
     schema.setListOfColumns(allColumns);
 
     Map<String, String> property = new HashMap<>();
@@ -108,21 +115,36 @@ public class TableSchemaBuilder {
   }
 
   public ColumnSchema addColumn(StructField field, boolean isSortColumn) {
+    return addColumn(field, null, isSortColumn, false);
+  }
+
+  private ColumnSchema addColumn(StructField field, String parentName, boolean isSortColumn,
+      boolean isComplexChild) {
     Objects.requireNonNull(field);
     checkRepeatColumnName(field);
     ColumnSchema newColumn = new ColumnSchema();
-    newColumn.setColumnName(field.getFieldName());
+    if (parentName != null) {
+      newColumn.setColumnName(parentName + "." + field.getFieldName());
+    } else {
+      newColumn.setColumnName(field.getFieldName());
+    }
     newColumn.setDataType(field.getDataType());
     if (isSortColumn ||
         field.getDataType() == DataTypes.STRING ||
         field.getDataType() == DataTypes.DATE ||
         field.getDataType() == DataTypes.TIMESTAMP ||
-        DataTypes.isStructType(field.getDataType())) {
+        field.getDataType().isComplexType() ||
+        (isComplexChild))  {
       newColumn.setDimensionColumn(true);
     } else {
       newColumn.setDimensionColumn(false);
     }
-    newColumn.setSchemaOrdinal(ordinal++);
+    if (!isComplexChild) {
+      newColumn.setSchemaOrdinal(ordinal++);
+    } else {
+      // child column should not be counted for schema ordinal
+      newColumn.setSchemaOrdinal(-1);
+    }
     newColumn.setColumnar(true);
 
     // For NonTransactionalTable, multiple sdk writer output with same column name can be placed in
@@ -135,7 +157,11 @@ public class TableSchemaBuilder {
     newColumn.setColumnReferenceId(newColumn.getColumnUniqueId());
     newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn));
     if (field.getDataType().isComplexType()) {
-      newColumn.setNumberOfChild(((StructType) field.getDataType()).getFields().size());
+      if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
+        newColumn.setNumberOfChild(1);
+      } else {
+        newColumn.setNumberOfChild(((StructType) field.getDataType()).getFields().size());
+      }
     }
     if (DataTypes.isDecimal(field.getDataType())) {
       DecimalType decimalType = (DecimalType) field.getDataType();
@@ -143,17 +169,29 @@ public class TableSchemaBuilder {
       newColumn.setScale(decimalType.getScale());
     }
     if (!isSortColumn) {
-      otherColumns.add(newColumn);
+      if (!newColumn.isDimensionColumn()) {
+        measures.add(newColumn);
+      } else if (DataTypes.isStructType(field.getDataType()) ||
+          DataTypes.isArrayType(field.getDataType()) || isComplexChild) {
+        complex.add(newColumn);
+      } else {
+        dimension.add(newColumn);
+      }
     }
     if (newColumn.isDimensionColumn()) {
       newColumn.setUseInvertedIndex(true);
     }
     if (field.getDataType().isComplexType()) {
-      if (((StructType) field.getDataType()).getFields().size() > 0) {
+      String parentFieldName = newColumn.getColumnName();
+      if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
+        addColumn(new StructField("val",
+            ((ArrayType) field.getDataType()).getElementType()), field.getFieldName(), false, true);
+      } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")
+          && ((StructType) field.getDataType()).getFields().size() > 0) {
         // This field has children.
         List<StructField> fields = ((StructType) field.getDataType()).getFields();
-        for (int i = 0; i < fields.size(); i ++) {
-          addColumn(fields.get(i), false);
+        for (int i = 0; i < fields.size(); i++) {
+          addColumn(fields.get(i), parentFieldName, false, true);
         }
       }
     }
@@ -169,7 +207,19 @@ public class TableSchemaBuilder {
         throw new IllegalArgumentException("column name already exists");
       }
     }
-    for (ColumnSchema column : otherColumns) {
+    for (ColumnSchema column : dimension) {
+      if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) {
+        throw new IllegalArgumentException("column name already exists");
+      }
+    }
+
+    for (ColumnSchema column : complex) {
+      if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) {
+        throw new IllegalArgumentException("column name already exists");
+      }
+    }
+
+    for (ColumnSchema column : measures) {
       if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) {
         throw new IllegalArgumentException("column name already exists");
       }
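
In short, addColumn() now flattens complex types: each child of a struct is
registered as "<parent>.<child>" with schema ordinal -1 (only top-level columns
consume an ordinal), and an array column gets a single implicit child named
"val" that carries the element type. A rough illustration of the resulting
layout, assuming three user-supplied fields (hypothetical names):

    // Input fields                 Registered columns    schema ordinal
    // name: string                 name                   0  (dimension)
    // address: struct<street:      address                1  (complex, 2 children)
    //   string, city: string>      address.street        -1  (complex child)
    //                              address.city          -1  (complex child)
    // doorNum: array<int>          doorNum                2  (complex, 1 child)
    //                              doorNum.val           -1  (complex child)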

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala
index 34b32f4..0abf5c5 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/DataFrameComplexTypeExample.scala
@@ -22,8 +22,19 @@ import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.carbondata.examples.util.ExampleUtils
 
 case class StructElement(school: Array[String], age: Int)
+
+case class StructElement1(school: Array[String], school1: Array[String], age: Int)
+
 case class ComplexTypeData(id: Int, name: String, city: String, salary: Float, file: StructElement)
 
+case class ComplexTypeData1(id: Int,
+    name: String,
+    city: String,
+    salary: Float,
+    file: StructElement1)
+
+case class ComplexTypeData2(id: Int, name: String, city: String, salary: Float, file: Array[String])
+
 // scalastyle:off println
 object DataFrameComplexTypeExample {
 
@@ -34,16 +45,21 @@ object DataFrameComplexTypeExample {
     spark.close()
   }
 
-  def exampleBody(spark : SparkSession): Unit = {
-    val complexTableName = s"complex_type_table"
+  def exampleBody(spark: SparkSession): Unit = {
+    val complexTypeDictionaryTableName = s"complex_type_dictionary_table"
+    val complexTypeNoDictionaryTableName = s"complex_type_noDictionary_table"
+    val complexTypeNoDictionaryTableNameArray = s"complex_type_noDictionary_array_table"
 
     import spark.implicits._
 
     // drop table if exists previously
-    spark.sql(s"DROP TABLE IF EXISTS ${ complexTableName }")
+    spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeDictionaryTableName }")
+    spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableName }")
+    spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableNameArray }")
+
     spark.sql(
       s"""
-         | CREATE TABLE ${ complexTableName }(
+         | CREATE TABLE ${ complexTypeDictionaryTableName }(
          | id INT,
          | name STRING,
          | city STRING,
@@ -56,6 +72,37 @@ object DataFrameComplexTypeExample {
          | 'dictionary_include'='city')
          | """.stripMargin)
 
+    spark.sql(
+      s"""
+         | CREATE TABLE ${ complexTypeNoDictionaryTableNameArray }(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | file array<string>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(
+         | 'sort_columns'='name',
+         | 'dictionary_include'='city')
+         | """.stripMargin)
+
+
+    spark.sql(
+      s"""
+         | CREATE TABLE ${ complexTypeNoDictionaryTableName }(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | file struct<school:array<string>, school1:array<string>, age:int>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(
+         | 'sort_columns'='name')
+         | """.stripMargin)
+
+
     val sc = spark.sparkContext
     // generate data
     val df = sc.parallelize(Seq(
@@ -66,30 +113,108 @@ object DataFrameComplexTypeExample {
       ComplexTypeData(3, "index_3", "city_3", 30000.0f,
         StructElement(Array("struct_31", "struct_32"), 30))
     )).toDF
+
+    // generate data
+    val df2 = sc.parallelize(Seq(
+      ComplexTypeData2(1, "index_1", "city_1", 10000.0f, Array("struct_11", "struct_12")),
+      ComplexTypeData2(2, "index_2", "city_2", 20000.0f, Array("struct_21", "struct_22")),
+      ComplexTypeData2(3, "index_3", "city_3", 30000.0f, Array("struct_31", "struct_32"))
+    )).toDF
+
+    // generate data
+    val df1 = sc.parallelize(Seq(
+      ComplexTypeData1(1, "index_1", "city_1", 10000.0f,
+        StructElement1(Array("struct_11", "struct_12"), Array("struct_11", "struct_12"), 10)),
+      ComplexTypeData1(2, "index_2", "city_2", 20000.0f,
+        StructElement1(Array("struct_21", "struct_22"), Array("struct_11", "struct_12"), 20)),
+      ComplexTypeData1(3, "index_3", "city_3", 30000.0f,
+        StructElement1(Array("struct_31", "struct_32"), Array("struct_11", "struct_12"), 30))
+    )).toDF
+
+
     df.printSchema()
     df.write
       .format("carbondata")
-      .option("tableName", complexTableName)
+      .option("tableName", complexTypeDictionaryTableName)
+      .mode(SaveMode.Append)
+      .save()
+
+    df1.printSchema()
+    df1.write
+      .format("carbondata")
+      .option("tableName", complexTypeNoDictionaryTableName)
       .mode(SaveMode.Append)
       .save()
 
-    spark.sql(s"select count(*) from ${ complexTableName }").show(100, truncate = false)
 
-    spark.sql(s"select * from ${ complexTableName } order by id desc").show(300, truncate = false)
+    df2.printSchema()
+    df2.write
+      .format("carbondata")
+      .option("tableName", complexTypeNoDictionaryTableNameArray)
+      .mode(SaveMode.Append)
+      .save()
+
+
+    spark.sql(s"select count(*) from ${ complexTypeDictionaryTableName }")
+      .show(100, truncate = false)
+
+    spark.sql(s"select * from ${ complexTypeDictionaryTableName } order by id desc")
+      .show(300, truncate = false)
+
+    spark.sql(s"select * " +
+              s"from ${ complexTypeDictionaryTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    spark.sql(s"select * " +
+              s"from ${ complexTypeDictionaryTableName } " +
+              s"where id > 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${ complexTypeDictionaryTableName }").show(false)
+
+    // drop table
+    spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeDictionaryTableName }")
+
+
+    spark.sql(s"select count(*) from ${ complexTypeNoDictionaryTableName }")
+      .show(100, truncate = false)
+
+    spark.sql(s"select * from ${ complexTypeNoDictionaryTableName } order by id desc")
+      .show(300, truncate = false)
+
+    spark.sql(s"select * " +
+              s"from ${ complexTypeNoDictionaryTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    spark.sql(s"select * " +
+              s"from ${ complexTypeNoDictionaryTableName } " +
+              s"where id > 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${ complexTypeNoDictionaryTableName }").show(false)
+
+    // drop table
+    spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableName }")
+
+    spark.sql(s"select count(*) from ${ complexTypeNoDictionaryTableNameArray }")
+      .show(100, truncate = false)
+
+    spark.sql(s"select * from ${ complexTypeNoDictionaryTableNameArray } order by id desc")
+      .show(300, truncate = false)
 
     spark.sql(s"select * " +
-              s"from ${ complexTableName } " +
+              s"from ${ complexTypeNoDictionaryTableNameArray } " +
               s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
 
     spark.sql(s"select * " +
-              s"from ${ complexTableName } " +
+              s"from ${ complexTypeNoDictionaryTableNameArray } " +
               s"where id > 10 limit 100").show(100, truncate = false)
 
     // show segments
-    spark.sql(s"SHOW SEGMENTS FOR TABLE ${complexTableName}").show(false)
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${ complexTypeNoDictionaryTableNameArray }").show(false)
 
     // drop table
-    spark.sql(s"DROP TABLE IF EXISTS ${ complexTableName }")
+    spark.sql(s"DROP TABLE IF EXISTS ${ complexTypeNoDictionaryTableNameArray }")
   }
 }
 // scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index d8e5374..fabcd02 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -17,13 +17,13 @@
 
 package org.apache.carbondata.spark.testsuite.createTable
 
-import java.io.{File, FileFilter, IOException}
+import java.io.{File, FileFilter}
 import java.util
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
-import org.junit.{Assert, Test}
+import org.junit.Assert
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -35,6 +35,7 @@ import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import org.apache.avro
 import org.apache.commons.lang.CharEncoding
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter
 
@@ -254,7 +255,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'")
     sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false)
-    sql("select * from t1").show(200,false)
     sql("insert into sdkOutputTable select * from t1").show(200,false)
 
     checkAnswer(sql(s"""select * from sdkOutputTable where age = 12"""),
@@ -545,7 +545,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
            |'$writerPath' """.stripMargin)
 
-      sql("select * from sdkOutputTable").show(false)
     }
     assert(exception.getMessage()
       .contains("Operation not allowed: Invalid table path provided:"))
@@ -687,49 +686,20 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
   }
 
 
-  def buildAvroTestData(rows: Int, options: util.Map[String, String]): Any = {
-    FileUtils.deleteDirectory(new File(writerPath))
-    val newAvroSchema = "{" + " \"type\" : \"record\", " + "  \"name\" : \"userInfo\", " +
-                        "  \"namespace\" : \"my.example\", " +
-                        "  \"fields\" : [{\"name\" : \"username\", " +
-                        "  \"type\" : \"string\", " + "  \"default\" : \"NONE\"}, " +
-                        " {\"name\" : \"age\", " + " \"type\" : \"int\", " +
-                        " \"default\" : -1}, " + "{\"name\" : \"address\", " +
-                        "   \"type\" : { " + "  \"type\" : \"record\", " +
-                        "   \"name\" : \"mailing_address\", " + "  \"fields\" : [ {" +
-                        "        \"name\" : \"street\", " +
-                        "       \"type\" : \"string\", " +
-                        "       \"default\" : \"NONE\"}, { " + " \"name\" : \"city\", " +
-                        "  \"type\" : \"string\", " + "  \"default\" : \"NONE\"}, " +
-                        "                 ]}, " + " \"default\" : {} " + " } " + "}"
-    val mySchema = "{" + "  \"name\": \"address\", " + "   \"type\": \"record\", " +
-                   "    \"fields\": [  " +
-                   "  { \"name\": \"name\", \"type\": \"string\"}, " +
-                   "  { \"name\": \"age\", \"type\": \"int\"}, " + "  { " +
-                   "    \"name\": \"address\", " + "      \"type\": { " +
-                   "    \"type\" : \"record\", " + "        \"name\" : \"my_address\", " +
-                   "        \"fields\" : [ " +
-                   "    {\"name\": \"street\", \"type\": \"string\"}, " +
-                   "    {\"name\": \"city\", \"type\": \"string\"} " + "  ]} " + "  } " +
-                   "] " + "}"
-    val json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", " +
-               "\"city\":\"bang\"}}"
+  private def WriteFilesWithAvroWriter(rows: Int,
+      mySchema: String,
+      json: String,
+      fields: Array[Field]) = {
     // conversion to GenericData.Record
-    val nn = new org.apache.avro.Schema.Parser().parse(mySchema)
+    val nn = new avro.Schema.Parser().parse(mySchema)
     val converter = new JsonAvroConverter
     val record = converter
       .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn)
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.STRING)
-    // fields[1] = new Field("age", DataTypes.INT);
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fld.add(new StructField("city", DataTypes.STRING))
-    fields(2) = new Field("address", "struct", fld)
+
     try {
       val writer = CarbonWriter.builder.withSchema(new Schema(fields))
-        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+        .outputPath(writerPath).isTransactionalTable(false)
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -745,28 +715,419 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  def buildAvroTestDataSingleFile(): Any = {
+  // struct type test
+  def buildAvroTestDataStruct(rows: Int, options: util.Map[String, String]): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    val mySchema =
+      """
+        |{"name": "address",
+        | "type": "record",
+        | "fields": [
+        |  { "name": "name", "type": "string"},
+        |  { "name": "age", "type": "int"},
+        |  { "name": "address",  "type": {
+        |    "type" : "record",  "name" : "my_address",
+        |        "fields" : [
+        |    {"name": "street", "type": "string"},
+        |    {"name": "city", "type": "string"}]}}
+        |]}
+      """.stripMargin
+
+    val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """
+
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fld.add(new StructField("city", DataTypes.STRING))
+    fields(2) = new Field("address", "struct", fld)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+  }
+
+  def buildAvroTestDataStructType(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildAvroTestDataStruct(3, null)
+  }
+
+  // array type test
+  def buildAvroTestDataArrayType(rows: Int, options: util.Map[String, String]): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val mySchema = """ {
+                     |      "name": "address",
+                     |      "type": "record",
+                     |      "fields": [
+                     |      {
+                     |      "name": "name",
+                     |      "type": "string"
+                     |      },
+                     |      {
+                     |      "name": "age",
+                     |      "type": "int"
+                     |      },
+                     |      {
+                     |      "name": "address",
+                     |      "type": {
+                     |      "type": "array",
+                     |      "items": {
+                     |      "name": "street",
+                     |      "type": "string"
+                     |      }
+                     |      }
+                     |      }
+                     |      ]
+                     |  }
+                     """.stripMargin
+
+    val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """
+
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+    // fields[1] = new Field("age", DataTypes.INT);
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fields(2) = new Field("address", "array", fld)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+  }
+
+  def buildAvroTestDataSingleFileArrayType(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildAvroTestDataArrayType(3, null)
+  }
+
+  // struct with array type test
+  def buildAvroTestDataStructWithArrayType(rows: Int, options: util.Map[String, String]): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val mySchema = """
+                     {
+                     |     "name": "address",
+                     |     "type": "record",
+                     |     "fields": [
+                     |     { "name": "name", "type": "string"},
+                     |     { "name": "age", "type": "int"},
+                     |     {
+                     |     "name": "address",
+                     |     "type": {
+                     |     "type" : "record",
+                     |     "name" : "my_address",
+                     |     "fields" : [
+                     |     {"name": "street", "type": "string"},
+                     |     {"name": "city", "type": "string"}
+                     |     ]}
+                     |     },
+                     |     {"name" :"doorNum",
+                     |     "type" : {
+                     |     "type" :"array",
+                     |     "items":{
+                     |     "name" :"EachdoorNums",
+                     |     "type" : "int",
+                     |     "default":-1
+                     |     }}
+                     |     }]}
+                     """.stripMargin
+
+    val json =
+      """ {"name":"bob", "age":10,
+          |"address" : {"street":"abc", "city":"bang"},
+          |"doorNum" : [1,2,3,4]}""".stripMargin
+
+    val fields = new Array[Field](4)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fld.add(new StructField("city", DataTypes.STRING))
+    fields(2) = new Field("address", "struct", fld)
+    val fld1 = new util.ArrayList[StructField]
+    fld1.add(new StructField("eachDoorNum", DataTypes.INT))
+    fields(3) = new Field("doorNum", "array", fld1)
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+  }
+
+  def buildAvroTestDataBothStructArrayType(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildAvroTestDataStructWithArrayType(3, null)
+  }
+
+
+  // ArrayOfStruct test
+  def buildAvroTestDataArrayOfStruct(rows: Int, options: util.Map[String, String]): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val mySchema = """ {
+                     |	"name": "address",
+                     |	"type": "record",
+                     |	"fields": [
+                     |		{
+                     |			"name": "name",
+                     |			"type": "string"
+                     |		},
+                     |		{
+                     |			"name": "age",
+                     |			"type": "int"
+                     |		},
+                     |		{
+                     |			"name": "doorNum",
+                     |			"type": {
+                     |				"type": "array",
+                     |				"items": {
+                     |					"type": "record",
+                     |					"name": "my_address",
+                     |					"fields": [
+                     |						{
+                     |							"name": "street",
+                     |							"type": "string"
+                     |						},
+                     |						{
+                     |							"name": "city",
+                     |							"type": "string"
+                     |						}
+                     |					]
+                     |				}
+                     |			}
+                     |		}
+                     |	]
+                     |} """.stripMargin
+    val json =
+      """ {"name":"bob","age":10,"doorNum" :
+        |[{"street":"abc","city":"city1"},
+        |{"street":"def","city":"city2"},
+        |{"street":"ghi","city":"city3"},
+        |{"street":"jkl","city":"city4"}]} """.stripMargin
+
+
+
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fld.add(new StructField("city", DataTypes.STRING))
+
+    val fld2 = new util.ArrayList[StructField]
+    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
+    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+  }
+
+  def buildAvroTestDataArrayOfStructType(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildAvroTestDataArrayOfStruct(3, null)
+  }
+
+
+  // StructOfArray test
+  def buildAvroTestDataStructOfArray(rows: Int, options: util.Map[String, String]): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+
+    val mySchema = """ {
+                     |	"name": "address",
+                     |	"type": "record",
+                     |	"fields": [
+                     |		{
+                     |			"name": "name",
+                     |			"type": "string"
+                     |		},
+                     |		{
+                     |			"name": "age",
+                     |			"type": "int"
+                     |		},
+                     |		{
+                     |			"name": "address",
+                     |			"type": {
+                     |				"type": "record",
+                     |				"name": "my_address",
+                     |				"fields": [
+                     |					{
+                     |						"name": "street",
+                     |						"type": "string"
+                     |					},
+                     |					{
+                     |						"name": "city",
+                     |						"type": "string"
+                     |					},
+                     |					{
+                     |						"name": "doorNum",
+                     |						"type": {
+                     |							"type": "array",
+                     |							"items": {
+                     |								"name": "EachdoorNums",
+                     |								"type": "int",
+                     |								"default": -1
+                     |							}
+                     |						}
+                     |					}
+                     |				]
+                     |			}
+                     |		}
+                     |	]
+                     |} """.stripMargin
+
+    val json = """ {
+                 |	"name": "bob",
+                 |	"age": 10,
+                 |	"address": {
+                 |		"street": "abc",
+                 |		"city": "bang",
+                 |		"doorNum": [
+                 |			1,
+                 |			2,
+                 |			3,
+                 |			4
+                 |		]
+                 |	}
+                 |} """.stripMargin
+
+
+
+
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+
+    val fld1 = new util.ArrayList[StructField]
+    fld1.add(new StructField("eachDoorNum", DataTypes.INT))
+
+    val fld2 = new util.ArrayList[StructField]
+    fld2.add(new StructField("street", DataTypes.STRING))
+    fld2.add(new StructField("city", DataTypes.STRING))
+    fld2.add(new StructField("doorNum", DataTypes.createArrayType(DataTypes.INT), fld1))
+
+    fields(2) = new Field("address","struct",fld2)
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+  }
+
+  def buildAvroTestDataStructOfArrayType(): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
-    buildAvroTestData(3, null)
+    buildAvroTestDataStructOfArray(3, null)
   }
 
-  test("Read sdk writer Avro output ") {
-    buildAvroTestDataSingleFile()
+
+  test("Read sdk writer Avro output Record Type") {
+    buildAvroTestDataStructType()
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
     sql(
       s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
          |'$writerPath' """.stripMargin)
 
-    sql("select * from sdkOutputTable").show(false)
 
     checkAnswer(sql("select * from sdkOutputTable"), Seq(
-      Row("bob", "10", Row("abc","bang")),
-      Row("bob", "10", Row("abc","bang")),
-      Row("bob", "10", Row("abc","bang"))))
+      Row("bob", 10, Row("abc","bang")),
+      Row("bob", 10, Row("abc","bang")),
+      Row("bob", 10, Row("abc","bang"))))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).listFiles().length > 0)
+  }
+
+  test("Read sdk writer Avro output Array Type") {
+    buildAvroTestDataSingleFileArrayType()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("select * from sdkOutputTable").show(200,false)
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("bob", 10, new mutable.WrappedArray.ofRef[String](Array("abc", "defg"))),
+      Row("bob", 10, new mutable.WrappedArray.ofRef[String](Array("abc", "defg"))),
+      Row("bob", 10, new mutable.WrappedArray.ofRef[String](Array("abc", "defg")))))
 
     sql("DROP TABLE sdkOutputTable")
     // drop table should not delete the files
     assert(new File(writerPath).exists())
   }
+
+  test("Read sdk writer Avro output with both Array and Struct Type") {
+    buildAvroTestDataBothStructArrayType()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    /*
+    *-+----+---+----------+------------+
+    |name|age|address   |doorNum     |
+    +----+---+----------+------------+
+    |bob |10 |[abc,bang]|[1, 2, 3, 4]|
+    |bob |10 |[abc,bang]|[1, 2, 3, 4]|
+    |bob |10 |[abc,bang]|[1, 2, 3, 4]|
+    +----+---+----------+------------+
+    * */
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("bob", 10, Row("abc","bang"), mutable.WrappedArray.newBuilder[Int].+=(1,2,3,4)),
+      Row("bob", 10, Row("abc","bang"), mutable.WrappedArray.newBuilder[Int].+=(1,2,3,4)),
+      Row("bob", 10, Row("abc","bang"), mutable.WrappedArray.newBuilder[Int].+=(1,2,3,4))))
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+  }
+
+
+  test("Read sdk writer Avro output with Array of struct") {
+    buildAvroTestDataArrayOfStructType()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    // TODO: Add a validation
+    /*
+    +----+---+----------------------------------------------------+
+    |name|age|doorNum                                             |
+    +----+---+----------------------------------------------------+
+    |bob |10 |[[abc,city1], [def,city2], [ghi,city3], [jkl,city4]]|
+    |bob |10 |[[abc,city1], [def,city2], [ghi,city3], [jkl,city4]]|
+    |bob |10 |[[abc,city1], [def,city2], [ghi,city3], [jkl,city4]]|
+    +----+---+----------------------------------------------------+ */
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+  }
+
+
+  // Struct of array
+  test("Read sdk writer Avro output with struct of Array") {
+    buildAvroTestDataStructOfArrayType()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    // TODO: Add a validation
+    /*
+    +----+---+-------------------------------------------------------+
+    |name|age|address                                                |
+    +----+---+-------------------------------------------------------+
+    |bob |10 |[abc,bang,WrappedArray(1, 2, 3, 4)]                    |
+    |bob |10 |[abc,bang,WrappedArray(1, 2, 3, 4)]                    |
+    |bob |10 |[abc,bang,WrappedArray(1, 2, 3, 4)]                    |
+    +----+---+-------------------------------------------------------+*/
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
index fe11b98..65210b8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
@@ -97,32 +97,16 @@ private[spark] object SparkTypeConverter {
   def getStructChildren(table: CarbonTable, dimName: String): String = {
     table.getChildren(dimName).asScala.map(childDim => {
       childDim.getDataType.getName.toLowerCase match {
-        case "array" => if (table.isTransactionalTable) {s"${
+        case "array" => s"${
           childDim.getColName.substring(dimName.length + 1)
-        }:array<${ getArrayChildren(table, childDim.getColName) }>" } else {
-          // For non Transactional table the Childrends of Struct Columns
-          // are not appended with their parent.
-          s"${
-            childDim.getColName
-          }:array<${ getArrayChildren(table, childDim.getColName) }>"
-        }
-        case "struct" => if (table.isTransactionalTable) { s"${
+        }:array<${ getArrayChildren(table, childDim.getColName) }>"
+        case "struct" => s"${
           childDim.getColName.substring(dimName.length + 1)
         }:struct<${ table.getChildren(childDim.getColName)
           .asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",")
-        }>"} else {
-          s"${
-            childDim.getColName
-          }:struct<${ table.getChildren(childDim.getColName)
-            .asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",")
-          }>"
-        }
-        case dType => if (table.isTransactionalTable) {
-          s"${ childDim.getColName
+        }>"
+        case dType => s"${ childDim.getColName
           .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
-        } else {
-          s"${ childDim.getColName}:${ addDecimalScaleAndPrecision(childDim, dType) }"
-        }
       }
     }).mkString(",")
   }
@@ -139,31 +123,13 @@ private[spark] object SparkTypeConverter {
   private def recursiveMethod(
       table: CarbonTable, dimName: String, childDim: CarbonDimension) = {
     childDim.getDataType.getName.toLowerCase match {
-      case "array" => if (table.isTransactionalTable) {
-        s"${
-          childDim.getColName.substring(dimName.length + 1)
-        }:array<${ getArrayChildren(table, childDim.getColName) }>"
-      } else {
-        // For non Transactional table the Childrends of Struct Columns
-        // are not appended with their parent.
-        s"${
-          childDim.getColName
-        }:array<${ getArrayChildren(table, childDim.getColName) }>"
-      }
-      case "struct" => if (table.isTransactionalTable) {
-        s"${
-          childDim.getColName.substring(dimName.length + 1)
-        }:struct<${ getStructChildren(table, childDim.getColName) }>"
-      } else {
-        s"${
-          childDim.getColName
-        }:struct<${ getStructChildren(table, childDim.getColName) }>"
-      }
-      case dType => if (table.isTransactionalTable) {
-        s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
-      } else {
-        s"${ childDim.getColName }:${ dType }"
-      }
+      case "array" => s"${
+        childDim.getColName.substring(dimName.length + 1)
+      }:array<${ getArrayChildren(table, childDim.getColName) }>"
+      case "struct" => s"${
+        childDim.getColName.substring(dimName.length + 1)
+      }:struct<${ getStructChildren(table, childDim.getColName) }>"
+      case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
     }
   }
 }
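
The transactional/non-transactional split is gone here: child column names are
now always rendered relative to their parent. For instance, a dimension
"address" whose children are stored as "address.street" and "address.city"
(the naming produced by TableSchemaBuilder above) would render as:

    // getStructChildren(table, "address")
    //   -> "street:string,city:string"
    // so the Spark-facing type becomes struct<street:string,city:string>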

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 8f1994a..bc2e9db 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.sdk.file;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -69,16 +70,16 @@ class AvroCarbonWriter extends CarbonWriter {
       avroSchema = avroRecord.getSchema();
     }
     List<Schema.Field> fields = avroSchema.getFields();
-    Object [] csvField = new String[fields.size()];
+    Object [] csvField = new Object[fields.size()];
     for (int i = 0; i < fields.size(); i++) {
-      csvField[i] = avroFieldToString(fields.get(i), avroRecord.get(i));
+      csvField[i] = avroFieldToObject(fields.get(i), avroRecord.get(i), 0);
     }
     return csvField;
   }
 
-  private String avroFieldToString(Schema.Field fieldType, Object fieldValue) {
+  private String avroFieldToObject(Schema.Field avroField, Object fieldValue, int delimiterLevel) {
     StringBuilder out = new StringBuilder();
-    Schema.Type type = fieldType.schema().getType();
+    Schema.Type type = avroField.schema().getType();
     switch (type) {
       case BOOLEAN:
       case INT:
@@ -89,22 +90,47 @@ class AvroCarbonWriter extends CarbonWriter {
         out.append(fieldValue.toString());
         break;
       case RECORD:
-        List<Schema.Field> fields = fieldType.schema().getFields();
+        List<Schema.Field> fields = avroField.schema().getFields();
         String delimiter = null;
-        for (int i = 0; i < fields.size(); i ++) {
-          if (i == 0) {
+        delimiterLevel ++;
+        for (int i = 0; i < fields.size(); i++) {
+          if (delimiterLevel == 1) {
             delimiter = "$";
-          } else {
+          } else if (delimiterLevel > 1) {
             delimiter = ":";
           }
           if (i != (fields.size() - 1)) {
-            out.append(avroFieldToString(fields.get(i), ((GenericData.Record) fieldValue).get(i)))
-                .append(delimiter);
+            out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i),
+                delimiterLevel)).append(delimiter);
+          } else {
+            out.append(avroFieldToObject(fields.get(i), ((GenericData.Record) fieldValue).get(i),
+                delimiterLevel));
+          }
+        }
+        break;
+      case ARRAY:
+        int size = ((ArrayList) fieldValue).size();
+        String delimiterArray = null;
+        delimiterLevel ++;
+        if (delimiterLevel == 1) {
+          delimiterArray = "$";
+        } else if (delimiterLevel > 1) {
+          delimiterArray = ":";
+        }
+
+        for (int i = 0; i < size; i++) {
+          if (i != size - 1) {
+            out.append(avroFieldToObject(
+                new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+                ((ArrayList) fieldValue).get(i), delimiterLevel)).append(delimiterArray);
           } else {
-            out.append(avroFieldToString(fields.get(i), ((GenericData.Record) fieldValue).get(i)));
+            out.append(avroFieldToObject(
+                new Schema.Field(avroField.name(), avroField.schema().getElementType(), null, true),
+                ((ArrayList) fieldValue).get(i), delimiterLevel));
           }
         }
         break;
+
       default:
         throw new UnsupportedOperationException();
     }
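
The delimiterLevel parameter encodes nesting depth: values at the first
complex level are joined with "$" and anything deeper with ":". Assuming Avro
records shaped like those in the tests above, the flattened strings would look
like this:

    // {"address": {"street": "abc", "city": "bang"}}       -> "abc$bang"
    // {"doorNum": [1, 2, 3, 4]}                            -> "1$2$3$4"
    // array of records:
    // [{"street":"abc","city":"city1"},
    //  {"street":"def","city":"city2"}]                    -> "abc:city1$def:city2"
    // struct containing an array:
    // {"street":"abc","city":"bang","doorNum":[1,2,3,4]}   -> "abc$bang$1:2:3:4"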

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 68bc3ab..397f151 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -333,6 +333,23 @@ public class CarbonWriterBuilder {
     return new AvroCarbonWriter(loadModel);
   }
 
+  private void setCsvHeader(CarbonLoadModel model) {
+    Field[] fields = schema.getFields();
+    StringBuilder builder = new StringBuilder();
+    String[] columns = new String[fields.length];
+    int i = 0;
+    for (Field field : fields) {
+      if (null != field) {
+        builder.append(field.getFieldName());
+        builder.append(",");
+        columns[i++] = field.getFieldName();
+      }
+    }
+    String header = builder.toString();
+    model.setCsvHeader(header.substring(0, header.length() - 1));
+    model.setCsvHeaderColumns(columns);
+  }
+
   private CarbonLoadModel createLoadModel() throws IOException, InvalidLoadOptionException {
     // build CarbonTable using schema
     CarbonTable table = buildCarbonTable();
@@ -368,7 +385,7 @@ public class CarbonWriterBuilder {
       for (Field field : schema.getFields()) {
         if (null != field) {
           if (field.getDataType() == DataTypes.STRING ||
-              field.getDataType() == DataTypes.DATE ||
+              field.getDataType() == DataTypes.DATE  ||
               field.getDataType() == DataTypes.TIMESTAMP) {
             sortColumnsList.add(field.getFieldName());
           }
@@ -380,30 +397,9 @@ public class CarbonWriterBuilder {
       sortColumnsList = Arrays.asList(sortColumns);
     }
     ColumnSchema[] sortColumnsSchemaList = new ColumnSchema[sortColumnsList.size()];
-    for (Field field : schema.getFields()) {
-      if (null != field) {
-        if (field.getChildren() != null && field.getChildren().size() > 0) {
-          // Loop through the inner columns and for a StructData
-          List<StructField> structFieldsArray =
-              new ArrayList<StructField>(field.getChildren().size());
-          String parentName = field.getFieldName();
-          for (StructField childFld : field.getChildren()) {
-            structFieldsArray.add(new StructField(childFld.getFieldName(), childFld.getDataType()));
-          }
-          DataType complexType = DataTypes.createStructType(structFieldsArray);
-          tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false);
-        } else {
-          int isSortColumn = sortColumnsList.indexOf(field.getFieldName());
-          ColumnSchema columnSchema = tableSchemaBuilder
-              .addColumn(new StructField(field.getFieldName(), field.getDataType()),
-                  isSortColumn > -1);
-          if (isSortColumn > -1) {
-            columnSchema.setSortColumn(true);
-            sortColumnsSchemaList[isSortColumn] = columnSchema;
-          }
-        }
-      }
-    }
+    Field[] fields = schema.getFields();
+    buildTableSchema(fields, tableSchemaBuilder, sortColumnsList, sortColumnsSchemaList);
+
     tableSchemaBuilder.setSortColumns(Arrays.asList(sortColumnsSchemaList));
     String tableName;
     String dbName;
@@ -416,16 +412,56 @@ public class CarbonWriterBuilder {
     }
     TableSchema schema = tableSchemaBuilder.build();
     schema.setTableName(tableName);
-    CarbonTable table = CarbonTable.builder()
-        .tableName(schema.getTableName())
-        .databaseName(dbName)
-        .tablePath(path)
-        .tableSchema(schema)
-        .isTransactionalTable(isTransactionalTable)
-        .build();
+    CarbonTable table =
+        CarbonTable.builder().tableName(schema.getTableName()).databaseName(dbName).tablePath(path)
+            .tableSchema(schema).isTransactionalTable(isTransactionalTable).build();
     return table;
   }
 
+  private void buildTableSchema(Field[] fields, TableSchemaBuilder tableSchemaBuilder,
+      List<String> sortColumnsList, ColumnSchema[] sortColumnsSchemaList) {
+    for (Field field : fields) {
+      if (null != field) {
+        int isSortColumn = sortColumnsList.indexOf(field.getFieldName());
+        if (isSortColumn > -1) {
+          // array, struct, double, float and decimal types cannot be sort columns
+          if (field.getDataType() == DataTypes.DOUBLE || field.getDataType() == DataTypes.FLOAT
+              || DataTypes.isDecimal(field.getDataType()) || field.getDataType().isComplexType()) {
+            throw new RuntimeException(
+                "sort columns are not supported for array, struct, double, float, decimal types");
+          }
+        }
+
+        if (field.getChildren() != null && field.getChildren().size() > 0) {
+          if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
+            // an array column carries a single element type, so build the
+            // ArrayType from the first (and only) child field
+            DataType complexType =
+                DataTypes.createArrayType(field.getChildren().get(0).getDataType());
+            tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false);
+          } else if (field.getDataType().getName().equalsIgnoreCase("STRUCT")) {
+            // loop through the inner columns and build a StructType column
+            List<StructField> structFieldsArray =
+                new ArrayList<StructField>(field.getChildren().size());
+            for (StructField childFld : field.getChildren()) {
+              structFieldsArray
+                  .add(new StructField(childFld.getFieldName(), childFld.getDataType()));
+            }
+            DataType complexType = DataTypes.createStructType(structFieldsArray);
+            tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false);
+          }
+        } else {
+          ColumnSchema columnSchema = tableSchemaBuilder
+              .addColumn(new StructField(field.getFieldName(), field.getDataType()),
+                  isSortColumn > -1);
+          if (isSortColumn > -1) {
+            columnSchema.setSortColumn(true);
+            sortColumnsSchemaList[isSortColumn] = columnSchema;
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Save the schema of the {@param table} to {@param persistFilePath}
    * @param table table object containing schema
@@ -465,6 +501,8 @@ public class CarbonWriterBuilder {
       options = new HashMap<>();
     }
     CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
-    return builder.build(options, UUID, taskNo);
+    CarbonLoadModel loadModel = builder.build(options, UUID, taskNo);
+    setCsvHeader(loadModel);
+    return loadModel;
   }
 }
\ No newline at end of file

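For context, here is a minimal sketch of how the rebuilt schema ordering
surfaces through the SDK writer (the column names and output path are
illustrative, not taken from this patch):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.core.metadata.datatype.StructField;
    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.Field;
    import org.apache.carbondata.sdk.file.Schema;

    // declared order: measure, dimension, complex
    Field[] fields = new Field[3];
    fields[0] = new Field("age", DataTypes.INT);          // measure
    fields[1] = new Field("name", DataTypes.STRING);      // dimension
    List<StructField> child = new ArrayList<>();
    child.add(new StructField("city", DataTypes.STRING));
    fields[2] = new Field("address", "struct", child);    // complex

    CarbonWriter writer = CarbonWriter.builder()
        .withSchema(new Schema(fields))
        .outputPath("/tmp/carbon-sdk-demo")               // hypothetical path
        .sortBy(new String[] { "name" })
        .buildWriterForCSVInput();

    // The table schema is laid out as name (sort column), address (complex),
    // age (measure), regardless of the declared order. Passing "address" to
    // sortBy would now fail fast, because array, struct, double, float and
    // decimal columns are rejected as sort columns.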
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index 677047b..0db3bc5 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -104,7 +104,7 @@ public class Field {
     } else if (type.equalsIgnoreCase("double")) {
       this.type = DataTypes.DOUBLE;
     } else if (type.equalsIgnoreCase("array")) {
-      this.type = DataTypes.createStructType(fields);
+      this.type = DataTypes.createArrayType(fields.get(0).getDataType());
     } else if (type.equalsIgnoreCase("struct")) {
       this.type = DataTypes.createStructType(fields);
     }
@@ -113,6 +113,8 @@ public class Field {
     }
   }
 
+
+
   public Field(String name, DataType type, List<StructField> fields) {
     this.name = name;
     this.type = type;

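With this fix an "array" Field is typed from its single child element instead
of being mis-typed as a struct. A short sketch of the effect (field and child
names are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.core.metadata.datatype.StructField;
    import org.apache.carbondata.sdk.file.Field;

    List<StructField> elem = new ArrayList<>();
    elem.add(new StructField("eachDoorNum", DataTypes.INT));
    Field doorNum = new Field("doorNum", "array", elem);
    // before: doorNum.getDataType() was a StructType built from the children
    // after:  doorNum.getDataType() is an ArrayType with INT as element type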
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b1c85fa5/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index ed3f2f1..105fb6d 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -20,10 +20,13 @@ package org.apache.carbondata.sdk.file;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.datatype.StructType;
@@ -37,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.junit.Assert;
 import org.junit.Test;
 
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
 import org.apache.avro.Schema;
 
@@ -285,4 +289,214 @@ public class AvroCarbonWriterTest {
     FileUtils.deleteDirectory(new File(path));
   }
 
+
+  @Test
+  public void testWriteNestedRecordWithMeasure() throws IOException {
+    FileUtils.deleteDirectory(new File(path));
+
+    String mySchema =
+        "{" +
+            "  \"name\": \"address\", " +
+            "   \"type\": \"record\", " +
+            "    \"fields\": [  " +
+            "  { \"name\": \"name\", \"type\": \"string\"}, " +
+            "  { \"name\": \"age\", \"type\": \"int\"}, " +
+            "  { " +
+            "    \"name\": \"address\", " +
+            "      \"type\": { " +
+            "    \"type\" : \"record\", " +
+            "        \"name\" : \"my_address\", " +
+            "        \"fields\" : [ " +
+            "    {\"name\": \"street\", \"type\": \"string\"}, " +
+            "    {\"name\": \"city\", \"type\": \"string\"} " +
+            "  ]} " +
+            "  } " +
+            "] " +
+            "}";
+
+    String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}}";
+
+
+    // conversion to GenericData.Record
+    Schema nn = new Schema.Parser().parse(mySchema);
+    JsonAvroConverter converter = new JsonAvroConverter();
+    GenericData.Record record = converter.convertToGenericDataRecord(
+        json.getBytes(CharEncoding.UTF_8), nn);
+
+    Field[] fields = new Field[3];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("name1", DataTypes.STRING);
+    List<StructField> fld = new ArrayList<>();
+    fld.add(new StructField("street", DataTypes.STRING));
+    fld.add(new StructField("city", DataTypes.STRING));
+    fields[2] = new Field("address", "struct", fld);
+
+    try {
+      CarbonWriter writer = CarbonWriter.builder()
+          .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+          .outputPath(path)
+          .isTransactionalTable(true)
+          .buildWriterForAvroInput();
+
+      for (int i = 0; i < 100; i++) {
+        writer.write(record);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertEquals(1, dataFiles.length);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+
+  private void writeAvroComplexData(String mySchema, String json, String[] sortColumns)
+      throws UnsupportedEncodingException, IOException, InvalidLoadOptionException {
+    Field[] fields = new Field[4];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("name1", DataTypes.STRING);
+    List<StructField> fld = new ArrayList<>();
+    fld.add(new StructField("street", DataTypes.STRING));
+    fld.add(new StructField("city", DataTypes.STRING));
+    fields[2] = new Field("address", "struct", fld);
+    List<StructField> fld1 = new ArrayList<>();
+    fld1.add(new StructField("eachDoorNum", DataTypes.INT));
+    fields[3] = new Field("doorNum","array",fld1);
+
+    // conversion to GenericData.Record
+    Schema nn = new Schema.Parser().parse(mySchema);
+    JsonAvroConverter converter = new JsonAvroConverter();
+    GenericData.Record record = converter.convertToGenericDataRecord(
+        json.getBytes(CharEncoding.UTF_8), nn);
+
+    try {
+      CarbonWriter writer = CarbonWriter.builder()
+          .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+          .outputPath(path)
+          .isTransactionalTable(true).sortBy(sortColumns)
+          .buildWriterForAvroInput();
+
+      for (int i = 0; i < 100; i++) {
+        writer.write(record);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+
+  @Test
+  public void testWriteComplexRecord() throws IOException, InvalidLoadOptionException {
+    FileUtils.deleteDirectory(new File(path));
+
+    String mySchema =
+        "{" +
+            "  \"name\": \"address\", " +
+            "   \"type\": \"record\", " +
+            "    \"fields\": [  " +
+            "  { \"name\": \"name\", \"type\": \"string\"}, " +
+            "  { \"name\": \"age\", \"type\": \"int\"}, " +
+            "  { " +
+            "    \"name\": \"address\", " +
+            "      \"type\": { " +
+            "    \"type\" : \"record\", " +
+            "        \"name\" : \"my_address\", " +
+            "        \"fields\" : [ " +
+            "    {\"name\": \"street\", \"type\": \"string\"}, " +
+            "    {\"name\": \"city\", \"type\": \"string\"} " +
+            "  ]} " +
+            "  }, " +
+            "  {\"name\" :\"doorNum\", " +
+            "   \"type\" : { " +
+            "   \"type\" :\"array\", " +
+            "   \"items\":{ " +
+            "   \"name\" :\"EachdoorNums\", " +
+            "   \"type\" : \"int\", " +
+            "   \"default\":-1} " +
+            "              } " +
+            "  }] " +
+            "}";
+
+    String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}, "
+        + "   \"doorNum\" : [1,2,3,4]}";
+
+    writeAvroComplexData(mySchema, json, null);
+
+    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertEquals(1, dataFiles.length);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+
+  @Test
+  public void testWriteComplexRecordWithSortColumns() throws IOException {
+    FileUtils.deleteDirectory(new File(path));
+
+    String mySchema =
+        "{" +
+            "  \"name\": \"address\", " +
+            "   \"type\": \"record\", " +
+            "    \"fields\": [  " +
+            "  { \"name\": \"name\", \"type\": \"string\"}, " +
+            "  { \"name\": \"age\", \"type\": \"int\"}, " +
+            "  { " +
+            "    \"name\": \"address\", " +
+            "      \"type\": { " +
+            "    \"type\" : \"record\", " +
+            "        \"name\" : \"my_address\", " +
+            "        \"fields\" : [ " +
+            "    {\"name\": \"street\", \"type\": \"string\"}, " +
+            "    {\"name\": \"city\", \"type\": \"string\"} " +
+            "  ]} " +
+            "  }, " +
+            "  {\"name\" :\"doorNum\", " +
+            "   \"type\" : { " +
+            "   \"type\" :\"array\", " +
+            "   \"items\":{ " +
+            "   \"name\" :\"EachdoorNums\", " +
+            "   \"type\" : \"int\", " +
+            "   \"default\":-1} " +
+            "              } " +
+            "  }] " +
+            "}";
+
+    String json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", \"city\":\"bang\"}, "
+        + "   \"doorNum\" : [1,2,3,4]}";
+
+    try {
+      writeAvroComplexData(mySchema, json, new String[] { "doorNum" });
+      Assert.fail();
+    } catch (Exception e) {
+      // expected: complex columns are rejected as sort columns
+      Assert.assertTrue(e.getMessage().contains("sort columns are not supported"));
+    }
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+
+
 }