You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/12/20 17:05:16 UTC

carbondata git commit: [CARBONDATA-3179] Map Data Load Failure and Struct Projection Pushdown Issue

Repository: carbondata
Updated Branches:
  refs/heads/master 34923db0e -> 96b2ea364


[CARBONDATA-3179] Map Data Load Failure and Struct Projection Pushdown Issue

Problem 1: Data load fails for "INSERT INTO ... SELECT" from the same table when the table contains a Map datatype column.
Solution: The Map type was not handled when converting row values to strings in this scenario. It is now handled.

Problem 2: Projection pushdown is not supported for a table containing a Struct of Map.
Solution: For projection pushdown, pass only the parent column if the table contains a MapType.

This closes #2993


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/96b2ea36
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/96b2ea36
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/96b2ea36

Branch: refs/heads/master
Commit: 96b2ea3646a2a768133880bb2e4c1318d366b482
Parents: 34923db
Author: manishnalla1994 <ma...@gmail.com>
Authored: Fri Dec 14 17:20:15 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Thu Dec 20 22:33:58 2018 +0530

----------------------------------------------------------------------
 .../TestCreateDDLForComplexMapType.scala        | 71 +++++++++++++++++++-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  6 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 12 ++--
 .../carbondata/spark/util/CarbonScalaUtil.scala | 25 ++++---
 .../spark/rdd/CarbonDataRDDFactory.scala        |  5 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    | 37 ++++++----
 .../streaming/parser/FieldConverter.scala       | 44 ++++++------
 .../streaming/parser/RowStreamParserImp.scala   | 16 +++--
 8 files changed, 150 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
index 09f23e5..9006b61 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk
-
 import scala.collection.JavaConversions._
 
 class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll {
@@ -471,4 +470,74 @@ class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll {
         "sort_columns is unsupported for map datatype column: mapfield"))
   }
 
+  test("Data Load Fail Issue") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<INT,STRING>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbon OPTIONS(
+         | 'header' = 'false')
+       """.stripMargin)
+    sql("INSERT INTO carbon SELECT * FROM carbon")
+    checkAnswer(sql("select * from carbon"), Seq(
+      Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar")),
+      Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar")),
+      Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar")),
+      Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar"))
+      ))
+  }
+
+  test("Struct inside map") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<INT,struct<kk:STRING,mm:STRING>>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    sql("INSERT INTO carbon values('1\002man\003nan\0012\002kands\003dsnknd')")
+    sql("INSERT INTO carbon SELECT * FROM carbon")
+    checkAnswer(sql("SELECT * FROM carbon limit 1"),
+      Seq(Row(Map(1 -> Row("man", "nan"), (2 -> Row("kands", "dsnknd"))))))
+  }
+
+  test("Struct inside map pushdown") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<INT,struct<kk:STRING,mm:STRING>>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    sql("INSERT INTO carbon values('1\002man\003nan\0012\002kands\003dsnknd')")
+    checkAnswer(sql("SELECT mapField[1].kk FROM carbon"), Row("man"))
+  }
+
+  test("Map inside struct") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | structField struct<intVal:INT,map1:MAP<STRING,STRING>>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    sql("INSERT INTO carbon values('1\001man\003nan\002kands\003dsnknd')")
+    val res = sql("SELECT structField.intVal FROM carbon").show(false)
+    checkAnswer(sql("SELECT structField.intVal FROM carbon"), Seq(Row(1)))
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 8574e66..b67fc71 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.rdd
 import java.io.{DataInputStream, InputStreamReader}
 import java.nio.charset.Charset
 import java.text.SimpleDateFormat
+import java.util
 import java.util.regex.Pattern
 
 import scala.collection.mutable
@@ -293,11 +294,12 @@ class CarbonBlockDistinctValuesCombineRDD(
         row = rddIter.next()
         if (row != null) {
           rowCount += 1
+          val complexDelimiters = new util.ArrayList[String]
+          model.delimiters.foreach(x => complexDelimiters.add(x))
           for (i <- 0 until dimNum) {
             dimensionParsers(i).parseString(CarbonScalaUtil.getString(row.get(i),
               model.serializationNullFormat,
-              model.delimiters(0),
-              model.delimiters(1),
+              complexDelimiters,
               timeStampFormat,
               dateFormat))
           }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index f76a8d9..f44bb8d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -341,9 +341,7 @@ class NewRddIterator(rddIter: Iterator[Row],
   private val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
     .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
   private val dateFormat = new SimpleDateFormat(dateFormatString)
-  private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
-  private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
-  private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2)
+  private val complexDelimiters = carbonLoadModel.getComplexDelimiters
   private val serializationNullFormat =
     carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
   import scala.collection.JavaConverters._
@@ -357,7 +355,7 @@ class NewRddIterator(rddIter: Iterator[Row],
     val columns = new Array[AnyRef](row.length)
     for (i <- 0 until columns.length) {
       columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
-        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
+        complexDelimiters, timeStampFormat, dateFormat,
         isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
     }
     columns
@@ -391,9 +389,7 @@ class LazyRddIterator(serializer: SerializerInstance,
     .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
   private val dateFormat = new SimpleDateFormat(dateFormatString)
-  private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
-  private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
-  private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2)
+  private val complexDelimiters = carbonLoadModel.getComplexDelimiters
   private val serializationNullFormat =
     carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
   // the order of fields in dataframe and createTable may be different, here we need to know whether
@@ -431,7 +427,7 @@ class LazyRddIterator(serializer: SerializerInstance,
     val columns = new Array[AnyRef](row.length)
     for (i <- 0 until columns.length) {
       columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
-        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
+        complexDelimiters, timeStampFormat, dateFormat,
         isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
     }
     columns

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index ca9b4af..626d92f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -62,22 +62,22 @@ object CarbonScalaUtil {
 
   def getString(value: Any,
       serializationNullFormat: String,
-      delimiterLevel1: String,
-      delimiterLevel2: String,
+      complexDelimiters: util.ArrayList[String],
       timeStampFormat: SimpleDateFormat,
       dateFormat: SimpleDateFormat,
       isVarcharType: Boolean = false,
-      level: Int = 1): String = {
-    FieldConverter.objectToString(value, serializationNullFormat, delimiterLevel1,
-      delimiterLevel2, timeStampFormat, dateFormat, isVarcharType = isVarcharType, level)
+      level: Int = 0): String = {
+    FieldConverter.objectToString(value, serializationNullFormat, complexDelimiters,
+      timeStampFormat, dateFormat, isVarcharType = isVarcharType, level)
   }
 
   /**
    * Converts incoming value to String after converting data as per the data type.
-   * @param value Input value to convert
-   * @param dataType Datatype to convert and then convert to String
+   *
+   * @param value           Input value to convert
+   * @param dataType        Datatype to convert and then convert to String
    * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
-   * @param dateFormat DataFormat to convert in case of DateType datatype
+   * @param dateFormat      DataFormat to convert in case of DateType datatype
    * @return converted String
    */
   def convertToDateAndTimeFormats(
@@ -126,7 +126,8 @@ object CarbonScalaUtil {
 
   /**
    * Converts incoming value to String after converting data as per the data type.
-   * @param value Input value to convert
+   *
+   * @param value  Input value to convert
    * @param column column which it value belongs to
    * @return converted String
    */
@@ -183,7 +184,8 @@ object CarbonScalaUtil {
 
   /**
    * Converts incoming value to String after converting data as per the data type.
-   * @param value Input value to convert
+   *
+   * @param value  Input value to convert
    * @param column column which it value belongs to
    * @return converted String
    */
@@ -238,6 +240,7 @@ object CarbonScalaUtil {
 
   /**
    * Update partition values as per the right date and time format
+   *
    * @return updated partition spec
    */
   def updatePartitions(partitionSpec: mutable.LinkedHashMap[String, String],
@@ -466,7 +469,7 @@ object CarbonScalaUtil {
       }
     } catch {
       case e: Exception =>
-        // ignore it
+      // ignore it
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 3b69f9e..5d03026 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -983,15 +983,14 @@ object CarbonDataRDDFactory {
     // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
     val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
       // input data from DataFrame
-      val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
-      val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
+      val complexDelimiters = carbonLoadModel.getComplexDelimiters
       val serializationNullFormat =
         carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
       dataFrame.get.rdd.map { row =>
         if (null != row && row.length > partitionColumnIndex &&
             null != row.get(partitionColumnIndex)) {
           (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
-            delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+            complexDelimiters, timeStampFormat, dateFormat), row)
         } else {
           (null, row)
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index f848ae1..672508f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -24,7 +24,7 @@ import scala.util.control.Breaks._
 import org.apache.spark.CarbonInputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GetArrayItem, GetStructField, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GetArrayItem, GetMapValue, GetStructField, NamedExpression}
 import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
@@ -101,32 +101,41 @@ case class CarbonDatasourceHadoopRelation(
     if (!complexFilterExists.exists(f => f.contains(true))) {
       var parentColumn = new ListBuffer[String]
       // In case of Struct or StructofStruct Complex type, get the project column for given
-      // parent/child field and pushdown the corresponding project column. In case of Array,
-      // ArrayofStruct or StructofArray, pushdown parent column
+      // parent/child field and pushdown the corresponding project column. In case of Array, Map,
+      // ArrayofStruct, StructofArray, MapOfStruct or StructOfMap, pushdown parent column
       var reqColumns = projects.map {
         case a@Alias(s: GetStructField, name) =>
-          var arrayTypeExists = false
-          var ifGetArrayItemExists = s
+          var arrayOrMapTypeExists = false
+          var ifGetArrayOrMapItemExists = s
           breakable({
-            while (ifGetArrayItemExists.containsChild != null) {
-              if (ifGetArrayItemExists.childSchema.toString().contains("ArrayType")) {
-                arrayTypeExists = ifGetArrayItemExists.childSchema.toString().contains("ArrayType")
+            while (ifGetArrayOrMapItemExists.containsChild != null) {
+              if (ifGetArrayOrMapItemExists.childSchema.toString().contains("ArrayType") ||
+                  ifGetArrayOrMapItemExists.childSchema.toString().contains("MapType")) {
+                arrayOrMapTypeExists = true
                 break
               }
-              if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {
-                arrayTypeExists = s.childSchema.toString().contains("ArrayType")
+              if (ifGetArrayOrMapItemExists.child.isInstanceOf[AttributeReference]) {
+                arrayOrMapTypeExists = s.childSchema.toString().contains("ArrayType") ||
+                                       s.childSchema.toString().contains("MapType")
                 break
               } else {
-                if (ifGetArrayItemExists.child.isInstanceOf[GetArrayItem]) {
-                  arrayTypeExists = true
+                if (ifGetArrayOrMapItemExists.child.isInstanceOf[GetArrayItem] ||
+                    ifGetArrayOrMapItemExists.child.isInstanceOf[GetMapValue]) {
+                  arrayOrMapTypeExists = true
                   break
                 } else {
-                  ifGetArrayItemExists = ifGetArrayItemExists.child.asInstanceOf[GetStructField]
+                  if (ifGetArrayOrMapItemExists.child.isInstanceOf[GetStructField]) {
+                    ifGetArrayOrMapItemExists = ifGetArrayOrMapItemExists.child
+                      .asInstanceOf[GetStructField]
+                  } else {
+                    arrayOrMapTypeExists = true
+                    break
+                  }
                 }
               }
             }
           })
-          if (!arrayTypeExists) {
+          if (!arrayOrMapTypeExists) {
             parentColumn += s.toString().split("\\.")(0).replaceAll("#.*", "").toLowerCase
             parentColumn = parentColumn.distinct
             s.toString().replaceAll("#[0-9]*", "").toLowerCase

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
index cfafd40..5c67dfb 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.streaming.parser
 
 import java.nio.charset.Charset
 import java.text.SimpleDateFormat
+import java.util
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
@@ -28,8 +29,7 @@ object FieldConverter {
    * Return a String representation of the input value
    * @param value input value
    * @param serializationNullFormat string for null value
-   * @param delimiterLevel1 level 1 delimiter for complex type
-   * @param delimiterLevel2 level 2 delimiter for complex type
+   * @param complexDelimiters List of Complex Delimiters
    * @param timeStampFormat timestamp format
    * @param dateFormat date format
    * @param isVarcharType whether it is varchar type. A varchar type has no string length limit
@@ -38,12 +38,11 @@ object FieldConverter {
   def objectToString(
       value: Any,
       serializationNullFormat: String,
-      delimiterLevel1: String,
-      delimiterLevel2: String,
+      complexDelimiters: util.ArrayList[String],
       timeStampFormat: SimpleDateFormat,
       dateFormat: SimpleDateFormat,
       isVarcharType: Boolean = false,
-      level: Int = 1): String = {
+      level: Int = 0): String = {
     if (value == null) {
       serializationNullFormat
     } else {
@@ -66,30 +65,35 @@ object FieldConverter {
         case bs: Array[Byte] => new String(bs,
           Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
         case s: scala.collection.Seq[Any] =>
-          val delimiter = if (level == 1) {
-            delimiterLevel1
-          } else {
-            delimiterLevel2
-          }
+          val delimiter = complexDelimiters.get(level)
           val builder = new StringBuilder()
           s.foreach { x =>
-            builder.append(objectToString(x, serializationNullFormat, delimiterLevel1,
-              delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
+            builder.append(objectToString(x, serializationNullFormat, complexDelimiters,
+              timeStampFormat, dateFormat, isVarcharType, level + 1))
               .append(delimiter)
           }
           builder.substring(0, builder.length - delimiter.length())
+        // For each map entry: convert the key, append the key-value delimiter, then convert
+        // the value and append the entry delimiter.
         case m: scala.collection.Map[_, _] =>
-          throw new Exception("Unsupported data type: Map")
-        case r: org.apache.spark.sql.Row =>
-          val delimiter = if (level == 1) {
-            delimiterLevel1
-          } else {
-            delimiterLevel2
+          val delimiter = complexDelimiters.get(level)
+          val keyValueDelimiter = complexDelimiters.get(level + 1)
+          val builder = new StringBuilder()
+          m.foreach { x =>
+            builder.append(objectToString(x._1, serializationNullFormat, complexDelimiters,
+              timeStampFormat, dateFormat, isVarcharType, level + 2))
+              .append(keyValueDelimiter)
+            builder.append(objectToString(x._2, serializationNullFormat, complexDelimiters,
+              timeStampFormat, dateFormat, isVarcharType, level + 2))
+              .append(delimiter)
           }
+          builder.substring(0, builder.length - delimiter.length())
+        case r: org.apache.spark.sql.Row =>
+          val delimiter = complexDelimiters.get(level)
           val builder = new StringBuilder()
           for (i <- 0 until r.length) {
-            builder.append(objectToString(r(i), serializationNullFormat, delimiterLevel1,
-              delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
+            builder.append(objectToString(r(i), serializationNullFormat, complexDelimiters,
+              timeStampFormat, dateFormat, isVarcharType, level + 1))
               .append(delimiter)
           }
           builder.substring(0, builder.length - delimiter.length())

http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index 4dcb3ce..e279caf 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.streaming.parser
 
 import java.text.SimpleDateFormat
+import java.util
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
@@ -26,6 +27,8 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.processing.loading
+import org.apache.carbondata.processing.loading.ComplexDelimitersEnum
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 
 /**
@@ -39,9 +42,7 @@ class RowStreamParserImp extends CarbonStreamParser {
 
   var timeStampFormat: SimpleDateFormat = null
   var dateFormat: SimpleDateFormat = null
-  var complexDelimiterLevel1: String = null
-  var complexDelimiterLevel2: String = null
-  var complexDelimiterLevel3: String = null
+  var complexDelimiters: util.ArrayList[String] = new util.ArrayList[String]()
   var serializationNullFormat: String = null
 
   override def initialize(configuration: Configuration, structType: StructType): Unit = {
@@ -53,9 +54,10 @@ class RowStreamParserImp extends CarbonStreamParser {
       this.configuration.get(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT))
     this.dateFormat = new SimpleDateFormat(
       this.configuration.get(CarbonCommonConstants.CARBON_DATE_FORMAT))
-    this.complexDelimiterLevel1 = this.configuration.get("carbon_complex_delimiter_level_1")
-    this.complexDelimiterLevel2 = this.configuration.get("carbon_complex_delimiter_level_2")
-    this.complexDelimiterLevel3 = this.configuration.get("carbon_complex_delimiter_level_3")
+    this.complexDelimiters.add(this.configuration.get("carbon_complex_delimiter_level_1"))
+    this.complexDelimiters.add(this.configuration.get("carbon_complex_delimiter_level_2"))
+    this.complexDelimiters.add(this.configuration.get("carbon_complex_delimiter_level_3"))
+    this.complexDelimiters.add(ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4.value())
     this.serializationNullFormat =
       this.configuration.get(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
   }
@@ -63,7 +65,7 @@ class RowStreamParserImp extends CarbonStreamParser {
   override def parserRow(value: InternalRow): Array[Object] = {
     this.encoder.fromRow(value).toSeq.map { x => {
       FieldConverter.objectToString(
-        x, serializationNullFormat, complexDelimiterLevel1, complexDelimiterLevel2,
+        x, serializationNullFormat, complexDelimiters,
         timeStampFormat, dateFormat)
     } }.toArray
   }