Posted to commits@carbondata.apache.org by in...@apache.org on 2021/10/21 15:10:42 UTC

[carbondata] branch master updated: [CARBONDATA-4298][CARBONDATA-4281] Empty bad record support for complex type

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 305851e  [CARBONDATA-4298][CARBONDATA-4281] Empty bad record support for complex type
305851e is described below

commit 305851ed75cf935c2a606071118dbe1347a18628
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Wed Sep 29 21:18:13 2021 +0530

    [CARBONDATA-4298][CARBONDATA-4281] Empty bad record support for complex type
    
    Why is this PR needed?
    1. The IS_EMPTY_DATA_BAD_RECORD property is not supported for complex types.
    2. The documentation needs to be updated to state that COLUMN_META_CACHE
       and RANGE_COLUMN do not support complex data types.
    
    What changes were proposed in this PR?
    1. Made changes to pass down the IS_EMPTY_DATA_BAD_RECORD property and
       throw an exception when an empty complex value is treated as a bad
       record. Otherwise, store an empty complex value instead of null,
       which matches the Hive table result (see the sketch below).
    2. Updated the documentation and added test cases.
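
    A minimal sketch of the resulting behavior, modeled on the test added in
    this PR (the table definition and csvPath are placeholders):

        // With IS_EMPTY_DATA_BAD_RECORD='true' and bad_records_action='fail',
        // a load containing empty complex values fails as a bad record.
        sql("CREATE TABLE t (id INT, st STRUCT<a: STRING, b: INT>) " +
            "STORED AS carbondata")
        sql("LOAD DATA LOCAL INPATH '" + csvPath + "' INTO TABLE t OPTIONS(" +
            "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':', " +
            "'bad_records_logger_enable'='true', " +
            "'IS_EMPTY_DATA_BAD_RECORD'='true', 'bad_records_action'='fail')")
        // With 'IS_EMPTY_DATA_BAD_RECORD'='false', the empty value is stored
        // as an empty (not null) complex value, matching the Hive result.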
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4228
---
 .../apache/carbondata/core/util/CarbonUtil.java    | 18 ++++++---
 docs/ddl-of-carbondata.md                          |  4 +-
 .../apache/carbondata/spark/util/CommonUtil.scala  |  4 +-
 .../test/resources/complextypeWithEmptyRecords.csv |  3 ++
 .../complexType/TestComplexDataType.scala          | 17 ++++++++
 .../badrecordloger/BadRecordEmptyDataTest.scala    | 46 ++++++++++++++++++++++
 .../carbondata/TestStreamingTableOpName.scala      | 20 +++++-----
 .../processing/datatypes/ArrayDataType.java        | 28 +++++++++----
 .../processing/datatypes/GenericDataType.java      |  2 +-
 .../processing/datatypes/PrimitiveDataType.java    | 19 ++++-----
 .../processing/datatypes/StructDataType.java       | 30 +++++++++-----
 .../converter/impl/ComplexFieldConverterImpl.java  |  8 ++--
 .../converter/impl/FieldEncoderFactory.java        |  3 +-
 .../loading/parser/impl/ArrayParserImpl.java       |  8 ++--
 .../loading/parser/impl/MapParserImpl.java         |  8 +++-
 .../loading/parser/impl/StructParserImpl.java      | 11 ++++--
 .../InputProcessorStepWithNoConverterImpl.java     |  9 +++--
 .../processing/util/CarbonBadRecordUtil.java       | 22 +++++++++++
 18 files changed, 196 insertions(+), 64 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b41a71b..0f71c36 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -82,6 +82,7 @@ import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.datatype.ArrayType;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypeAdapter;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -3538,13 +3539,18 @@ public final class CarbonUtil {
       }
       dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
     } else {
-      if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) {
-        dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
-      } else {
-        dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
-      }
-      dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
+      updateWithEmptyValueBasedOnDatatype(dataOutputStream, dataType);
+    }
+  }
+
+  public static void updateWithEmptyValueBasedOnDatatype(DataOutputStream dataOutputStream,
+      DataType dataType) throws IOException {
+    if (DataTypeUtil.isByteArrayComplexChildColumn(dataType) || dataType instanceof ArrayType) {
+      dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+    } else {
+      dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
     }
+    dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
   }
 
   /**
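
Note: the refactored helper above extends the 4-byte int length prefix, which
byte-array complex children already used, to array types; all other types keep
the 2-byte short prefix. A condensed sketch of the rule (illustrative only;
"out" stands for an assumed DataOutputStream, and EMPTY_BYTE_ARRAY is empty):

    // int length prefix for byte-array children and array types,
    // short prefix otherwise, then the (empty) payload itself.
    if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)
        || dataType.isInstanceOf[ArrayType]) {
      out.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length)   // writes 0
    } else {
      out.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length) // writes 0
    }
    out.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY)
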
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index b482ae5..b37b3ab 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -318,7 +318,7 @@ CarbonData DDL statements are documented here,which includes:
 
    - ##### Caching Min/Max Value for Required Columns
 
-     By default, CarbonData caches min and max values of all the columns in schema.  As the load increases, the memory required to hold the min and max values increases considerably. This feature enables you to configure min and max values only for the required columns, resulting in optimized memory usage. This feature doesn't support binary data type.
+     By default, CarbonData caches min and max values of all the columns in schema.  As the load increases, the memory required to hold the min and max values increases considerably. This feature enables you to configure min and max values only for the required columns, resulting in optimized memory usage. This feature doesn't support binary and complex data types.
 
       Following are the valid values for COLUMN_META_CACHE:
       * If you want no column min/max values to be cached in the driver.
@@ -507,7 +507,7 @@ CarbonData DDL statements are documented here,which includes:
    - ##### Range Column
      This property is used to specify a column to partition the input data by range.
      Only one column can be configured. During data loading, you can use "global_sort_partitions" or "scale_factor" to avoid generating small files.
-     This feature doesn't support binary data type.
+     This feature doesn't support binary and complex data types.
 
      ```
      TBLPROPERTIES('RANGE_COLUMN'='col1')
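
Note: specifying a complex column for either property is rejected at create
time, as the new test below asserts. A sketch of the rejection (Scala test
context; the table and column names follow the added test case):

    // COLUMN_META_CACHE with a complex column fails with:
    //   "arr1 is a complex type column and complex type is not allowed
    //    for the option(s): column_meta_cache"
    sql("CREATE TABLE test (id INT, arr1 ARRAY<ARRAY<INT>>) " +
        "STORED AS carbondata TBLPROPERTIES('COLUMN_META_CACHE'='arr1')")
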
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 19056da..5b3914b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -961,7 +961,7 @@ object CommonUtil {
             .writeByteArray(result.asInstanceOf[ArrayObject],
               dataOutputStream,
               badRecordLogHolder,
-              true)
+              true, false)
           dataOutputStream.close()
           data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
         case structType: StructType =>
@@ -973,7 +973,7 @@ object CommonUtil {
             .writeByteArray(result.asInstanceOf[StructObject],
               dataOutputStream,
               badRecordLogHolder,
-              true)
+              true, false)
           dataOutputStream.close()
           data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
         case other =>
diff --git a/integration/spark/src/test/resources/complextypeWithEmptyRecords.csv b/integration/spark/src/test/resources/complextypeWithEmptyRecords.csv
new file mode 100644
index 0000000..994d6d0
--- /dev/null
+++ b/integration/spark/src/test/resources/complextypeWithEmptyRecords.csv
@@ -0,0 +1,3 @@
+1,109,4ROM size,Intel,29-11-2015,,MAC1:1,7:Chinese:Hubei Province:yichang:yichang:yichang$7:India:New Delhi:delhi:delhi:delhi,29-11-2015$29-11-2015:29-11-2015,109,2738.562,,
+1,109,4ROM size,Intel,29-11-2015,1AA1$2,,7:Chinese:Hubei Province:yichang:yichang:yichang$7:India:New Delhi:delhi:delhi:delhi,29-11-2015$29-11-2015:29-11-2015,109,2738.562,,
+1,109,4ROM size,Intel,29-11-2015,1AA1$2,MAC1:1,,29-11-2015$29-11-2015:29-11-2015,109,2738.562,,
\ No newline at end of file
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
index fd8fef7..bf3cf93 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
@@ -1177,6 +1177,23 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists hive_table")
   }
 
+  test("test COLUMN_META_CACHE and RANGE_COLUMN doesn't support complex datatype") {
+    sql("DROP TABLE IF EXISTS test")
+    var exception = intercept[Exception] {
+      sql("CREATE TABLE IF NOT EXISTS test " +
+          "(id INT,mlabel boolean,name STRING,arr1 array<array<int>>,autoLabel boolean)" +
+          " STORED AS carbondata TBLPROPERTIES('COLUMN_META_CACHE'='arr1')")
+    }
+    assert(exception.getMessage.contains("arr1 is a complex type column and complex type " +
+                                         "is not allowed for the option(s): column_meta_cache"))
+    exception = intercept[Exception] {
+      sql("CREATE TABLE IF NOT EXISTS test " +
+          "(id INT,label boolean,name STRING,map1 map<string, array<int>>,autoLabel boolean)" +
+          " STORED AS carbondata TBLPROPERTIES('RANGE_COLUMN'='map1')")
+    }
+    assert(exception.getMessage.contains("RANGE_COLUMN doesn't support map data type: map1"))
+  }
+
   test("test when insert select from a parquet table " +
        "with an struct with binary and custom complex delimiter") {
     var carbonProperties = CarbonProperties.getInstance()
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
index 42b9e07..0ec0fbd 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
@@ -101,6 +101,52 @@ class BadRecordEmptyDataTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  def loadEmptyComplexData(isEmptyBadRecord: Boolean, badRecordsAction: String): Unit = {
+    sql(s"LOAD DATA local inpath '" + resourcesPath +
+        "/complextypeWithEmptyRecords.csv' INTO table complexcarbontable OPTIONS('DELIMITER'=','," +
+        "'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId,ROMSize,ROMName," +
+        "purchasedate,file,MAC,locationinfo,proddate,gamePointId,contractNumber,st,ar', " +
+        "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':', " +
+        s"'bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='$isEmptyBadRecord' ," +
+        s"'bad_records_action'='$badRecordsAction')")
+  }
+
+  test("Test complex type with empty values and IS_EMPTY_DATA_BAD_RECORD property") {
+    sql("DROP TABLE IF EXISTS complexcarbontable")
+    sql("DROP TABLE IF EXISTS complexhivetable")
+    sql(
+      "create table complexcarbontable(deviceInformationId int, channelsId string, ROMSize " +
+      "string, ROMName String, purchasedate string, file struct<school:array<string>, age:int>," +
+      " MAC map<string, int>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
+      "ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>, " +
+      "proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+      "double,contractNumber double, st struct<school:struct<a:string,b:int>, age:int>," +
+      "ar array<array<string>>)  STORED AS carbondata")
+    val exception = intercept[Exception] ( loadEmptyComplexData(true, "fail"))
+    assert(exception.getMessage.contains(
+        "The value with column name file.age and column data type INT is not a valid INT type."))
+    loadEmptyComplexData(true, "ignore")
+    checkAnswer(sql("select count(*) from complexcarbontable"), Seq(Row(0)))
+    loadEmptyComplexData(false, "ignore")
+    sql(
+      "create table complexhivetable(deviceInformationId int, channelsId " +
+      "string, ROMSize string, ROMName String, purchasedate string, file " +
+      "struct<school:array<string>, age:int>, MAC map<string, int>, " +
+      "locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, ActiveProvince:string, " +
+      "Activecity:string, ActiveDistrict:string, " +
+      "ActiveStreet:string>>, proddate struct<productionDate:string," +
+      "activeDeactivedate:array<string>>, gamePointId double,contractNumber double," +
+      "st struct<school:struct<a:string,b:int>, age:int>,ar array<array<string>>) row format " +
+      "delimited fields terminated by ',' collection items terminated by '$' map keys terminated " +
+      "by ':'")
+    sql("LOAD DATA local inpath '" + resourcesPath +
+        "/complextypeWithEmptyRecords.csv' INTO table complexhivetable")
+    checkAnswer(sql("select count(*) from complexcarbontable"), Seq(Row(3)))
+    checkAnswer(sql("select * from complexcarbontable"),
+      sql("select * from complexhivetable"))
+    sql("DROP TABLE IF EXISTS complexcarbontable")
+  }
+
    test("select count(*) from empty_timestamp") {
     checkAnswer(
       sql("select count(*) from empty_timestamp"),
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
index c5abfa8..256317e 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
@@ -761,7 +761,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
     // check one row of streaming data
     assert(result(0).isNullAt(0))
     assert(result(0).getString(1) == "")
-    assert(result(0).isNullAt(9))
+    assert(result(0).getStruct(9).isNullAt(1))
     // check one row of batch loading
     assert(result(50).getInt(0) == 100000001)
     assert(result(50).getString(1) == "batch_1")
@@ -924,12 +924,12 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null order by name"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null),
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null)),
         Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where name = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and name <> ''"),
@@ -937,7 +937,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where city = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and city <> ''"),
@@ -945,7 +945,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where salary is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -953,7 +953,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where tax is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and tax is not null"),
@@ -961,7 +961,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where percent is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -969,7 +969,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where birthday is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
@@ -977,7 +977,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where register is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and register is not null"),
@@ -985,7 +985,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where updated is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
+      Seq(Row(null, "", "", null, null, null, null, null, null, Row(mutable.WrappedArray.make(Array()), null))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and updated is not null"),
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 46e4749..bf17751 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -30,11 +30,12 @@ import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
 
 /**
  * Array DataType stateless object used in data loading
  */
-public class ArrayDataType implements GenericDataType<ArrayObject> {
+public class ArrayDataType implements GenericDataType<Object> {
 
   /**
    * child columns
@@ -171,16 +172,27 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
   }
 
   @Override
-  public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream,
-      BadRecordLogHolder logHolder, Boolean isWithoutConverter) throws IOException {
+  public void writeByteArray(Object input, DataOutputStream dataOutputStream,
+      BadRecordLogHolder logHolder, Boolean isWithoutConverter, boolean isEmptyBadRecord)
+      throws IOException {
     if (input == null) {
       dataOutputStream.writeInt(1);
-      children.writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter);
+      children.writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter,
+          isEmptyBadRecord);
     } else {
-      Object[] data = input.getData();
-      dataOutputStream.writeInt(data.length);
+      Object[] data = ((ArrayObject) input).getData();
+      if (data.length == 1 && data[0] != null
+          && data[0].equals("") && !(children instanceof PrimitiveDataType)) {
+        // If child complex column is empty, no need to iterate. Fill empty byte array and return.
+        CarbonBadRecordUtil.updateEmptyValue(dataOutputStream, isEmptyBadRecord, logHolder,
+            parentName, DataTypeUtil.valueOf("array"));
+        return;
+      } else {
+        dataOutputStream.writeInt(data.length);
+      }
       for (Object eachInput : data) {
-        children.writeByteArray(eachInput, dataOutputStream, logHolder, isWithoutConverter);
+        children.writeByteArray(eachInput, dataOutputStream, logHolder, isWithoutConverter,
+            isEmptyBadRecord);
       }
     }
   }
@@ -268,7 +280,7 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
   }
 
   @Override
-  public GenericDataType<ArrayObject> deepCopy() {
+  public GenericDataType<Object> deepCopy() {
     return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy(),
         this.name);
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index 5e8ac08..dc8d9ae 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -61,7 +61,7 @@ public interface GenericDataType<T> extends Serializable {
    * @throws IOException
    */
   void writeByteArray(T input, DataOutputStream dataOutputStream, BadRecordLogHolder logHolder,
-      Boolean isWithoutConverter)
+      Boolean isWithoutConverter, boolean isEmptyBadRecord)
       throws IOException;
 
   /**
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index b0a4263..26037dd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.converter.impl.binary.BinaryDecoder;
 import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
 
 /**
  * Primitive DataType stateless object used in data loading
@@ -237,7 +237,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
 
   @Override
   public void writeByteArray(Object input, DataOutputStream dataOutputStream,
-      BadRecordLogHolder logHolder, Boolean isWithoutConverter) throws IOException {
+      BadRecordLogHolder logHolder, Boolean isWithoutConverter, boolean isEmptyBadRecord)
+      throws IOException {
     String parsedValue = null;
     // write null value
     if (null == input || ((this.carbonDimension.getDataType() == DataTypes.STRING
@@ -245,6 +246,11 @@ public class PrimitiveDataType implements GenericDataType<Object> {
       updateNullValue(dataOutputStream, logHolder);
       return;
     }
+    if (input.equals("")) {
+      CarbonBadRecordUtil.updateEmptyValue(dataOutputStream, isEmptyBadRecord, logHolder,
+          carbonDimension.getColName(), this.carbonDimension.getDataType());
+      return;
+    }
     // write null value after converter
     if (!isWithoutConverter) {
       parsedValue = DataTypeUtil.parseValue(input.toString(), carbonDimension);
@@ -415,13 +421,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
   private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
       throws IOException {
     CarbonUtil.updateNullValueBasedOnDatatype(dataOutputStream, this.carbonDimension.getDataType());
-    String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
-    if (null == message) {
-      message = CarbonDataProcessorUtil
-          .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType());
-      logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
-    }
-    logHolder.setReason(message);
+    CarbonBadRecordUtil.setErrorMessage(logHolder, carbonDimension.getColName(),
+        carbonDimension.getDataType().getName());
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index ab5e71f..5e2b010 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -30,11 +30,12 @@ import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.complexobjects.StructObject;
 import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.util.CarbonBadRecordUtil;
 
 /**
  * Struct DataType stateless object used in data loading
  */
-public class StructDataType implements GenericDataType<StructObject> {
+public class StructDataType implements GenericDataType<Object> {
 
   /**
    * children columns
@@ -173,22 +174,33 @@ public class StructDataType implements GenericDataType<StructObject> {
   }
 
   @Override
-  public void writeByteArray(StructObject input, DataOutputStream dataOutputStream,
-      BadRecordLogHolder logHolder, Boolean isWithoutConverter) throws IOException {
+  public void writeByteArray(Object input, DataOutputStream dataOutputStream,
+      BadRecordLogHolder logHolder, Boolean isWithoutConverter, boolean isEmptyBadRecord)
+      throws IOException {
     dataOutputStream.writeShort(children.size());
-    if (input == null) {
+    if (input == null || input.equals("")) {
       for (int i = 0; i < children.size(); i++) {
-        children.get(i).writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter);
+        if (input != null && input.equals("") && (children.get(i) instanceof ArrayDataType)) {
+          // If child column is of array type and is empty, no need to iterate.
+          // Fill empty byte array and return.
+          CarbonBadRecordUtil.updateEmptyValue(dataOutputStream, isEmptyBadRecord, logHolder,
+              children.get(i).getParentName(), DataTypeUtil.valueOf("array"));
+        } else {
+          children.get(i).writeByteArray(input, dataOutputStream, logHolder, isWithoutConverter,
+              isEmptyBadRecord);
+        }
       }
     } else {
-      Object[] data = input.getData();
+      Object[] data = ((StructObject) input).getData();
       for (int i = 0; i < data.length && i < children.size(); i++) {
-        children.get(i).writeByteArray(data[i], dataOutputStream, logHolder, isWithoutConverter);
+        children.get(i).writeByteArray(data[i], dataOutputStream, logHolder, isWithoutConverter,
+            isEmptyBadRecord);
       }
 
       // For other children elements which don't have data, write empty
       for (int i = data.length; i < children.size(); i++) {
-        children.get(i).writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter);
+        children.get(i).writeByteArray(null, dataOutputStream, logHolder, isWithoutConverter,
+            isEmptyBadRecord);
       }
     }
   }
@@ -290,7 +302,7 @@ public class StructDataType implements GenericDataType<StructObject> {
   }
 
   @Override
-  public GenericDataType<StructObject> deepCopy() {
+  public GenericDataType<Object> deepCopy() {
     List<GenericDataType> childrenClone = new ArrayList<>();
     for (GenericDataType child : children) {
       childrenClone.add(child.deepCopy());
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
index b00f1fc..f8a85ad 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
@@ -33,12 +33,14 @@ public class ComplexFieldConverterImpl implements FieldConverter {
 
   private int index;
   private DataField dataField;
+  private boolean isEmptyBadRecord;
 
-  public ComplexFieldConverterImpl(DataField dataField, GenericDataType genericDataType,
-      int index) {
+  public ComplexFieldConverterImpl(DataField dataField, GenericDataType genericDataType, int index,
+      boolean isEmptyBadRecord) {
     this.genericDataType = genericDataType;
     this.index = index;
     this.dataField = dataField;
+    this.isEmptyBadRecord = isEmptyBadRecord;
   }
 
   @Override
@@ -53,7 +55,7 @@ public class ComplexFieldConverterImpl implements FieldConverter {
     ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
     DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
     try {
-      genericDataType.writeByteArray(value, dataOutputStream, logHolder, false);
+      genericDataType.writeByteArray(value, dataOutputStream, logHolder, false, isEmptyBadRecord);
       dataOutputStream.close();
       return byteArray.toByteArray();
     } catch (Exception e) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 4ad4cce..567a212 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -91,7 +91,8 @@ public class FieldEncoderFactory {
             isEmptyBadRecord);
       } else if (dataField.getColumn().isComplex()) {
         return new ComplexFieldConverterImpl(dataField,
-            createComplexDataType(dataField, nullFormat, getBinaryDecoder(binaryDecoder)), index);
+            createComplexDataType(dataField, nullFormat, getBinaryDecoder(binaryDecoder)),
+            index, isEmptyBadRecord);
       } else if (dataField.getColumn().getDataType() == DataTypes.BINARY) {
         BinaryDecoder binaryDecoderObject = getBinaryDecoder(binaryDecoder);
         return new BinaryFieldConverterImpl(dataField, nullFormat,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
index 1525bb5..0af9935 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
@@ -32,7 +32,7 @@ import org.apache.commons.lang.ArrayUtils;
  * It is thread safe as the state of the class doesn't change while
  * calling @{@link GenericParser#parse(Object)} method
  */
-public class ArrayParserImpl implements ComplexParser<ArrayObject> {
+public class ArrayParserImpl implements ComplexParser<Object> {
 
   protected Pattern pattern;
 
@@ -46,7 +46,7 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
   }
 
   @Override
-  public ArrayObject parse(Object data) {
+  public Object parse(Object data) {
     if (data != null) {
       String value = data.toString();
       if (!value.isEmpty() && !value.equals(nullFormat)
@@ -61,7 +61,7 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
         }
       } else if (value.isEmpty()) {
         Object[] array = new Object[1];
-        array[0] = child.parse(value);
+        array[0] = value;
         return new ArrayObject(array);
       } else if (value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) {
         // When the data is not array('') but array(), an array with zero size should be returned.
@@ -73,7 +73,7 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
   }
 
   @Override
-  public ArrayObject parseRaw(Object data) {
+  public Object parseRaw(Object data) {
     return new ArrayObject((Object[]) data);
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
index c5bc7a5..9cc3524 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
@@ -38,7 +38,7 @@ public class MapParserImpl extends ArrayParserImpl {
   //The Key for Map will always be a PRIMITIVE type so Set<Object> here will work fine
   //The last occurrence of the key, value pair will be added and all others will be overwritten
   @Override
-  public ArrayObject parse(Object data) {
+  public Object parse(Object data) {
     if (data != null) {
       String value = data.toString();
       if (!value.isEmpty() && !value.equals(nullFormat)
@@ -62,6 +62,10 @@ public class MapParserImpl extends ArrayParserImpl {
           }
           return new ArrayObject(array.toArray());
         }
+      } else if (value.isEmpty()) {
+        Object[] array = new Object[1];
+        array[0] = value;
+        return new ArrayObject(array);
       } else if (value.equals(CarbonCommonConstants.SIZE_ZERO_DATA_RETURN)) {
         // When the data is not map('','') but map(), an array with zero size should be returned.
         Object[] array = new Object[0];
@@ -72,7 +76,7 @@ public class MapParserImpl extends ArrayParserImpl {
   }
 
   @Override
-  public ArrayObject parseRaw(Object data) {
+  public Object parseRaw(Object data) {
     Object[] keyValuePairs = ((Object[]) data);
     Object[] objectArray = new Object[keyValuePairs.length];
     for (int i = 0; i < ((Object[]) data).length; i++) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java
index 91afc37..cf8db97 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/StructParserImpl.java
@@ -33,7 +33,7 @@ import org.apache.commons.lang.ArrayUtils;
  * It is thread safe as the state of the class doesn't change while
  * calling @{@link GenericParser#parse(Object)} method
  */
-public class StructParserImpl implements ComplexParser<StructObject> {
+public class StructParserImpl implements ComplexParser<Object> {
 
   private Pattern pattern;
 
@@ -47,10 +47,13 @@ public class StructParserImpl implements ComplexParser<StructObject> {
   }
 
   @Override
-  public StructObject parse(Object data) {
+  public Object parse(Object data) {
     if (data != null) {
       String value = data.toString();
-      if (!value.isEmpty() && !value.equals(nullFormat)) {
+      if (value.isEmpty()) {
+        return value;
+      }
+      if (!value.equals(nullFormat)) {
         String[] split = pattern.split(value, -1);
         if (ArrayUtils.isNotEmpty(split)) {
           Object[] array = new Object[children.size()];
@@ -65,7 +68,7 @@ public class StructParserImpl implements ComplexParser<StructObject> {
   }
 
   @Override
-  public StructObject parseRaw(Object data) {
+  public Object parseRaw(Object data) {
     Object[] d = ((Object[]) data);
     Object[] array = new Object[children.size()];
     for (int i = 0; i < d.length; i++) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index 243a105..d08b197 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -441,10 +441,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
       ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
       DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
       try {
-        GenericDataType complextType =
+        GenericDataType complexType =
             dataFieldsWithComplexDataType.get(dataField.getColumn().getOrdinal());
-        complextType
-            .writeByteArray(data[orderedIndex], dataOutputStream, logHolder, isWithoutConverter);
+        boolean isEmptyBadRecord = Boolean.parseBoolean(
+            configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+                .toString());
+        complexType.writeByteArray(data[orderedIndex], dataOutputStream, logHolder,
+            isWithoutConverter, isEmptyBadRecord);
         dataOutputStream.close();
         newData[index] = byteArray.toByteArray();
       } catch (BadRecordFoundException e) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
index 5e9ed7d..26e838c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.processing.util;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
@@ -27,10 +28,13 @@ import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.loading.BadRecordsLogger;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
 import org.apache.commons.lang.StringUtils;
@@ -154,4 +158,22 @@ public class CarbonBadRecordUtil {
     return badRecordsPath;
   }
 
+  public static void updateEmptyValue(DataOutputStream dataOutputStream, boolean isEmptyBadRecord,
+      BadRecordLogHolder logHolder, String parentName, DataType dataType) throws IOException {
+    CarbonUtil.updateWithEmptyValueBasedOnDatatype(dataOutputStream, dataType);
+    if (isEmptyBadRecord) {
+      CarbonBadRecordUtil.setErrorMessage(logHolder, parentName, dataType.getName());
+    }
+  }
+
+  public static void setErrorMessage(BadRecordLogHolder logHolder, String columnName,
+      String datatypeName) {
+    String message = logHolder.getColumnMessageMap().get(columnName);
+    if (null == message) {
+      message = "The value with column name " + columnName + " and column data type " + datatypeName
+          + " is not a valid " + datatypeName + " type.";
+      logHolder.getColumnMessageMap().put(columnName, message);
+    }
+    logHolder.setReason(message);
+  }
 }
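
Note: setErrorMessage builds the failure message once per column and caches it
in the holder's column message map, so repeated empty values for a column
reuse the same string. An illustrative call (assuming a fresh
BadRecordLogHolder instance):

    val holder = new BadRecordLogHolder()
    CarbonBadRecordUtil.setErrorMessage(holder, "file.age", "INT")
    // holder.getReason now returns: "The value with column name file.age
    // and column data type INT is not a valid INT type."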