Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/10/23 09:13:51 UTC

[carbondata] branch master updated: [Carbondata-3954] Fix insertion from ORC table into carbon table when sort scope is global sort

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ec9a63  [Carbondata-3954] Fix insertion from ORC table into carbon table when sort scope is global sort
7ec9a63 is described below

commit 7ec9a6356b8e7334505434deaf0abb878a8bd2df
Author: Karan980 <ka...@gmail.com>
AuthorDate: Mon Oct 12 11:47:47 2020 +0530

    [Carbondata-3954] Fix insertion from ORC table into carbon table when sort scope is global sort
    
    Why is this PR needed?
    Loading records that contain null values from an ORC table into a carbon table whose sort scope is GLOBAL_SORT throws a NullPointerException (NPE).
    
    What changes were proposed in this PR?
    Added a null check for ArrayType, MapType, and StructType values before writing the data into byte arrays.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3979
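    
    For reference, the core of the fix in CommonUtil.scala is an early null
    guard on complex-typed values. A minimal standalone sketch of that guard
    follows (the object and method names are illustrative, not the patch's
    own; only the Spark SQL type classes are real):
    
        import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
    
        object NullGuardSketch {
          // Return null for a null array/map/struct instead of recursing into
          // it; dereferencing such a value downstream, while writing complex
          // data as byte arrays, was the source of the NPE during
          // global-sort data loading.
          def convertComplexValue(data: AnyRef, dataType: DataType): AnyRef = {
            if (data == null && (dataType.isInstanceOf[ArrayType]
                || dataType.isInstanceOf[MapType] || dataType.isInstanceOf[StructType])) {
              return null
            }
            // ... conversion of non-null values would follow here ...
            data
          }
        }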
---
 .../core/scan/complextypes/ArrayQueryType.java     |  3 +
 .../core/scan/complextypes/StructQueryType.java    | 10 +++
 .../scan/complextypes/StructQueryTypeTest.java     |  4 +-
 .../query/SecondaryIndexQueryResultProcessor.java  |  2 +-
 .../apache/carbondata/spark/util/CommonUtil.scala  |  5 +-
 .../allqueries/InsertIntoCarbonTableTestCase.scala | 74 ++++++++++++++++++++++
 ...nTransactionalCarbonTableWithAvroDataType.scala |  6 +-
 .../dataload/TestGlobalSortDataLoad.scala          | 33 +++++-----
 .../carbondata/TestStreamingTableOpName.scala      | 20 +++---
 .../TestStreamingTableWithRowParser.scala          | 20 +++---
 .../carbondata/sdk/file/AvroCarbonWriterTest.java  |  2 +-
 11 files changed, 135 insertions(+), 44 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
index 0e1bd7f..909f8e1 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
@@ -131,6 +131,9 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType
         data[i] = children.getDataBasedOnDataType(dataBuffer);
       }
     }
+    if (dataLength == 1 && data[0] == null) {
+      return null;
+    }
     return data;
   }
 
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index 8ee0053..2ff41c7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -120,6 +120,16 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
     for (int i = 0; i < childLength; i++) {
       fields[i] =  children.get(i).getDataBasedOnDataType(dataBuffer);
     }
+    boolean isAllNull = true;
+    for (Object field : fields) {
+      if (field != null) {
+        isAllNull = false;
+        break;
+      }
+    }
+    if (isAllNull) {
+      return null;
+    }
     return DataTypeUtil.getDataTypeConverter().wrapWithGenericRow(fields);
   }
 
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
index 98f8117..8be9f52 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
@@ -27,7 +27,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 public class StructQueryTypeTest {
   private static StructQueryType structQueryType;
@@ -47,7 +47,7 @@ public class StructQueryTypeTest {
     structQueryType.addChildren(arrayQueryType);
     List children = new ArrayList();
     children.add(arrayQueryType);
-    assertNotNull(structQueryType.getDataBasedOnDataType(surrogateData));
+    assertNull(structQueryType.getDataBasedOnDataType(surrogateData));
   }
 
   @Test public void testGetColsCount() {
diff --git a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
index 1138ff7..41a5c43 100644
--- a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
+++ b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
@@ -361,7 +361,7 @@ public class SecondaryIndexQueryResultProcessor {
    * This method will return complex array primitive data
    */
   private Object getData(Object[] data, int index, DataType dataType) {
-    if (data.length == 0) {
+    if (data == null || data.length == 0) {
       return new byte[0];
     } else if (data[0] == null) {
       return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 1f1cd32..431df5f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -1009,6 +1009,10 @@ object CommonUtil {
 
   private def convertSparkComplexTypeToCarbonObject(data: AnyRef,
       objectDataType: DataType): AnyRef = {
+    if (data == null && (objectDataType.isInstanceOf[ArrayType]
+      || objectDataType.isInstanceOf[MapType] || objectDataType.isInstanceOf[StructType])) {
+      return null
+    }
     objectDataType match {
       case _: ArrayType =>
         val arrayDataType = objectDataType.asInstanceOf[ArrayType]
@@ -1086,5 +1090,4 @@ object CommonUtil {
       }
     }
   }
-
 }
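
The user-visible effect, condensed from the new tests that follow: a null
array/map/struct inserted from an ORC source now reads back from the carbon
table as SQL NULL rather than as a wrapper row or array containing null. An
illustrative snippet, assuming the suite's QueryTest helpers (sql,
checkAnswer) and org.apache.spark.sql.Row are in scope:

    sql("create table TORCSource(name string, col1 array<string>) STORED AS orc")
    sql("insert into TORCSource values('karan', null)")
    sql("create table TCarbon(name string, col1 array<string>) STORED AS carbondata " +
      "TBLPROPERTIES('SORT_COLUMNS'='name','SORT_SCOPE'='GLOBAL_SORT')")
    sql("insert overwrite table TCarbon select * from TORCSource")
    // col1 comes back as SQL NULL, not as an array containing a single null
    checkAnswer(sql("select * from TCarbon"), Seq(Row("karan", null)))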
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 3c1331c..1d385cb 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -69,6 +69,80 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
 
   }
 
+  test("insert from orc-select columns with complex columns having null values and sort scope as global sort") {
+    sql("drop table if exists TORCSource")
+    sql("drop table if exists TCarbon")
+    sql("create table TORCSource(name string,col1 array<String>,col2 map<String,String>,col3 struct<school:string, age:int>,fee int) STORED AS orc")
+    sql("insert into TORCSource values('karan',null,null,null,2)")
+    sql("create table TCarbon(name string,col1 array<String>,col2 map<String,String>,col3 struct<school:string, age:int>,fee int) STORED AS carbondata TBLPROPERTIES ('SORT_COLUMNS'='name','TABLE_BLOCKSIZE'='128','TABLE_BLOCKLET_SIZE'='128','SORT_SCOPE'='global_SORT')")
+    sql("insert overwrite table TCarbon select name,col1,col2,col3,fee from TORCSource")
+    checkAnswer(sql("select * from TORCSource"), sql("select * from TCarbon"))
+    sql("drop table if exists TORCSource")
+    sql("drop table if exists TCarbon")
+  }
+
+  test("insert from orc-select columns with complex columns having null values and sort scope as local sort") {
+    sql("drop table if exists TORCSource")
+    sql("drop table if exists TCarbon")
+    sql("create table TORCSource(name string,col1 array<String>,col2 map<String,String>,col3 struct<school:string, age:int>,fee int) STORED AS orc")
+    sql("insert into TORCSource values('karan',null,null,null,2)")
+    sql("create table TCarbon(name string,col1 array<String>,col2 map<String,String>,col3 struct<school:string, age:int>,fee int) STORED AS carbondata TBLPROPERTIES ('SORT_COLUMNS'='name','TABLE_BLOCKSIZE'='128','TABLE_BLOCKLET_SIZE'='128','SORT_SCOPE'='local_SORT')")
+    sql("insert overwrite table TCarbon select name,col1,col2,col3,fee from TORCSource")
+    checkAnswer(sql("select * from TORCSource"), sql("select * from TCarbon"))
+    sql("drop table if exists TORCSource")
+    sql("drop table if exists TCarbon")
+  }
+
+  test("insert from orc-select columns with complex columns having null values and sort scope as global sort and bad record handling enabled") {
+    val initialPropertyValue = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
+    sql("drop table if exists TORCSource")
+    sql("drop table if exists TCarbon")
+    sql("create table TORCSource(name string,col1 array<String>,col2 map<String,String>,col3 struct<school:string, age:int>,fee int) STORED AS orc")
+    sql("insert into TORCSource values('karan',null,null,null,2)")
+    sql("create table TCarbon(name string,col1 array<String>,col2 map<String,String>,col3 struct<school:string, age:int>,fee int) STORED AS carbondata TBLPROPERTIES ('SORT_COLUMNS'='name','TABLE_BLOCKSIZE'='128','TABLE_BLOCKLET_SIZE'='128','SORT_SCOPE'='global_SORT')")
+    sql("insert overwrite table TCarbon select name,col1,col2,col3,fee from TORCSource")
+    checkAnswer(sql("select * from TORCSource"), sql("select * from TCarbon"))
+    sql("drop table if exists TORCSource")
+    sql("drop table if exists TCarbon")
+    if (initialPropertyValue == null) {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
+          CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT_DEFAULT)
+    } else {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
+          initialPropertyValue)
+    }
+  }
+
+  test("insert from orc-select columns with complex columns having null values and sort scope as local sort and bad record handling enabled") {
+    val initialPropertyValue = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
+    sql("drop table if exists TORCSource")
+    sql("drop table if exists TCarbon")
+    sql("create table TORCSource(name string,col1 array<String>,col2 map<String,String>,col3 struct<school:string, age:int>,fee int) STORED AS orc")
+    sql("insert into TORCSource values('karan',null,null,null,2)")
+    sql("create table TCarbon(name string,col1 array<String>,col2 map<String,String>,col3 struct<school:string, age:int>,fee int) STORED AS carbondata TBLPROPERTIES ('SORT_COLUMNS'='name','TABLE_BLOCKSIZE'='128','TABLE_BLOCKLET_SIZE'='128','SORT_SCOPE'='local_SORT')")
+    sql("insert overwrite table TCarbon select name,col1,col2,col3,fee from TORCSource")
+    checkAnswer(sql("select * from TORCSource"), sql("select * from TCarbon"))
+    sql("drop table if exists TORCSource")
+    sql("drop table if exists TCarbon")
+    if (initialPropertyValue == null) {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
+          CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT_DEFAULT)
+    } else {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT,
+          initialPropertyValue)
+    }
+  }
+
   test("insert from carbon-select * columns") {
      sql("drop table if exists TCarbonSource")
      sql("drop table if exists TCarbon")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
index ac12dff..32e2c7d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithAvroDataType.scala
@@ -352,7 +352,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     sql(
       s"""CREATE EXTERNAL TABLE sdkOutputTable STORED AS carbondata
          |LOCATION '$writerPath' """.stripMargin)
-    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null))))
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(null)))
   }
 
   test("test union type with only type null") {
@@ -503,7 +503,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
       s"""CREATE EXTERNAL TABLE sdkOutputTable STORED AS carbondata
          |LOCATION '$writerPath' """.stripMargin)
     checkAnswer(sql("select * from sdkOutputTable"),
-      Seq(Row(Row(mutable.WrappedArray.make(Array(null)), 12))))
+      Seq(Row(Row(null, 12))))
   }
 
   test("test Struct of Union") {
@@ -1281,7 +1281,7 @@ class TestNonTransactionalCarbonTableWithAvroDataType extends QueryTest with Bef
     sql(
       s"""CREATE EXTERNAL TABLE sdkOutputTable STORED AS carbondata
          |LOCATION '$writerPath' """.stripMargin)
-    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(Row(null, null), Row("ab")))))
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row(Row(null, Row("ab")))))
   }
 
   test("test union with multiple Enum type") {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 2c78561..2a91833 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -20,7 +20,6 @@ package org.apache.carbondata.spark.testsuite.dataload
 import java.io.{File, FileWriter}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
@@ -50,7 +49,8 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     sql(
       """
         | CREATE TABLE carbon_globalsort(id INT, name STRING, city STRING, age INT)
-        | STORED AS carbondata TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'sort_columns' = 'name, city')
+        | STORED AS carbondata
+        | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'sort_columns' = 'name, city')
       """.stripMargin)
   }
 
@@ -449,11 +449,10 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
          | STORED AS carbondata
          | TBLPROPERTIES('sort_scope'='local_sort','sort_columns'='stringField')
        """.stripMargin)
-    sql(
-      s"""
-         | LOAD DATA LOCAL INPATH '$path' INTO TABLE carbon_localsort_difftypes
-         | OPTIONS('FILEHEADER'='shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField')
-       """.stripMargin)
+
+    sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE carbon_localsort_difftypes " +
+        "OPTIONS('FILEHEADER'='shortField,intField,bigintField,doubleField,stringField," +
+        "timestampField,decimalField,dateField,charField,floatField')")
 
     sql("DROP TABLE IF EXISTS carbon_globalsort_difftypes")
     sql(
@@ -474,11 +473,9 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
          | 'SORT_SCOPE'='GLOBAL_SORT', 'sort_columns' = 'stringField')
        """.stripMargin)
     sql(
-      s"""
-         | LOAD DATA LOCAL INPATH '$path' INTO TABLE carbon_globalsort_difftypes
-         | OPTIONS(
-         | 'FILEHEADER'='shortField,intField,bigintField,doubleField,stringField,timestampField,decimalField,dateField,charField,floatField')
-       """.stripMargin)
+      s"LOAD DATA LOCAL INPATH '$path' INTO TABLE carbon_globalsort_difftypes " +
+      "OPTIONS('FILEHEADER'='shortField,intField,bigintField,doubleField,stringField," +
+      "timestampField,decimalField,dateField,charField,floatField')".stripMargin)
 
     checkAnswer(sql("SELECT * FROM carbon_globalsort_difftypes ORDER BY shortField"),
       sql("SELECT * FROM carbon_localsort_difftypes ORDER BY shortField"))
@@ -488,9 +485,13 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
   test("test global sort with null values") {
     sql("drop table if exists source")
     sql("drop table if exists sink")
-    sql("create table source(a string, b int, c int, d int, e int, f int, dec decimal(3,2), arr array<string>, str struct<a:string>, map map<string, string>) stored as carbondata TBLPROPERTIES('bad_record_action'='force')")
+    sql("create table source(a string, b int, c int, d int, e int, f int, dec decimal(3,2)," +
+      " arr array<string>, str struct<a:string>, map map<string, string>)" +
+      " stored as carbondata TBLPROPERTIES('bad_record_action'='force')")
     sql("insert into source select 'k','k', 'k','k','k', 'k',null,null,null,map('null','null')")
-    sql("create table sink (a string, b string, c int, d bigint, e double, f char(5),  dec decimal(3,2), arr array<string>, str struct<a:string>, map map<string, string>) stored as carbondata TBLPROPERTIES('sort_scope'='global_sort', 'sort_columns'='b,c,d,f')")
+    sql("create table sink (a string, b string, c int, d bigint, e double, f char(5)," +
+      "  dec decimal(3,2), arr array<string>, str struct<a:string>, map map<string, string>)" +
+      " stored as carbondata TBLPROPERTIES('sort_scope'='global_sort', 'sort_columns'='b,c,d,f')")
     sql("insert into sink select * from source")
     checkAnswer(sql("select * from sink"),
       Row("k",
@@ -500,8 +501,8 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
         null,
         null,
         null,
-        mutable.WrappedArray.make(Array(null)),
-        Row(null),
+        null,
+        null,
         Map("null" -> "null")))
   }
 
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
index 09fdc8b..acffa9c 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
@@ -761,7 +761,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
     // check one row of streaming data
     assert(result(0).isNullAt(0))
     assert(result(0).getString(1) == "")
-    assert(result(0).getStruct(9).isNullAt(1))
+    assert(result(0).isNullAt(9))
     // check one row of batch loading
     assert(result(50).getInt(0) == 100000001)
     assert(result(50).getString(1) == "batch_1")
@@ -924,12 +924,12 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null order by name"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)),
+      Seq(Row(null, "", "", null, null, null, null, null, null, null),
         Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where name = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and name <> ''"),
@@ -937,7 +937,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where city = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and city <> ''"),
@@ -945,7 +945,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where salary is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -953,7 +953,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where tax is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and tax is not null"),
@@ -961,7 +961,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where percent is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -969,7 +969,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where birthday is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
@@ -977,7 +977,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where register is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and register is not null"),
@@ -985,7 +985,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where updated is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and updated is not null"),
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 22af9ba..4c5444f 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -429,7 +429,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
     // check one row of streaming data
     assert(result(0).isNullAt(0))
     assert(result(0).getString(1) == "")
-    assert(result(0).getStruct(9).isNullAt(1))
+    assert(result(0).isNullAt(9))
     // check one row of batch loading
     assert(result(50).getInt(0) == 100000001)
     assert(result(50).getString(1) == "batch_1")
@@ -592,12 +592,12 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null order by name"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)),
+      Seq(Row(null, "", "", null, null, null, null, null, null, null),
         Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where name = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and name <> ''"),
@@ -605,7 +605,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where city = ''"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and city <> ''"),
@@ -613,7 +613,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where salary is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -621,7 +621,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where tax is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and tax is not null"),
@@ -629,7 +629,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where percent is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and salary is not null"),
@@ -637,7 +637,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where birthday is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and birthday is not null"),
@@ -645,7 +645,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where register is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)),
+      Seq(Row(null, "", "", null, null, null, null, null, null, null),
         Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), null, Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6))))
 
     checkAnswer(
@@ -654,7 +654,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where updated is null"),
-      Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null))))
+      Seq(Row(null, "", "", null, null, null, null, null, null, null)))
 
     checkAnswer(
       sql("select * from stream_table_filter_complex where id is null and updated is not null"),
diff --git a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index dcc874b..94349b8 100644
--- a/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ b/sdk/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -642,7 +642,7 @@ public class AvroCarbonWriterTest {
     Assert.assertTrue(row.length == 3);
     if (sum % 2 != 0) {
       Assert.assertEquals(row[0], "Alyssa");
-      Assert.assertNull(((Object[]) row[1])[0]);
+      Assert.assertNull(row[1]);
     } else {
       Assert.assertEquals(row[0], "Ben");
       Assert.assertEquals(((Object[]) row[1])[0], "red");