Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/05/10 22:02:28 UTC

carbondata git commit: [CARBONDATA-2452] [CARBONDATA-2451] [CARBONDATA-2450] [CARBONDATA-2453] Fixed issues related to complex types

Repository: carbondata
Updated Branches:
  refs/heads/master 8e7fceb71 -> 6b70b7e47


[CARBONDATA-2452] [CARBONDATA-2451] [CARBONDATA-2450] [CARBONDATA-2453] Fixed issues related to complex types

Issue 1: Dictionary encoding was being added to complex-type columns in
the SDK path, which led to data load failures.
Issue 2: Sort columns were not being validated against the table schema
(see the sketch below).
Issue 3: Bad record handling was missing for complex types.
Issue 4: The parent name was not being prepended to a child field name
before the duplicate check, so valid schemas threw a duplicate column
exception.
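
For example, Issue 2 now surfaces while the writer is being built. A minimal
Java sketch against the SDK API touched by this patch (writerPath stands for
a caller-supplied output directory and is not part of this change):

    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.Field;
    import org.apache.carbondata.sdk.file.Schema;

    Field[] fields = new Field[1];
    fields[0] = new Field("id", DataTypes.INT);
    // "name" is not in the schema, so the builder now fails fast with
    // RuntimeException: "column: name specified in sort columns does not
    // exist in schema" instead of failing later during the data load.
    CarbonWriter writer = CarbonWriter.builder()
        .withSchema(new Schema(fields))
        .sortBy(new String[]{"name", "id"})
        .outputPath(writerPath)
        .isTransactionalTable(false)
        .buildWriterForAvroInput();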

This closes #2278


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6b70b7e4
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6b70b7e4
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6b70b7e4

Branch: refs/heads/master
Commit: 6b70b7e47b05a612ccb5a5ad01ee2d5a05ffa600
Parents: 8e7fceb
Author: kunal642 <ku...@gmail.com>
Authored: Mon May 7 20:58:21 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Fri May 11 03:27:36 2018 +0530

----------------------------------------------------------------------
 .../schema/table/TableSchemaBuilder.java        |  21 +-
 .../complexType/TestComplexTypeQuery.scala      |   2 +
 .../TestNonTransactionalCarbonTable.scala       | 410 +++++++++++++++++--
 .../processing/datatypes/ArrayDataType.java     |  11 +-
 .../processing/datatypes/GenericDataType.java   |   3 +-
 .../processing/datatypes/PrimitiveDataType.java |  41 +-
 .../processing/datatypes/StructDataType.java    |  11 +-
 .../loading/DataLoadProcessBuilder.java         |   9 +
 .../impl/ComplexFieldConverterImpl.java         |   2 +-
 .../DirectDictionaryFieldConverterImpl.java     |   1 -
 .../loading/model/CarbonLoadModelBuilder.java   |  15 +-
 .../InputProcessorStepWithNoConverterImpl.java  |  32 +-
 .../sdk/file/CarbonWriterBuilder.java           |  24 +-
 13 files changed, 524 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index b078400..03d03f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -122,7 +122,13 @@ public class TableSchemaBuilder {
   private ColumnSchema addColumn(StructField field, String parentName, AtomicInteger valIndex,
       boolean isSortColumn, boolean isComplexChild) {
     Objects.requireNonNull(field);
-    checkRepeatColumnName(field);
+    if (isComplexChild) {
+      // if field is complex then append parent name to the child field to check
+      // if any other field with same name exists
+      checkRepeatColumnName(field, parentName);
+    } else {
+      checkRepeatColumnName(field);
+    }
     ColumnSchema newColumn = new ColumnSchema();
     if (parentName != null) {
       newColumn.setColumnName(parentName + "." + field.getFieldName());
@@ -156,7 +162,7 @@ public class TableSchemaBuilder {
     // SO, this will not have any impact.
     newColumn.setColumnUniqueId(field.getFieldName());
     newColumn.setColumnReferenceId(newColumn.getColumnUniqueId());
-    newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn));
+    newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn, isComplexChild));
     if (field.getDataType().isComplexType()) {
       if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
         newColumn.setNumberOfChild(1);
@@ -209,6 +215,12 @@ public class TableSchemaBuilder {
   /**
    * Throw exception if {@param field} name is repeated
    */
+  private void checkRepeatColumnName(StructField field, String parentName) {
+    checkRepeatColumnName(
+        new StructField(parentName + "." + field.getFieldName(), field.getDataType(),
+            field.getChildren()));
+  }
+
   private void checkRepeatColumnName(StructField field) {
     for (ColumnSchema column : sortColumns) {
       if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) {
@@ -234,9 +246,10 @@ public class TableSchemaBuilder {
     }
   }
 
-  private List<Encoding> createEncoding(DataType dataType, boolean isSortColumn) {
+  private List<Encoding> createEncoding(DataType dataType, boolean isSortColumn,
+      boolean isComplexChild) {
     List<Encoding> encodings = new LinkedList<>();
-    if (dataType == DataTypes.TIMESTAMP || dataType == DataTypes.DATE) {
+    if (dataType == DataTypes.DATE && !isComplexChild) {
       encodings.add(Encoding.DIRECT_DICTIONARY);
       encodings.add(Encoding.DICTIONARY);
     }
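
With the parent-name prefix above, a complex child is now checked for
duplicates as "parent.child" rather than as a bare child name (Issue 4). A
hedged Java illustration using the SDK types exercised by the new tests
(schema construction only, writer setup omitted):

    import java.util.ArrayList;
    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.core.metadata.datatype.StructField;
    import org.apache.carbondata.sdk.file.Field;

    Field[] fields = new Field[2];
    fields[0] = new Field("id", DataTypes.LONG);        // top-level "id"
    ArrayList<StructField> entry = new ArrayList<>();
    entry.add(new StructField("id", DataTypes.LONG));   // child also named "id"
    fields[1] = new Field("entries",
        DataTypes.createArrayType(DataTypes.createStructType(entry)));
    // The duplicate check now compares "entries.id" against "id", so this
    // schema no longer raises a duplicate column exception.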

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
index bc44df0..6728cdf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala
@@ -38,6 +38,8 @@ class TestComplexTypeQuery extends QueryTest with BeforeAndAfterAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
     CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        "force")
     sql("drop table if exists complexcarbontable")
     sql("drop table if exists complexhivetable")
     sql("drop table if exists complex_filter")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 3c51efe..376501b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.spark.testsuite.createTable
 
 import java.sql.Timestamp
 import java.io.{File, FileFilter, IOException}
+import java.io.{File, FileFilter}
 import java.util
 
 import org.apache.commons.io.FileUtils
@@ -33,6 +34,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.sdk.file.AvroCarbonWriter
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -40,7 +42,7 @@ import org.apache.avro
 import org.apache.commons.lang.CharEncoding
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter
 
-import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
 import org.apache.carbondata.sdk.file.{CarbonWriter, CarbonWriterBuilder, Field, Schema}
 
 
@@ -219,6 +221,9 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
   }
 
   override def beforeAll(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
     sql("DROP TABLE IF EXISTS sdkOutputTable")
   }
 
@@ -247,8 +252,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
-  test("test create External Table with Schema with partition, should ignore schema and partition")
-  {
+  test("test create External Table with Schema with partition, should ignore schema and partition") {
     buildTestDataSingleFile()
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -270,8 +274,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
   }
 
 
-  test("test create External Table with insert into feature")
-  {
+  test("test create External Table with insert into feature") {
     buildTestDataSingleFile()
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -302,8 +305,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
-  test("test create External Table with insert overwrite")
-  {
+  test("test create External Table with insert overwrite") {
     buildTestDataSingleFile()
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -341,8 +343,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
   }
 
 
-  test("test create External Table with Load")
-  {
+  test("test create External Table with Load") {
     buildTestDataSingleFile()
     assert(new File(writerPath).exists())
     sql("DROP TABLE IF EXISTS sdkOutputTable")
@@ -563,6 +564,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
+
   test("Read sdk writer output file without any file should fail") {
     buildTestDataSingleFile()
     deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
@@ -748,7 +750,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
   }
 
-
   test("Read sdk two writer output with same column name but different sort columns") {
     FileUtils.deleteDirectory(new File(writerPath))
     buildTestDataOtherDataType(3, Array[String]("name"))
@@ -814,7 +815,10 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
-  private def WriteFilesWithAvroWriter(rows: Int, mySchema: String, json: String): Unit = {
+  private def WriteFilesWithAvroWriter(rows: Int,
+      mySchema: String,
+      json: String,
+      fields: Array[Field]) = {
     // conversion to GenericData.Record
     val nn = new avro.Schema.Parser().parse(mySchema)
     val converter = new JsonAvroConverter
@@ -822,8 +826,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn)
 
     try {
-      val writer = CarbonWriter.builder
-        .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema))
+      val writer = CarbonWriter.builder.withSchema(new Schema(fields))
         .outputPath(writerPath).isTransactionalTable(false)
         .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput
       var i = 0
@@ -860,7 +863,16 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       """.stripMargin
 
     val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """
-    WriteFilesWithAvroWriter(rows, mySchema, json)
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fld.add(new StructField("city", DataTypes.STRING))
+    fields(2) = new Field("address", "struct", fld)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
   }
 
   def buildAvroTestDataStructType(): Any = {
@@ -899,7 +911,17 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
                    """.stripMargin
 
     val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """
-    WriteFilesWithAvroWriter(rows, mySchema, json)
+
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+    // fields[1] = new Field("age", DataTypes.INT);
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fields(2) = new Field("address", "array", fld)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
   }
 
   def buildAvroTestDataSingleFileArrayType(): Any = {
@@ -943,7 +965,18 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       """ {"name":"bob", "age":10,
         |"address" : {"street":"abc", "city":"bang"},
         |"doorNum" : [1,2,3,4]}""".stripMargin
-    WriteFilesWithAvroWriter(rows, mySchema, json)
+
+    val fields = new Array[Field](4)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fld.add(new StructField("city", DataTypes.STRING))
+    fields(2) = new Field("address", "struct", fld)
+    val fld1 = new util.ArrayList[StructField]
+    fld1.add(new StructField("eachDoorNum", DataTypes.INT))
+    fields(3) = new Field("doorNum", "array", fld1)
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
   }
 
   def buildAvroTestDataBothStructArrayType(): Any = {
@@ -951,6 +984,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     buildAvroTestDataStructWithArrayType(3, null)
   }
 
+
   // ArrayOfStruct test
   def buildAvroTestDataArrayOfStruct(rows: Int, options: util.Map[String, String]): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
@@ -995,7 +1029,20 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |{"street":"def","city":"city2"},
         |{"street":"ghi","city":"city3"},
         |{"street":"jkl","city":"city4"}]} """.stripMargin
-    WriteFilesWithAvroWriter(rows, mySchema, json)
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fld.add(new StructField("city", DataTypes.STRING))
+
+    val fld2 = new util.ArrayList[StructField]
+    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
+    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
   }
 
   def buildAvroTestDataArrayOfStructType(): Any = {
@@ -1003,6 +1050,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     buildAvroTestDataArrayOfStruct(3, null)
   }
 
+
   // StructOfArray test
   def buildAvroTestDataStructOfArray(rows: Int, options: util.Map[String, String]): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
@@ -1064,7 +1112,21 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
                  |		]
                  |	}
                  |} """.stripMargin
-    WriteFilesWithAvroWriter(rows, mySchema, json)
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+
+    val fld2 = new util.ArrayList[StructField]
+    fld2.add(new StructField("street", DataTypes.STRING))
+    fld2.add(new StructField("city", DataTypes.STRING))
+
+    val fld1 = new util.ArrayList[StructField]
+    fld1.add(new StructField("eachDoorNum", DataTypes.INT))
+    fld2.add(new StructField("doorNum", DataTypes.createArrayType(DataTypes.INT), fld1))
+
+    fields(2) = new Field("address","struct",fld2)
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
   }
 
   def buildAvroTestDataStructOfArrayType(): Any = {
@@ -1072,6 +1134,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     buildAvroTestDataStructOfArray(3, null)
   }
 
+
   test("Read sdk writer Avro output Record Type") {
     buildAvroTestDataStructType()
     assert(new File(writerPath).exists())
@@ -1080,6 +1143,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
          |'$writerPath' """.stripMargin)
 
+
     checkAnswer(sql("select * from sdkOutputTable"), Seq(
       Row("bob", 10, Row("abc","bang")),
       Row("bob", 10, Row("abc","bang")),
@@ -1138,6 +1202,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
+
   test("Read sdk writer Avro output with Array of struct") {
     buildAvroTestDataArrayOfStructType()
     assert(new File(writerPath).exists())
@@ -1163,6 +1228,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
+
   // Struct of array
   test("Read sdk writer Avro output with struct of Array") {
     buildAvroTestDataStructOfArrayType()
@@ -1264,7 +1330,21 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |		}
         |	]
         |} """.stripMargin
-    WriteFilesWithAvroWriter(rows, mySchema, json)
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fld.add(new StructField("city", DataTypes.STRING))
+    fld.add(new StructField("FloorNum", DataTypes.createArrayType(DataTypes.INT)))
+
+    val fld2 = new util.ArrayList[StructField]
+    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
+    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
   }
 
   def buildAvroTestDataMultiLevel3Type(): Any = {
@@ -1302,6 +1382,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
+
   // test multi level -- 3 levels [array of struct of struct of string, int]
   def buildAvroTestDataMultiLevel3_1(rows: Int, options: util.Map[String, String]): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
@@ -1381,7 +1462,26 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |		}
         |	]
         |}  """.stripMargin
-    WriteFilesWithAvroWriter(rows, mySchema, json)
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fld.add(new StructField("city", DataTypes.STRING))
+
+    val subFld = new util.ArrayList[StructField]
+    subFld.add(new StructField("wing", DataTypes.STRING))
+    subFld.add(new StructField("number", DataTypes.INT))
+    fld.add(new StructField("FloorNum", DataTypes.createStructType(subFld)))
+
+    // array of struct of struct
+    val fld2 = new util.ArrayList[StructField]
+    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
+    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
   }
 
   def buildAvroTestDataMultiLevel3_1Type(): Any = {
@@ -1461,7 +1561,22 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |        	"BuildNum": [[[1,2,3],[4,5,6]],[[10,20,30],[40,50,60]]]
         |        }   """.stripMargin
 
-    WriteFilesWithAvroWriter(rows, mySchema, json)
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+
+    val subFld = new util.ArrayList[StructField]
+    subFld.add(new StructField("EachDoorNum", DataTypes.INT))
+
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("DoorNum", DataTypes.createArrayType(DataTypes.INT), subFld))
+    // array of struct of struct
+    val doorNum = new util.ArrayList[StructField]
+    doorNum.add(new StructField("FloorNum",
+      DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.INT)), fld))
+    fields(2) = new Field("BuildNum", "array", doorNum)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
   }
 
   def buildAvroTestDataMultiLevel3_2Type(): Any = {
@@ -1500,6 +1615,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
+
+
   // test multi level -- 4 levels [array of array of array of struct]
   def buildAvroTestDataMultiLevel4(rows: Int, options: util.Map[String, String]): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
@@ -1578,7 +1695,30 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |		]
         |	]
         |} """.stripMargin
-    WriteFilesWithAvroWriter(rows, mySchema, json)
+
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+
+    val subFld = new util.ArrayList[StructField]
+    subFld.add(new StructField("EachDoorNum", DataTypes.INT))
+
+    val address = new util.ArrayList[StructField]
+    address.add(new StructField("street", DataTypes.STRING))
+    address.add(new StructField("city", DataTypes.STRING))
+
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("DoorNum",
+        DataTypes.createArrayType(DataTypes.createStructType(address)),
+        subFld))
+    // array of struct of struct
+    val doorNum = new util.ArrayList[StructField]
+    doorNum.add(new StructField("FloorNum",
+      DataTypes.createArrayType(
+        DataTypes.createArrayType(DataTypes.createStructType(address))), fld))
+    fields(2) = new Field("BuildNum", "array", doorNum)
+
+    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
   }
 
   def buildAvroTestDataMultiLevel4Type(): Any = {
@@ -1604,4 +1744,228 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
-}
\ No newline at end of file
+  test(
+    "test if exception is thrown when a column which is not in schema is specified in sort columns")
+  {
+    val schema1 =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |		{
+        |			"name": "id",
+        |			"type": "int"
+        |		},
+        |		{
+        |			"name": "course_details",
+        |			"type": {
+        |				"name": "course_details",
+        |				"type": "record",
+        |				"fields": [
+        |					{
+        |						"name": "course_struct_course_time",
+        |						"type": "string"
+        |					}
+        |				]
+        |			}
+        |		}
+        |	]
+        |}""".stripMargin
+
+    val json1 =
+      """{"id": 101,"course_details": { "course_struct_course_time":"2014-01-05"  }}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+    val fields = new Array[Field](2)
+    fields(0) = new Field("id", DataTypes.INT)
+    val fld_s = new java.util.ArrayList[StructField]
+    fld_s.add(new StructField("course_struct_course_time", DataTypes.STRING))
+    fields(1) = new Field("course_details", "struct", fld_s)
+
+    assert(intercept[RuntimeException] {
+      val writer = CarbonWriter.builder.withSchema(new Schema(fields)).sortBy(Array("name", "id"))
+        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+      writer.write(record)
+      writer.close()
+    }.getMessage.toLowerCase.contains("column: name specified in sort columns"))
+  }
+
+  test("test if data load is success with a struct having timestamp column  ") {
+    val schema1 =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |		{
+        |			"name": "id",
+        |			"type": "int"
+        |		},
+        |		{
+        |			"name": "course_details",
+        |			"type": {
+        |				"name": "course_details",
+        |				"type": "record",
+        |				"fields": [
+        |					{
+        |						"name": "course_struct_course_time",
+        |						"type": "string"
+        |					}
+        |				]
+        |			}
+        |		}
+        |	]
+        |}""".stripMargin
+
+    val json1 =
+      """{"id": 101,"course_details": { "course_struct_course_time":"2014-01-05 00:00:00"  }}""".stripMargin
+    val nn = new org.apache.avro.Schema.Parser().parse(schema1)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+    val fields = new Array[Field](2)
+    fields(0) = new Field("id", DataTypes.INT)
+    val fld_s = new java.util.ArrayList[StructField]
+    fld_s.add(new StructField("course_struct_course_time", DataTypes.TIMESTAMP))
+    fields(1) = new Field("course_details", "struct", fld_s)
+
+    val writer = CarbonWriter.builder.withSchema(new Schema(fields)).sortBy(Array("id"))
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+    writer.write(record)
+    writer.close()
+  }
+
+  test(
+    "test is dataload is successful if childcolumn has same name as one of the other fields(not " +
+    "complex)")
+  {
+    val schema =
+      """{
+        |	"type": "record",
+        |	"name": "Order",
+        |	"namespace": "com.apache.schema",
+        |	"fields": [
+        |		{
+        |			"name": "id",
+        |			"type": "long"
+        |		},
+        |		{
+        |			"name": "entries",
+        |			"type": {
+        |				"type": "array",
+        |				"items": {
+        |					"type": "record",
+        |					"name": "Entry",
+        |					"fields": [
+        |						{
+        |							"name": "id",
+        |							"type": "long"
+        |						}
+        |					]
+        |				}
+        |			}
+        |		}
+        |	]
+        |}""".stripMargin
+    val json1 =
+      """{"id": 101, "entries": [ {"id":1234}, {"id":3212}  ]}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+    val fields = new Array[Field](2)
+    fields(0) = new Field("id", DataTypes.LONG)
+    val fld_s = new java.util.ArrayList[StructField]
+    fld_s.add(new StructField("id", DataTypes.LONG))
+    fields(1) = new Field("entries", DataTypes.createArrayType(DataTypes.createStructType(fld_s)))
+    val writer = CarbonWriter.builder.withSchema(new Schema(fields))
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+    writer.write(record)
+    writer.close()
+  }
+
+  test("test if data load with various bad_records_action") {
+    val schema =
+      """{
+        |	"namespace": "com.apache.schema",
+        |	"type": "record",
+        |	"name": "StudentActivity",
+        |	"fields": [
+        |		{
+        |			"name": "id",
+        |			"type": "string"
+        |		},
+        |		{
+        |			"name": "course_details",
+        |			"type": {
+        |				"name": "course_details",
+        |				"type": "record",
+        |				"fields": [
+        |					{
+        |						"name": "course_struct_course_string",
+        |						"type": "string"
+        |					}
+        |				]
+        |			}
+        |		},
+        |		{
+        |			"name": "salary_string",
+        |			"type": {
+        |				"type": "array",
+        |				"items": "string"
+        |			}
+        |		}
+        |	]
+        |}""".stripMargin
+    val json1 =
+      """{
+        |	"id": "cust_1",
+        |	"course_details": {
+        |		"course_struct_course_string": "asd"
+        |	},
+        |	"salary_string": [
+        |		"xyz",
+        |		"abc"
+        |	]
+        |}""".stripMargin
+
+    val nn = new org.apache.avro.Schema.Parser().parse(schema)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
+
+    val fields = new Array[Field](3)
+    fields(0)=new Field("id", DataTypes.STRING)
+    val fld_s = new java.util.ArrayList[StructField]
+    fld_s.add(new StructField("carbon_int", DataTypes.INT))
+    fields(1)=new Field("course_details", "struct",fld_s)
+
+    val fld_a = new java.util.ArrayList[StructField]
+    fld_a.add(new StructField("carbon_array", DataTypes.INT))
+    fields(2)=new Field("salary_string", "array",fld_a)
+
+    val loadOptions = new util.HashMap[String, String]()
+    loadOptions.put("bad_records_action", "fail")
+    assert(intercept[Exception] {
+      val writer = CarbonWriter.builder.withSchema(new Schema(fields)).outputPath(writerPath)
+        .isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForAvroInput
+      writer.write(record)
+      writer.close()
+    }.getMessage.contains("Data load failed due to bad record"))
+
+    loadOptions.put("bad_records_action", "FORCE")
+      val writer = CarbonWriter.builder.withSchema(new Schema(fields)).outputPath(writerPath)
+        .isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForAvroInput
+      writer.write(record)
+      writer.close()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index d7d59ce..cc2619e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
-
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 
 /**
  * Array DataType stateless object used in data loading
@@ -151,17 +151,16 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
     return true;
   }
 
-  @Override
-  public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream)
-      throws IOException, DictionaryGenerationException {
+  @Override public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream,
+      BadRecordLogHolder logHolder) throws IOException, DictionaryGenerationException {
     if (input == null) {
       dataOutputStream.writeInt(1);
-      children.writeByteArray(null, dataOutputStream);
+      children.writeByteArray(null, dataOutputStream, logHolder);
     } else {
       Object[] data = input.getData();
       dataOutputStream.writeInt(data.length);
       for (Object eachInput : data) {
-        children.writeByteArray(eachInput, dataOutputStream);
+        children.writeByteArray(eachInput, dataOutputStream, logHolder);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index 21ad83d..f48a91d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 
 /**
  * Generic DataType interface which will be used while data loading for complex types like Array &
@@ -58,7 +59,7 @@ public interface GenericDataType<T> {
    * @param dataOutputStream
    * @throws IOException
    */
-  void writeByteArray(T input, DataOutputStream dataOutputStream)
+  void writeByteArray(T input, DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
       throws IOException, DictionaryGenerationException;
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index e34c184..fdfc3bb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -48,10 +48,12 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.dictionary.DictionaryServerClientDictionary;
 import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
 import org.apache.carbondata.processing.loading.dictionary.PreCreatedDictionary;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 /**
  * Primitive DataType stateless object used in data loading
@@ -265,19 +267,29 @@ public class PrimitiveDataType implements GenericDataType<Object> {
     return isDictionary;
   }
 
-  @Override public void writeByteArray(Object input, DataOutputStream dataOutputStream)
-      throws IOException, DictionaryGenerationException {
-
+  @Override public void writeByteArray(Object input, DataOutputStream dataOutputStream,
+      BadRecordLogHolder logHolder) throws IOException, DictionaryGenerationException {
     String parsedValue =
         input == null ? null : DataTypeUtil.parseValue(input.toString(), carbonDimension);
+    String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
     if (this.isDictionary) {
       Integer surrogateKey;
       if (null == parsedValue) {
         surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+        if (null == message) {
+          message = CarbonDataProcessorUtil
+              .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType());
+          logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
+          logHolder.setReason(message);
+        }
       } else {
         surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
         if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
           surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+          message = CarbonDataProcessorUtil
+              .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType());
+          logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
+          logHolder.setReason(message);
         }
       }
       dataOutputStream.writeInt(surrogateKey);
@@ -285,15 +297,15 @@ public class PrimitiveDataType implements GenericDataType<Object> {
       // Transform into ByteArray for No Dictionary.
       // TODO have to refactor and place all the cases present in NonDictionaryFieldConverterImpl
       if (null == parsedValue && this.carbonDimension.getDataType() != DataTypes.STRING) {
-        updateNullValue(dataOutputStream);
+        updateNullValue(dataOutputStream, logHolder);
       } else if (null == parsedValue || parsedValue.equals(nullformat)) {
-        updateNullValue(dataOutputStream);
+        updateNullValue(dataOutputStream, logHolder);
       } else {
         String dateFormat = null;
         if (this.carbonDimension.getDataType() == DataTypes.DATE) {
-          dateFormat = this.carbonDimension.getDateFormat();
+          dateFormat = carbonDimension.getDateFormat();
         } else if (this.carbonDimension.getDataType() == DataTypes.TIMESTAMP) {
-          dateFormat = this.carbonDimension.getTimestampFormat();
+          dateFormat = carbonDimension.getTimestampFormat();
         }
 
         try {
@@ -318,9 +330,12 @@ public class PrimitiveDataType implements GenericDataType<Object> {
               updateValueToByteStream(dataOutputStream,
                   parsedValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
             } else {
-              updateNullValue(dataOutputStream);
+              updateNullValue(dataOutputStream, logHolder);
             }
           }
+        } catch (NumberFormatException e) {
+          // Update logHolder for bad record and put null in dataOutputStream.
+          updateNullValue(dataOutputStream, logHolder);
         } catch (CarbonDataLoadingException e) {
           throw e;
         } catch (Throwable ex) {
@@ -338,7 +353,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
     dataOutputStream.write(value);
   }
 
-  private void updateNullValue(DataOutputStream dataOutputStream) throws IOException {
+  private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
+      throws IOException {
     if (this.carbonDimension.getDataType() == DataTypes.STRING) {
       dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
       dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
@@ -346,6 +362,13 @@ public class PrimitiveDataType implements GenericDataType<Object> {
       dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
       dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
     }
+    String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
+    if (null == message) {
+      message = CarbonDataProcessorUtil
+          .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType());
+      logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
+      logHolder.setReason(message);
+    }
   }
 
   @Override public void fillCardinality(List<Integer> dimCardWithComplex) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index 4fe6255..bb3da6c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.processing.loading.complexobjects.StructObject;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 
 /**
  * Struct DataType stateless object used in data loading
@@ -150,22 +151,22 @@ public class StructDataType implements GenericDataType<StructObject> {
     return true;
   }
 
-  @Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream)
-      throws IOException, DictionaryGenerationException {
+  @Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream,
+      BadRecordLogHolder logHolder) throws IOException, DictionaryGenerationException {
     dataOutputStream.writeInt(children.size());
     if (input == null) {
       for (int i = 0; i < children.size(); i++) {
-        children.get(i).writeByteArray(null, dataOutputStream);
+        children.get(i).writeByteArray(null, dataOutputStream, logHolder);
       }
     } else {
       Object[] data = input.getData();
       for (int i = 0; i < data.length && i < children.size(); i++) {
-        children.get(i).writeByteArray(data[i], dataOutputStream);
+        children.get(i).writeByteArray(data[i], dataOutputStream, logHolder);
       }
 
       // For other children elements which dont have data, write empty
       for (int i = data.length; i < children.size(); i++) {
-        children.get(i).writeByteArray(null, dataOutputStream);
+        children.get(i).writeByteArray(null, dataOutputStream, logHolder);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 2f904ed..17d0c76 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -237,6 +237,15 @@ public final class DataLoadProcessBuilder {
       }
       if (column.isComplex()) {
         complexDataFields.add(dataField);
+        List<CarbonDimension> childDimensions =
+            ((CarbonDimension) dataField.getColumn()).getListOfChildDimensions();
+        for (CarbonDimension childDimension : childDimensions) {
+          if (childDimension.getDataType() == DataTypes.DATE) {
+            childDimension.setDateFormat(loadModel.getDateFormat());
+          } else if (childDimension.getDataType() == DataTypes.TIMESTAMP) {
+            childDimension.setTimestampFormat(loadModel.getTimestampformat());
+          }
+        }
       } else {
         dataFields.add(dataField);
       }
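
A hedged sketch of what the propagation above enables: a TIMESTAMP child
inside a struct now inherits the load model's timestamp format, mirroring
the new SDK test that writes "2014-01-05 00:00:00" into a struct field:

    import java.util.ArrayList;
    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.core.metadata.datatype.StructField;
    import org.apache.carbondata.sdk.file.Field;

    ArrayList<StructField> children = new ArrayList<>();
    children.add(
        new StructField("course_struct_course_time", DataTypes.TIMESTAMP));
    // Before this change the child dimension carried no timestamp format and
    // conversion failed; it is now parsed with the load model's format.
    Field course = new Field("course_details", "struct", children);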

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
index b26ef36..4e46f9f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
@@ -44,7 +44,7 @@ public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterI
     ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
     try {
-      genericDataType.writeByteArray(object, dataOutputStream);
+      genericDataType.writeByteArray(object, dataOutputStream, logHolder);
       dataOutputStream.close();
       row.update(byteArray.toByteArray(), index);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
index b49cd90..64ddf27 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
@@ -54,7 +54,6 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
       this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
           .getDirectDictionaryGenerator(dataField.getColumn().getDataType(),
               dataField.getTimestampFormat());
-
     } else {
       this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
           .getDirectDictionaryGenerator(dataField.getColumn().getDataType());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 13dd75c..9a9d09e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -79,7 +79,20 @@ public class CarbonLoadModelBuilder {
 
     // we have provided 'fileheader', so it hadoopConf can be null
     build(options, optionsFinal, model, null);
-
+    String timestampFormat = options.get("timestampformat");
+    if (timestampFormat == null) {
+      timestampFormat = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+              CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+    }
+    String dateFormat = options.get("dateFormat");
+    if (dateFormat == null) {
+      dateFormat = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+              CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+    }
+    model.setDateFormat(dateFormat);
+    model.setTimestampformat(timestampFormat);
     model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options, "onepass", "false")));
     model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost", null));
     try {
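
Assuming the SDK load options reach this builder (as the options.get(...)
lookups above suggest), a caller can override both formats per load; the
CarbonProperties defaults apply otherwise. A hedged usage sketch:

    import java.util.HashMap;
    import java.util.Map;

    Map<String, String> loadOptions = new HashMap<>();
    // Keys match the lookups above; both entries are optional and fall back
    // to the CARBON_TIMESTAMP_FORMAT / CARBON_DATE_FORMAT properties.
    loadOptions.put("timestampformat", "yyyy-MM-dd HH:mm:ss");
    loadOptions.put("dateFormat", "yyyy-MM-dd");
    // Passed to the writer via CarbonWriterBuilder.withLoadOptions(loadOptions).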

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 77f5260..c99a413 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -35,11 +35,15 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFactory;
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -134,7 +138,7 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
       outIterators[i] =
           new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(),
               rowCounter, orderOfData, noDictionaryMapping, dataTypes,
-              configuration.getDataFields(), dataFieldsWithComplexDataType);
+              configuration, dataFieldsWithComplexDataType);
     }
     return outIterators;
   }
@@ -207,11 +211,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
 
     private int[] orderOfData;
 
+    private CarbonDataLoadConfiguration configuration;
+
     private Map<Integer, GenericDataType> dataFieldsWithComplexDataType;
 
     public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
         boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
-        DataType[] dataTypes, DataField[] dataFields,
+        DataType[] dataTypes, CarbonDataLoadConfiguration configuration,
         Map<Integer, GenericDataType> dataFieldsWithComplexDataType) {
       this.inputIterators = inputIterators;
       this.batchSize = batchSize;
@@ -223,7 +229,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
       this.firstTime = true;
       this.noDictionaryMapping = noDictionaryMapping;
       this.dataTypes = dataTypes;
-      this.dataFields = dataFields;
+      this.dataFields = configuration.getDataFields();
+      this.configuration = configuration;
       this.orderOfData = orderOfData;
       this.dataFieldsWithComplexDataType = dataFieldsWithComplexDataType;
     }
@@ -272,6 +279,9 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
 
     private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) {
       Object[] newData = new Object[data.length];
+      BadRecordLogHolder logHolder = new BadRecordLogHolder();
+      BadRecordsLogger badRecordLogger =
+          BadRecordsLoggerProvider.createBadRecordLogger(configuration);
       for (int i = 0; i < data.length; i++) {
         if (i < noDictionaryMapping.length && noDictionaryMapping[i]) {
           newData[i] = DataTypeUtil
@@ -284,11 +294,21 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
             try {
               GenericDataType complextType =
                   dataFieldsWithComplexDataType.get(dataFields[i].getColumn().getOrdinal());
-
-              complextType.writeByteArray(data[orderOfData[i]], dataOutputStream);
-
+              complextType.writeByteArray(data[orderOfData[i]], dataOutputStream, logHolder);
+              if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
+                badRecordLogger.addBadRecordsToBuilder(data, logHolder.getReason());
+                if (badRecordLogger.isDataLoadFail()) {
+                  String error = "Data load failed due to bad record: " + logHolder.getReason();
+                  if (!badRecordLogger.isBadRecordLoggerEnable()) {
+                    error += "Please enable bad record logger to know the detail reason.";
+                  }
+                  throw new BadRecordFoundException(error);
+                }
+              }
               dataOutputStream.close();
               newData[i] = byteArray.toByteArray();
+            } catch (BadRecordFoundException e) {
+              throw new CarbonDataLoadingException("Loading Exception: " + e.getMessage(), e);
             } catch (Exception e) {
               throw new CarbonDataLoadingException("Loading Exception", e);
             }
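
The net effect for SDK users, sketched from the new test: with
bad_records_action=fail, an unconvertible complex value now aborts the load
instead of silently writing nulls (schema and record are assumed to be built
as in the tests above, with an INT child receiving a string value):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.carbondata.sdk.file.CarbonWriter;

    Map<String, String> loadOptions = new HashMap<>();
    loadOptions.put("bad_records_action", "fail");
    CarbonWriter writer = CarbonWriter.builder()
        .withSchema(schema)                 // assumed: schema with INT child
        .outputPath(writerPath)             // assumed: caller-supplied path
        .isTransactionalTable(false)
        .withLoadOptions(loadOptions)
        .buildWriterForAvroInput();
    writer.write(record);                   // assumed: Avro record whose INT
    writer.close();                         // child holds a string value
    // -> CarbonDataLoadingException: "Loading Exception: Data load failed
    //    due to bad record: ...", while "force" loads the row with nulls.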

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index f541dbb..00ba8a5 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -431,6 +431,20 @@ public class CarbonWriterBuilder {
     // to child of complex array type in the order val1, val2 so that each array type child is
     // differentiated to any level
     AtomicInteger valIndex = new AtomicInteger(0);
+    // Check if any of the columns specified in sort columns are missing from schema.
+    for (String sortColumn: sortColumnsList) {
+      boolean exists = false;
+      for (Field field : fields) {
+        if (field.getFieldName().equalsIgnoreCase(sortColumn)) {
+          exists = true;
+          break;
+        }
+      }
+      if (!exists) {
+        throw new RuntimeException(
+            "column: " + sortColumn + " specified in sort columns does not exist in schema");
+      }
+    }
     for (Field field : fields) {
       if (null != field) {
         int isSortColumn = sortColumnsList.indexOf(field.getFieldName());
@@ -442,9 +456,9 @@ public class CarbonWriterBuilder {
                 " sort columns not supported for " + "array, struct, double, float, decimal ");
           }
         }
-
         if (field.getChildren() != null && field.getChildren().size() > 0) {
           if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
+            checkForUnsupportedDataTypes(field.getChildren().get(0).getDataType());
             // Loop through the inner columns and for a StructData
             DataType complexType =
                 DataTypes.createArrayType(field.getChildren().get(0).getDataType());
@@ -455,6 +469,7 @@ public class CarbonWriterBuilder {
             List<StructField> structFieldsArray =
                 new ArrayList<StructField>(field.getChildren().size());
             for (StructField childFld : field.getChildren()) {
+              checkForUnsupportedDataTypes(childFld.getDataType());
               structFieldsArray
                   .add(new StructField(childFld.getFieldName(), childFld.getDataType()));
             }
@@ -475,6 +490,13 @@ public class CarbonWriterBuilder {
     }
   }
 
+  private void checkForUnsupportedDataTypes(DataType dataType) {
+    if (dataType == DataTypes.DOUBLE || dataType == DataTypes.DATE || DataTypes
+        .isDecimal(dataType)) {
+      throw new RuntimeException("Unsupported data type: " + dataType.getName());
+    }
+  }
+
   /**
    * Save the schema of the {@param table} to {@param persistFilePath}
    * @param table table object containing schema
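
A short, hedged illustration of checkForUnsupportedDataTypes above: declaring
an array or struct child of DOUBLE, DATE, or decimal type now fails fast
while the writer is being built.

    import java.util.ArrayList;
    import org.apache.carbondata.core.metadata.datatype.DataTypes;
    import org.apache.carbondata.core.metadata.datatype.StructField;
    import org.apache.carbondata.sdk.file.Field;

    ArrayList<StructField> children = new ArrayList<>();
    children.add(new StructField("price", DataTypes.DOUBLE));
    Field prices = new Field("prices", "array", children);
    // Building a writer over a schema containing this field now throws
    // RuntimeException("Unsupported data type: " + dataType.getName()).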