Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/12/14 17:02:46 UTC

carbondata git commit: [CARBONDATA-3017] Map DDL Support

Repository: carbondata
Updated Branches:
  refs/heads/master ebdd5486e -> 90f63a0cc


[CARBONDATA-3017] Map DDL Support

Support CREATE TABLE DDL for the Map complex type, including nested maps and maps inside array/struct columns. Internally, Map<Key,Value> is represented as Map<Struct<Key,Value>>, and a third complex delimiter level (COMPLEX_DELIMITER_LEVEL_3, with the defaults \001-\004 collected in a new ComplexDelimitersEnum) is introduced to parse nested map data. Compaction, SORT_COLUMNS, partitioning and update are blocked for map columns.

This closes #2980

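For reference, the DDL shape this change enables looks as follows (a minimal
sketch drawn from the new test suite added below; table and column names are
illustrative):

    // Scala, in the style of the new spark-common-test suite
    sql(
      s"""
         | CREATE TABLE carbon(
         | mapField map<STRING,map<INT,STRING>>
         | )
         | STORED BY 'carbondata'
         | """.stripMargin)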

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/90f63a0c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/90f63a0c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/90f63a0c

Branch: refs/heads/master
Commit: 90f63a0cc7e0ba11b9a850cbd19a1a0dd3212e4e
Parents: ebdd548
Author: manishnalla1994 <ma...@gmail.com>
Authored: Tue Oct 16 15:18:08 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Fri Dec 14 22:37:42 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/core/util/CarbonUtil.java |   4 +
 .../hadoop/api/CarbonTableOutputFormat.java     |  17 +-
 .../TestCreateDDLForComplexMapType.scala        | 445 +++++++++++++++++++
 .../LocalDictionarySupportLoadTableTest.scala   |  17 +
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  10 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |   1 +
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  43 +-
 .../streaming/CarbonAppendableStreamSink.scala  |   9 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   4 +-
 .../CarbonAlterTableCompactionCommand.scala     |   6 +
 .../management/CarbonLoadDataCommand.scala      |   6 +-
 .../table/CarbonCreateTableCommand.scala        |   2 +-
 .../spark/util/AllDictionaryTestCase.scala      |   4 +-
 .../util/ExternalColumnDictionaryTestCase.scala |   4 +-
 .../TestStreamingTableWithRowParser.scala       |   3 +-
 .../loading/ComplexDelimitersEnum.java          |  39 ++
 .../loading/DataLoadProcessBuilder.java         |   7 +-
 .../loading/model/CarbonLoadModel.java          |  38 +-
 .../loading/model/CarbonLoadModelBuilder.java   |  11 +-
 .../processing/loading/model/LoadOption.java    |  17 +-
 .../loading/parser/CarbonParserFactory.java     |  25 +-
 .../loading/parser/impl/ArrayParserImpl.java    |   6 +-
 .../loading/parser/impl/MapParserImpl.java      |  60 +++
 .../loading/parser/impl/RowParserImpl.java      |   8 +-
 .../sdk/file/CarbonWriterBuilder.java           |   1 +
 .../streaming/parser/RowStreamParserImp.scala   |   2 +
 26 files changed, 715 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ac52728..fc4704e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -597,6 +597,10 @@ public final class CarbonUtil {
    */
   public static String delimiterConverter(String delimiter) {
     switch (delimiter) {
+      case "\\001":
+      case "\\002":
+      case "\\003":
+      case "\\004":
       case "|":
       case "*":
       case ".":

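These new cases matter because the complex-type parsers compile the converted
delimiter straight into a regex (see ArrayParserImpl below, which calls
Pattern.compile(CarbonUtil.delimiterConverter(delimiter))). A minimal usage
sketch, assuming only that the method escapes the listed delimiters so they
are safe to use as patterns:

    import java.util.regex.Pattern
    import org.apache.carbondata.core.util.CarbonUtil

    // "\\001" is the literal backslash-escape string \001 (not the control
    // character), i.e. the exact form matched by the new switch cases.
    val converted: String = CarbonUtil.delimiterConverter("\\001")
    val pattern: Pattern = Pattern.compile(converted) // splits complex values
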
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index dbd2f0e..16486d0 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
+import org.apache.carbondata.processing.loading.ComplexDelimitersEnum;
 import org.apache.carbondata.processing.loading.DataLoadExecutor;
 import org.apache.carbondata.processing.loading.TableProcessingOperations;
 import org.apache.carbondata.processing.loading.iterator.CarbonOutputIteratorWrapper;
@@ -338,11 +339,19 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
             SKIP_EMPTY_LINE,
             carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE)));
 
-    String complexDelim = conf.get(COMPLEX_DELIMITERS, "\\\001" + "," + "\\\002");
+    String complexDelim = conf.get(COMPLEX_DELIMITERS);
+    if (null == complexDelim) {
+      complexDelim = ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_1.value() + ","
+          + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_2.value() + ","
+          + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3.value();
+    }
     String[] split = complexDelim.split(",");
-    model.setComplexDelimiterLevel1(split[0]);
-    if (split.length > 1) {
-      model.setComplexDelimiterLevel2(split[1]);
+    model.setComplexDelimiter(split[0]);
+    if (split.length > 2) {
+      model.setComplexDelimiter(split[1]);
+      model.setComplexDelimiter(split[2]);
+    } else if (split.length > 1) {
+      model.setComplexDelimiter(split[1]);
     }
     model.setDateFormat(
         conf.get(

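With this change the writer falls back to three enum-backed defaults instead
of the old hard-coded two-level string. A hedged sketch of overriding them
through the Hadoop configuration, assuming COMPLEX_DELIMITERS is the public
key read via conf.get(...) above (the delimiter characters are illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat

    val conf = new Configuration()
    // comma-joined list, one entry per complex nesting level
    conf.set(CarbonTableOutputFormat.COMPLEX_DELIMITERS, "$,:,@")
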
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
new file mode 100644
index 0000000..941364c
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.createTable.TestCreateDDLForComplexMapType
+
+import java.io.{BufferedWriter, File, FileWriter}
+import java.util
+
+import au.com.bytecode.opencsv.CSVWriter
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk
+
+import scala.collection.JavaConversions._
+
+class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll {
+  private val conf: Configuration = new Configuration(false)
+
+  val rootPath = new File(this.getClass.getResource("/").getPath
+                          + "../../../..").getCanonicalPath
+
+  val path = s"$rootPath/integration/spark-common-test/src/test/resources/maptest2.csv"
+
+  private def checkForLocalDictionary(dimensionRawColumnChunks: util
+  .List[DimensionRawColumnChunk]): Boolean = {
+    var isLocalDictionaryGenerated = false
+    import scala.collection.JavaConversions._
+    isLocalDictionaryGenerated = dimensionRawColumnChunks
+      .filter(dimensionRawColumnChunk => dimensionRawColumnChunk.getDataChunkV3
+        .isSetLocal_dictionary).size > 0
+    isLocalDictionaryGenerated
+  }
+
+  def createCSVFile(): Unit = {
+    val out = new BufferedWriter(new FileWriter(path));
+    val writer = new CSVWriter(out);
+
+    val employee1 = Array("1\u0002Nalla\u00012\u0002Singh\u00011\u0002Gupta\u00014\u0002Kumar")
+
+    val employee2 = Array("10\u0002Nallaa\u000120\u0002Sissngh\u0001100\u0002Gusspta\u000140" +
+                          "\u0002Kumar")
+
+    var listOfRecords = List(employee1, employee2)
+
+    writer.writeAll(listOfRecords)
+    out.close()
+  }
+
+  override def beforeAll(): Unit = {
+    createCSVFile()
+    sql("DROP TABLE IF EXISTS carbon")
+  }
+
+  override def afterAll(): Unit = {
+    new File(path).delete()
+  }
+
+  test("Single Map One Level") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<STRING,STRING>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    val desc = sql(
+      s"""
+         | Describe Formatted
+         | carbon
+         | """.stripMargin).collect()
+    assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<string,string>"))
+  }
+
+  test("Single Map with Two Nested Level") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<STRING,map<INT,STRING>>
+         | )
+         | STORED BY
+         |'carbondata'
+         |"""
+        .stripMargin)
+    val desc = sql(
+      s"""
+         | Describe Formatted
+         | carbon
+         | """.stripMargin).collect()
+    assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<string,map<int,string>>"))
+  }
+
+  test("Map Type with array type as value") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<STRING,array<INT>>
+         | )
+         | STORED BY 'carbondata'
+         |
+         """
+        .stripMargin)
+    val desc = sql(
+      s"""
+         | Describe Formatted
+         | carbon
+         | """.stripMargin).collect()
+    assert(desc(0).get(1).asInstanceOf[String].trim.equals("map<string,array<int>>"))
+  }
+
+  test("Map Type with struct type as value") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<STRING,struct<key:INT,val:INT>>
+         | )
+         | STORED BY
+         | 'carbondata'
+         | """
+        .stripMargin)
+    val desc = sql(
+      s"""
+         | Describe Formatted
+         | carbon
+         | """.stripMargin).collect()
+    assert(desc(0).get(1).asInstanceOf[String].trim
+      .equals("map<string,struct<key:int,val:int>>"))
+  }
+
+  test("Map Type as child to struct type") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField struct<key:INT,val:map<INT,INT>>
+         | )
+         | STORED BY
+         |'carbondata' """
+        .stripMargin)
+    val desc = sql(
+      s"""
+         | Describe Formatted
+         | carbon
+         | """.stripMargin).collect()
+    assert(desc(0).get(1).asInstanceOf[String].trim
+      .equals("struct<key:int,val:map<int,int>>"))
+  }
+
+  test("Map Type as child to array type") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField array<map<INT,INT>>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    val desc = sql(
+      s"""
+         | Describe Formatted
+         | carbon """.stripMargin).collect()
+    assert(desc(0).get(1).asInstanceOf[String].trim.equals("array<map<int,int>>"))
+    sql("insert into carbon values('1\0032\0022\0033\001100\003200\002200\003300')")
+    sql("select * from carbon").show(false)
+  }
+
+  test("Test Load data in map") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<INT,STRING>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    val desc = sql(
+      s"""
+         | Describe Formatted
+         | carbon
+         | """.stripMargin).collect()
+    sql("insert into carbon values('1\002Nalla\0012\002Singh\0013\002Gupta\0014\002Kumar')")
+    checkAnswer(sql("select * from carbon"), Seq(
+      Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar"))))
+  }
+
+  test("Test Load data in map with empty value") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<INT,STRING>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    val desc = sql(
+      s"""
+         | Describe Formatted
+         | carbon
+         | """.stripMargin).collect()
+    sql("insert into carbon values('1\002Nalla\0012\002\0013\002Gupta\0014\002Kumar')")
+    checkAnswer(sql("select * from carbon"), Seq(
+      Row(Map(1 -> "Nalla", 2 -> "", 3 -> "Gupta", 4 -> "Kumar"))))
+  }
+
+  // Support this for Map type
+  test("Test Load data in map with dictionary include") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<int,STRING>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('DICTIONARY_INCLUDE'='mapField')
+         | """
+        .stripMargin)
+    sql("insert into carbon values('1\002Nalla\0012\002Singh\0013\002Gupta')")
+    sql("select * from carbon").show(false)
+    //checkAnswer(sql("select * from carbon"), Seq(
+    //Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar"))))
+  }
+
+  test("Test Load data in map with partition columns") {
+    sql("DROP TABLE IF EXISTS carbon")
+    val exception = intercept[AnalysisException](
+      sql(
+        s"""
+           | CREATE TABLE carbon(
+           | a INT,
+           | mapField array<STRING>,
+           | b STRING
+           | )
+           | PARTITIONED BY (mp map<int,string>)
+           | STORED BY 'carbondata'
+           | """
+          .stripMargin)
+    )
+    assertResult("Cannot use map<int,string> for partition column;")(exception.getMessage())
+  }
+
+  test("Test IUD in map columns") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | a INT,
+         | mapField map<INT,STRING>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    sql("insert into carbon values(1,'1\002Nalla\0012\002Singh\0013\002Gupta\0014\002Kumar')")
+    sql("insert into carbon values(2,'1\002abc\0012\002xyz\0013\002hello\0014\002mno')")
+    val exception = intercept[UnsupportedOperationException](
+      sql("update carbon set(mapField)=('1,haha') where a=1").show(false))
+    assertResult("Unsupported operation on Complex data type")(exception.getMessage())
+    sql("delete from carbon where mapField[1]='abc'")
+    checkAnswer(sql("select * from carbon"), Seq(
+      Row(1, Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar"))))
+
+  }
+
+  test("Test Compaction blocking") {
+    sql("DROP TABLE IF EXISTS carbon")
+
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | a INT,
+         | mapField map<INT,STRING>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+
+    val exception = intercept[UnsupportedOperationException](
+      sql("ALTER table carbon compact 'minor'")
+    )
+    assertResult("Compaction is unsupported for Table containing Map Columns")(exception
+      .getMessage())
+  }
+
+  test("Test Load duplicate keys data in map") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<INT,STRING>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    val desc = sql(
+      s"""
+         | Describe Formatted
+         | carbon
+         | """.stripMargin).collect()
+    sql("insert into carbon values('1\002Nalla\0012\002Singh\0011\002Gupta\0014\002Kumar')")
+    checkAnswer(sql("select * from carbon"), Seq(
+      Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar"))))
+  }
+
+  test("Test Load data in map of map") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<STRING,map<INT,STRING>>
+         | )
+         | STORED BY
+         |'carbondata' """
+        .stripMargin)
+    sql(
+      "insert into carbon values('manish\0021\004nalla\0032\004gupta\001kunal\0021\004kapoor\0032" +
+      "\004sharma')")
+    checkAnswer(sql("select * from carbon"), Seq(
+      Row(Map("manish" -> Map(1 -> "nalla", 2 -> "gupta"),
+        "kunal" -> Map(1 -> "kapoor", 2 -> "sharma")))))
+  }
+
+  test("Test Load duplicate keys data in map of map") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<STRING,map<INT,STRING>>
+         | )
+         | STORED BY
+         |'carbondata'
+         |"""
+        .stripMargin)
+    sql(
+      "insert into carbon values('manish\0021\004nalla\0031\004gupta\001kunal\0021\004kapoor\0032" +
+      "\004sharma')")
+    checkAnswer(sql("select * from carbon"), Seq(
+      Row(Map("manish" -> Map(1 -> "nalla"),
+        "kunal" -> Map(1 -> "kapoor", 2 -> "sharma")))))
+  }
+
+  test("Test Create table as select with map") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql("DROP TABLE IF EXISTS carbon1")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<INT,STRING>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    sql("insert into carbon values('1\002Nalla\0012\002Singh\0013\002Gupta\0014\002Kumar')")
+    sql(
+      s"""
+         | CREATE TABLE carbon1
+         | AS
+         | Select *
+         | From carbon
+         | """
+        .stripMargin)
+    checkAnswer(sql("select * from carbon1"), Seq(
+      Row(Map(1 -> "Nalla", 2 -> "Singh", 3 -> "Gupta", 4 -> "Kumar"))))
+  }
+
+  test("Test Create table with double datatype in map") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<DOUBLE,STRING>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    sql(
+      "insert into carbon values('1.23\002Nalla\0012.34\002Singh\0013.67676\002Gupta\0013.67676" +
+      "\002Kumar')")
+    checkAnswer(sql("select * from carbon"), Seq(
+      Row(Map(1.23 -> "Nalla", 2.34 -> "Singh", 3.67676 -> "Gupta"))))
+  }
+
+  test("Load Map data from CSV File") {
+    sql("DROP TABLE IF EXISTS carbon")
+    sql(
+      s"""
+         | CREATE TABLE carbon(
+         | mapField map<INT,STRING>
+         | )
+         | STORED BY 'carbondata'
+         | """
+        .stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbon OPTIONS(
+         | 'header' = 'false')
+       """.stripMargin)
+    checkAnswer(sql("select * from carbon"), Seq(
+      Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar")),
+      Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar"))
+    ))
+  }
+
+  test("Sort Column table property blocking for Map type") {
+    sql("DROP TABLE IF EXISTS carbon")
+    val exception1 = intercept[Exception] {
+      sql(
+        s"""
+           | CREATE TABLE carbon(
+           | mapField map<STRING,STRING>
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES('SORT_COLUMNS'='mapField')
+           | """
+          .stripMargin)
+    }
+    assert(exception1.getMessage
+      .contains(
+        "sort_columns is unsupported for map datatype column: mapfield"))
+  }
+
+}
\ No newline at end of file

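To read the hard-coded test rows above: for the outer map, \001 separates
entries and \002 separates a key from its value; one nesting level down,
\003 separates entries and \004 separates key from value. A worked example
from the "map of map" test:

    // 'manish\0021\004nalla\0032\004gupta\001kunal\0021\004kapoor\0032\004sharma'
    //   => Map("manish" -> Map(1 -> "nalla", 2 -> "gupta"),
    //          "kunal"  -> Map(1 -> "kapoor", 2 -> "sharma"))
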
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
index d23c844..d332a5a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportLoadTableTest.scala
@@ -21,6 +21,7 @@ import java.io.{File, PrintWriter}
 import java.util
 import java.util.Collections
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, Ignore}
 
@@ -136,6 +137,22 @@ class LocalDictionarySupportLoadTableTest extends QueryTest with BeforeAndAfterA
     assert(checkForLocalDictionary(getDimRawChunk(2)))
   }
 
+  test("test local dictionary generation for map type") {
+    sql("drop table if exists local2")
+    sql(
+      "CREATE TABLE local2(name map<string,string>) STORED BY 'carbondata' tblproperties" +
+      "('local_dictionary_enable'='true','local_dictionary_include'='name')")
+    sql(
+      "insert into local2 values('Manish\002Nalla\001Manish\002Gupta\001Shardul\002Singh" +
+      "\001Vishal\002Kumar')")
+    checkAnswer(sql("select * from local2"), Seq(
+      Row(Map("Manish" -> "Nalla", "Shardul" -> "Singh", "Vishal" -> "Kumar"))))
+    assert(!checkForLocalDictionary(getDimRawChunk(0)))
+    assert(!checkForLocalDictionary(getDimRawChunk(1)))
+    assert(checkForLocalDictionary(getDimRawChunk(2)))
+    assert(checkForLocalDictionary(getDimRawChunk(3)))
+  }
+
   test("test local dictionary data validation") {
     sql("drop table if exists local_query_enable")
     sql("drop table if exists local_query_disable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 3752eef..f7249b8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -339,8 +339,9 @@ class NewRddIterator(rddIter: Iterator[Row],
   private val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
     .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
   private val dateFormat = new SimpleDateFormat(dateFormatString)
-  private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-  private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+  private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
+  private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
+  private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2)
   private val serializationNullFormat =
     carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
   import scala.collection.JavaConverters._
@@ -388,8 +389,9 @@ class LazyRddIterator(serializer: SerializerInstance,
     .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
   private val dateFormat = new SimpleDateFormat(dateFormatString)
-  private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-  private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+  private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
+  private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
+  private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2)
   private val serializationNullFormat =
     carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
   // the order of fields in dataframe and createTable may be different, here we need to know whether

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 71447e9..ca9b4af 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -643,6 +643,7 @@ object CarbonScalaUtil {
                      !x.dataType.get.equalsIgnoreCase("STRING") &&
                      !x.dataType.get.equalsIgnoreCase("VARCHAR") &&
                      !x.dataType.get.equalsIgnoreCase("STRUCT") &&
+                     !x.dataType.get.equalsIgnoreCase("MAP") &&
                      !x.dataType.get.equalsIgnoreCase("ARRAY"))) {
         val errormsg = "LOCAL_DICTIONARY_INCLUDE/LOCAL_DICTIONARY_EXCLUDE column: " +
                        dictColm.trim +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 3ac2d2b..c971573 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -114,6 +114,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val MULTILINE = carbonKeyWord("MULTILINE")
   protected val COMPLEX_DELIMITER_LEVEL_1 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_1")
   protected val COMPLEX_DELIMITER_LEVEL_2 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_2")
+  protected val COMPLEX_DELIMITER_LEVEL_3 = carbonKeyWord("COMPLEX_DELIMITER_LEVEL_3")
   protected val OPTIONS = carbonKeyWord("OPTIONS")
   protected val OUTPATH = carbonKeyWord("OUTPATH")
   protected val OVERWRITE = carbonKeyWord("OVERWRITE")
@@ -915,7 +916,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
    * @param dimensionDatatype
    */
   def isDetectAsDimentionDatatype(dimensionDatatype: String): Boolean = {
-    val dimensionType = Array("string", "array", "struct", "timestamp", "date", "char")
+    val dimensionType = Array("string", "array", "struct", "map", "timestamp", "date", "char")
     dimensionType.exists(x => dimensionDatatype.toLowerCase.contains(x))
   }
 
@@ -1070,13 +1071,32 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
     // validate with all supported options
     val options = optionList.get.groupBy(x => x._1)
-    val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
-      "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
-      "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
-      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", "BAD_RECORD_PATH",
-      "BATCH_SORT_SIZE_INMB", "GLOBAL_SORT_PARTITIONS", "SINGLE_PASS",
-      "IS_EMPTY_DATA_BAD_RECORD", "HEADER", "TIMESTAMPFORMAT", "SKIP_EMPTY_LINE",
-      "SORT_COLUMN_BOUNDS", "LOAD_MIN_SIZE_INMB"
+    val supportedOptions = Seq("DELIMITER",
+      "QUOTECHAR",
+      "FILEHEADER",
+      "ESCAPECHAR",
+      "MULTILINE",
+      "COMPLEX_DELIMITER_LEVEL_1",
+      "COMPLEX_DELIMITER_LEVEL_2",
+      "COMPLEX_DELIMITER_LEVEL_3",
+      "COLUMNDICT",
+      "SERIALIZATION_NULL_FORMAT",
+      "BAD_RECORDS_LOGGER_ENABLE",
+      "BAD_RECORDS_ACTION",
+      "ALL_DICTIONARY_PATH",
+      "MAXCOLUMNS",
+      "COMMENTCHAR",
+      "DATEFORMAT",
+      "BAD_RECORD_PATH",
+      "BATCH_SORT_SIZE_INMB",
+      "GLOBAL_SORT_PARTITIONS",
+      "SINGLE_PASS",
+      "IS_EMPTY_DATA_BAD_RECORD",
+      "HEADER",
+      "TIMESTAMPFORMAT",
+      "SKIP_EMPTY_LINE",
+      "SORT_COLUMN_BOUNDS",
+      "LOAD_MIN_SIZE_INMB"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder
@@ -1291,13 +1311,16 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
         Field("unknown", Some("struct"), Some("unknown"), Some(e1))
     }
 
+  //  Map<Key,Value> is represented as Map<Struct<Key,Value>>
   protected lazy val mapFieldType: Parser[Field] =
     (MAP ^^^ "map") ~> "<" ~> primitiveFieldType ~ ("," ~> nestedType) <~ ">" ^^ {
       case key ~ value =>
         Field("unknown", Some("map"), Some("unknown"),
           Some(List(
-            Field("key", key.dataType, Some("key"), key.children),
-            Field("value", value.dataType, Some("value"), value.children))))
+            Field("val", Some("struct"), Some("unknown"),
+              Some(List(
+                Field("key", key.dataType, Some("key"), key.children),
+                Field("value", value.dataType, Some("value"), value.children)))))))
     }
 
   protected lazy val measureCol: Parser[Field] =

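Concretely, a column declared as map<INT,STRING> now parses into the Field
tree below (a sketch of the output of mapFieldType above; the names "val",
"key" and "value" are the literals used in the parser, and primitive
children are elided to None):

    // Field("unknown", Some("map"), Some("unknown"), Some(List(
    //   Field("val", Some("struct"), Some("unknown"), Some(List(
    //     Field("key", Some("int"), Some("key"), None),
    //     Field("value", Some("string"), Some("value"), None)))))))
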
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 3d8170e..184cc1d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util
 import java.util.Date
 
 import scala.collection.JavaConverters._
@@ -77,10 +78,10 @@ class CarbonAppendableStreamSink(
       conf.set(entry._1, entry._2)
     }
     // properties below will be used for default CarbonStreamParser
-    conf.set("carbon_complex_delimiter_level_1",
-      carbonLoadModel.getComplexDelimiterLevel1)
-    conf.set("carbon_complex_delimiter_level_2",
-      carbonLoadModel.getComplexDelimiterLevel2)
+    val complexDelimiters = carbonLoadModel.getComplexDelimiters
+    conf.set("carbon_complex_delimiter_level_1", complexDelimiters.get(0))
+    conf.set("carbon_complex_delimiter_level_2", complexDelimiters.get(1))
+    conf.set("carbon_complex_delimiter_level_3", complexDelimiters.get(2))
     conf.set(
       DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
       carbonLoadModel.getSerializationNullFormat().split(",")(1))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index b07be72..a849c99 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -989,8 +989,8 @@ object CarbonDataRDDFactory {
     // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
     val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
       // input data from DataFrame
-      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+      val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
+      val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
       val serializationNullFormat =
         carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
       dataFrame.get.rdd.map { row =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index f98c0cf..6edfcf4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -87,6 +88,11 @@ case class CarbonAlterTableCompactionCommand(
     if (!table.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
+    if (table.getTableInfo.getFactTable.getListOfColumns.asScala
+      .exists((m => DataTypes.isMapType(m.getDataType)))) {
+      throw new UnsupportedOperationException(
+        "Compaction is unsupported for Table containing Map Columns")
+    }
     if (CarbonUtil.hasAggregationDataMap(table) ||
         (table.isChildDataMap && null == operationContext.getProperty(table.getTableName))) {
       // If the compaction request is of 'streaming' type then we need to generate loadCommands

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 6a8a9cf..7a974e3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -69,7 +69,7 @@ import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
-import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.{ComplexDelimitersEnum, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
@@ -188,11 +188,13 @@ case class CarbonLoadDataCommand(
     val carbonLoadModel = new CarbonLoadModel()
     val tableProperties = table.getTableInfo.getFactTable.getTableProperties
     val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
+    optionsFinal
+      .put("complex_delimiter_level_4",
+        ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4.value())
     optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
       carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
         carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
           CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
-
       optionsFinal
         .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
       val factPath = if (dataFrame.isDefined) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index ca39931..713561b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -128,7 +128,7 @@ case class CarbonCreateTableCommand(
             if (partitionInfo != null &&
                 partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
               // Restrict dictionary encoding on partition columns.
-              // TODO Need to decide wherher it is required
+              // TODO Need to decide whether it is required
               val dictionaryOnPartitionColumn =
               partitionInfo.getColumnSchemaList.asScala.exists{p =>
                 p.hasEncoding(Encoding.DICTIONARY) && !p.hasEncoding(Encoding.DIRECT_DICTIONARY)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 08c149b..ed5486b 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -55,8 +55,8 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     carbonLoadModel.setFactFilePath(filePath)
     carbonLoadModel.setCsvHeader(header)
     carbonLoadModel.setCsvDelimiter(",")
-    carbonLoadModel.setComplexDelimiterLevel1("$")
-    carbonLoadModel.setComplexDelimiterLevel2(":")
+    carbonLoadModel.setComplexDelimiter("$")
+    carbonLoadModel.setComplexDelimiter(":")
     carbonLoadModel.setAllDictPath(allDictFilePath)
     carbonLoadModel.setSerializationNullFormat(
           TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 060afca..69248d6 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -165,8 +165,8 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     carbonLoadModel.setFactFilePath(filePath)
     carbonLoadModel.setCsvHeader(header)
     carbonLoadModel.setCsvDelimiter(csvDelimiter)
-    carbonLoadModel.setComplexDelimiterLevel1("$")
-    carbonLoadModel.setComplexDelimiterLevel2(":")
+    carbonLoadModel.setComplexDelimiter("$")
+    carbonLoadModel.setComplexDelimiter(":")
     carbonLoadModel.setColDictFilePath(extColFilePath)
     carbonLoadModel.setQuoteChar("\"");
     carbonLoadModel.setSerializationNullFormat(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 985b9d9..21cad07 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -42,7 +42,6 @@ case class StreamData(id: Integer, name: String, city: String, salary: java.lang
     register: String, updated: String,
     file: FileElement)
 
-@Ignore
 class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
 
   private val spark = sqlContext.sparkSession
@@ -420,7 +419,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
       continueSeconds = 20,
       generateBadRecords = true,
       badRecordAction = "force",
-      autoHandoff = true
+      autoHandoff = false
     )
 
     // non-filter

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/processing/src/main/java/org/apache/carbondata/processing/loading/ComplexDelimitersEnum.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/ComplexDelimitersEnum.java b/processing/src/main/java/org/apache/carbondata/processing/loading/ComplexDelimitersEnum.java
new file mode 100644
index 0000000..bc196e1
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/ComplexDelimitersEnum.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+public enum ComplexDelimitersEnum {
+
+  COMPLEX_DELIMITERS_LEVEL_1("\001"),
+
+  COMPLEX_DELIMITERS_LEVEL_2("\002"),
+
+  COMPLEX_DELIMITERS_LEVEL_3("\003"),
+
+  COMPLEX_DELIMITERS_LEVEL_4("\004");
+
+  private String value;
+
+  ComplexDelimitersEnum(String value) {
+    this.value = value;
+  }
+
+  public String value() {
+    return value;
+  }
+}

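A minimal usage sketch, mirroring how CarbonTableOutputFormat above assembles
its default when no COMPLEX_DELIMITERS value is configured:

    import org.apache.carbondata.processing.loading.ComplexDelimitersEnum

    // yields the comma-joined default "\001,\002,\003"
    val defaults = Seq(
        ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_1,
        ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_2,
        ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3)
      .map(_.value()).mkString(",")
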
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 89d09fe..6fe89a2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -221,9 +221,10 @@ public final class DataLoadProcessBuilder {
     configuration.setHeader(loadModel.getCsvHeaderColumns());
     configuration.setSegmentId(loadModel.getSegmentId());
     configuration.setTaskNo(loadModel.getTaskNo());
-    configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,
-        new String[] { loadModel.getComplexDelimiterLevel1(),
-            loadModel.getComplexDelimiterLevel2() });
+    String[] complexDelimiters = new String[loadModel.getComplexDelimiters().size()];
+    loadModel.getComplexDelimiters().toArray(complexDelimiters);
+    configuration
+        .setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS, complexDelimiters);
     configuration.setDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
         loadModel.getSerializationNullFormat().split(",")[1]);
     configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index e15fb5d..aecc52e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -65,8 +65,7 @@ public class CarbonLoadModel implements Serializable {
   private String csvHeader;
   private String[] csvHeaderColumns;
   private String csvDelimiter;
-  private String complexDelimiterLevel1;
-  private String complexDelimiterLevel2;
+  private ArrayList<String> complexDelimiters;
 
   private List<LoadMetadataDetails> loadMetadataDetails;
   private transient SegmentUpdateStatusManager segmentUpdateStatusManager;
@@ -276,20 +275,14 @@ public class CarbonLoadModel implements Serializable {
     this.csvDelimiter = csvDelimiter;
   }
 
-  public String getComplexDelimiterLevel1() {
-    return complexDelimiterLevel1;
+  public void setComplexDelimiter(String delimiter) {
+    checkAndInitializeComplexDelimiterList();
+    this.complexDelimiters.add(delimiter);
   }
 
-  public void setComplexDelimiterLevel1(String complexDelimiterLevel1) {
-    this.complexDelimiterLevel1 = complexDelimiterLevel1;
-  }
-
-  public String getComplexDelimiterLevel2() {
-    return complexDelimiterLevel2;
-  }
-
-  public void setComplexDelimiterLevel2(String complexDelimiterLevel2) {
-    this.complexDelimiterLevel2 = complexDelimiterLevel2;
+  public ArrayList<String> getComplexDelimiters() {
+    checkAndInitializeComplexDelimiterList();
+    return complexDelimiters;
   }
 
   public String getAllDictPath() {
@@ -441,8 +434,7 @@ public class CarbonLoadModel implements Serializable {
     copy.csvHeader = csvHeader;
     copy.csvHeaderColumns = csvHeaderColumns;
     copy.csvDelimiter = csvDelimiter;
-    copy.complexDelimiterLevel1 = complexDelimiterLevel1;
-    copy.complexDelimiterLevel2 = complexDelimiterLevel2;
+    copy.complexDelimiters = complexDelimiters;
     copy.carbonDataLoadSchema = carbonDataLoadSchema;
     copy.blocksID = blocksID;
     copy.taskNo = taskNo;
@@ -500,8 +492,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.csvHeader = header;
     copyObj.csvHeaderColumns = csvHeaderColumns;
     copyObj.csvDelimiter = delimiter;
-    copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
-    copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;
+    copyObj.complexDelimiters = complexDelimiters;
     copyObj.blocksID = blocksID;
     copyObj.taskNo = taskNo;
     copyObj.factTimeStamp = factTimeStamp;
@@ -631,7 +622,16 @@ public class CarbonLoadModel implements Serializable {
   }
 
   public String[] getDelimiters() {
-    return new String[] { complexDelimiterLevel1, complexDelimiterLevel2 };
+    checkAndInitializeComplexDelimiterList();
+    String[] delimiters = new String[complexDelimiters.size()];
+    delimiters = complexDelimiters.toArray(delimiters);
+    return delimiters;
+  }
+
+  private void checkAndInitializeComplexDelimiterList() {
+    if (null == complexDelimiters) {
+      complexDelimiters = new ArrayList<>();
+    }
   }
 
   /**

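The delimiter API on the load model is now append-style: each
setComplexDelimiter call adds the next level, so call order determines the
level. A sketch matching the updated test cases further below:

    import org.apache.carbondata.processing.loading.model.CarbonLoadModel

    val model = new CarbonLoadModel()
    model.setComplexDelimiter("$") // level 1: separates entries/elements
    model.setComplexDelimiter(":") // level 2: separates nested values
    val delimiters: java.util.ArrayList[String] = model.getComplexDelimiters
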
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 4a29304..d02348d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -164,6 +164,8 @@ public class CarbonLoadModelBuilder {
     String delimeter = optionsFinal.get("delimiter");
     String complex_delimeter_level1 = optionsFinal.get("complex_delimiter_level_1");
     String complex_delimeter_level2 = optionsFinal.get("complex_delimiter_level_2");
+    String complex_delimeter_level3 = optionsFinal.get("complex_delimiter_level_3");
+    String complex_delimeter_level4 = optionsFinal.get("complex_delimiter_level_4");
     String all_dictionary_path = optionsFinal.get("all_dictionary_path");
     String column_dict = optionsFinal.get("columndict");
     validateDateTimeFormat(timestampformat, "TimestampFormat");
@@ -257,11 +259,14 @@ public class CarbonLoadModelBuilder {
 
     if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
         complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
-        delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
+        delimeter.equalsIgnoreCase(complex_delimeter_level2) ||
+        delimeter.equalsIgnoreCase(complex_delimeter_level3)) {
       throw new InvalidLoadOptionException("Field Delimiter and Complex types delimiter are same");
     } else {
-      carbonLoadModel.setComplexDelimiterLevel1(complex_delimeter_level1);
-      carbonLoadModel.setComplexDelimiterLevel2(complex_delimeter_level2);
+      carbonLoadModel.setComplexDelimiter(complex_delimeter_level1);
+      carbonLoadModel.setComplexDelimiter(complex_delimeter_level2);
+      carbonLoadModel.setComplexDelimiter(complex_delimeter_level3);
+      carbonLoadModel.setComplexDelimiter(complex_delimeter_level4);
     }
     // set local dictionary path, and dictionary file extension
     carbonLoadModel.setAllDictPath(all_dictionary_path);

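Together with the COMPLEX_DELIMITER_LEVEL_3 option registered in
CarbonDDLSqlParser above, a load can now override all three user-facing
delimiter levels (a hedged example; the path and delimiter characters are
illustrative, and level 4 is filled in internally by CarbonLoadDataCommand
rather than accepted as a load option):

    sql(
      """
        | LOAD DATA LOCAL INPATH '/tmp/maptest2.csv' INTO TABLE carbon
        | OPTIONS('COMPLEX_DELIMITER_LEVEL_1'='$',
        | 'COMPLEX_DELIMITER_LEVEL_2'=':',
        | 'COMPLEX_DELIMITER_LEVEL_3'='@')
        """.stripMargin)
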
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 2c5fa8b..759cf04 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.ComplexDelimitersEnum;
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.processing.util.CarbonLoaderUtil;
@@ -111,13 +112,17 @@ public class LoadOption {
         "all_dictionary_path",
         Maps.getOrDefault(options, "all_dictionary_path", ""));
 
-    optionsFinal.put(
-        "complex_delimiter_level_1",
-        Maps.getOrDefault(options,"complex_delimiter_level_1", "\\\001"));
+    optionsFinal.put("complex_delimiter_level_1",
+        Maps.getOrDefault(options, "complex_delimiter_level_1",
+            ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_1.value()));
 
-    optionsFinal.put(
-        "complex_delimiter_level_2",
-        Maps.getOrDefault(options, "complex_delimiter_level_2", "\\\002"));
+    optionsFinal.put("complex_delimiter_level_2",
+        Maps.getOrDefault(options, "complex_delimiter_level_2",
+            ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_2.value()));
+
+    optionsFinal.put("complex_delimiter_level_3",
+        Maps.getOrDefault(options, "complex_delimiter_level_3",
+            ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3.value()));
 
     optionsFinal.put(
         "dateformat",

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
index 3964869..6ffea4f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/CarbonParserFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.processing.loading.parser;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -23,6 +24,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.processing.loading.parser.impl.ArrayParserImpl;
+import org.apache.carbondata.processing.loading.parser.impl.MapParserImpl;
 import org.apache.carbondata.processing.loading.parser.impl.PrimitiveParserImpl;
 import org.apache.carbondata.processing.loading.parser.impl.StructParserImpl;
 
@@ -35,7 +37,8 @@ public final class CarbonParserFactory {
    * @param complexDelimiters
    * @return
    */
-  public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
+  public static GenericParser createParser(CarbonColumn carbonColumn,
+      ArrayList<String> complexDelimiters,
       String nullFormat) {
     return createParser(carbonColumn, complexDelimiters, nullFormat, 0);
   }
@@ -51,23 +54,33 @@ public final class CarbonParserFactory {
    *                           delimiters
    * @return GenericParser
    */
-  private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
-      String nullFormat, int depth) {
+  private static GenericParser createParser(CarbonColumn carbonColumn,
+      ArrayList<String> complexDelimiters, String nullFormat, int depth) {
     DataType dataType = carbonColumn.getDataType();
-    if (DataTypes.isArrayType(dataType) || DataTypes.isMapType(dataType)) {
+    if (DataTypes.isArrayType(dataType)) {
       List<CarbonDimension> listOfChildDimensions =
           ((CarbonDimension) carbonColumn).getListOfChildDimensions();
       // Create array parser with complex delimiter
-      ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[depth], nullFormat);
+      ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters.get(depth), nullFormat);
       for (CarbonDimension dimension : listOfChildDimensions) {
         arrayParser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
       }
       return arrayParser;
+    } else if (DataTypes.isMapType(dataType)) {
+      List<CarbonDimension> listOfChildDimensions =
+          ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+      // Create map parser with complex delimiter and key-value delimiter
+      MapParserImpl mapParser = new MapParserImpl(complexDelimiters.get(depth), nullFormat,
+          complexDelimiters.get(depth + 1));
+      for (CarbonDimension dimension : listOfChildDimensions) {
+        mapParser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
+      }
+      return mapParser;
     } else if (DataTypes.isStructType(dataType)) {
       List<CarbonDimension> dimensions =
           ((CarbonDimension) carbonColumn).getListOfChildDimensions();
       // Create struct parser with complex delimiter
-      StructParserImpl parser = new StructParserImpl(complexDelimiters[depth], nullFormat);
+      StructParserImpl parser = new StructParserImpl(complexDelimiters.get(depth), nullFormat);
       for (CarbonDimension dimension : dimensions) {
         parser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
       }

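Note how the depth parameter indexes into complexDelimiters: each nesting level consumes
the next delimiter, and a map additionally peeks at depth + 1 for its key-value
separator, which is the same delimiter its child (key/value) parser is built with. This
is why the commit introduces complex_delimiter_level_3: a map nested one level deep, for
example inside an array, reads its entries from level 2 and its key-value pairs from
level 3. The following self-contained sketch shows the split cascade for a top-level
map<int,string> column, using readable "$" and ":" in place of the non-printable default
delimiters:

    import java.util.regex.Pattern;

    public class DelimiterLevelDemo {
      public static void main(String[] args) {
        // Level 1 separates map entries, level 2 separates key from value.
        Pattern entrySplit = Pattern.compile(Pattern.quote("$"));
        Pattern kvSplit = Pattern.compile(Pattern.quote(":"));
        for (String entry : entrySplit.split("1:a$2:b", -1)) { // ["1:a", "2:b"]
          String[] kv = kvSplit.split(entry, -1);              // ["1","a"] then ["2","b"]
          System.out.println("key=" + kv[0] + ", value=" + kv[1]);
        }
      }
    }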
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
index c56691a..c27f0fa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/ArrayParserImpl.java
@@ -32,11 +32,11 @@ import org.apache.commons.lang.ArrayUtils;
  */
 public class ArrayParserImpl implements ComplexParser<ArrayObject> {
 
-  private Pattern pattern;
+  protected Pattern pattern;
 
-  private GenericParser child;
+  protected GenericParser child;
 
-  private String nullFormat;
+  protected String nullFormat;
 
   public ArrayParserImpl(String delimiter, String nullFormat) {
     pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
new file mode 100644
index 0000000..e6814f8
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/MapParserImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.parser.impl;
+
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
+
+import org.apache.commons.lang.ArrayUtils;
+
+
+public class MapParserImpl extends ArrayParserImpl {
+
+  private String keyValueDelimiter;
+
+  public MapParserImpl(String delimiter, String nullFormat, String keyValueDelimiter) {
+    super(delimiter, nullFormat);
+    this.keyValueDelimiter = keyValueDelimiter;
+  }
+
+  // The key of a map is always a PRIMITIVE type, so Set<Object> works fine here.
+  // Only the first occurrence of each key is added; later duplicates are skipped/ignored.
+  @Override public ArrayObject parse(Object data) {
+    if (data != null) {
+      String value = data.toString();
+      if (!value.isEmpty() && !value.equals(nullFormat)) {
+        String[] split = pattern.split(value, -1);
+        if (ArrayUtils.isNotEmpty(split)) {
+          ArrayList<Object> array = new ArrayList<>();
+          Set<Object> set = new HashSet<>();
+          for (int i = 0; i < split.length; i++) {
+            Object currKey = split[i].split(keyValueDelimiter)[0];
+            if (set.add(currKey)) {
+              array.add(child.parse(split[i]));
+            }
+          }
+          return new ArrayObject(array.toArray());
+        }
+      }
+    }
+    return null;
+  }
+}

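The parse method above keeps only the first occurrence of each map key: the inherited
pattern splits the field into entries, the portion before keyValueDelimiter is treated
as the key, and a HashSet tracks which keys have already been seen. A standalone sketch
of that first-occurrence-wins behaviour, again substituting "$" and ":" for the default
delimiters:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class MapDedupDemo {
      public static void main(String[] args) {
        String field = "1:a$2:b$1:c";
        Set<Object> seenKeys = new HashSet<>();
        List<String> kept = new ArrayList<>();
        for (String entry : field.split("\\$", -1)) {
          Object key = entry.split(":")[0];
          if (seenKeys.add(key)) {   // add() returns false for a duplicate key
            kept.add(entry);
          }
        }
        System.out.println(kept);    // [1:a, 2:b]; the duplicate key "1" is dropped
      }
    }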
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
index 00d8420..d0fe30b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.processing.loading.parser.impl;
 
+import java.util.ArrayList;
+
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
@@ -34,8 +36,12 @@ public class RowParserImpl implements RowParser {
   private int numberOfColumns;
 
   public RowParserImpl(DataField[] output, CarbonDataLoadConfiguration configuration) {
-    String[] complexDelimiters =
+    String[] tempComplexDelimiters =
         (String[]) configuration.getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS);
+    ArrayList<String> complexDelimiters = new ArrayList<>();
+    for (int i = 0; i < tempComplexDelimiters.length; i++) {
+      complexDelimiters.add(tempComplexDelimiters[i]);
+    }
     String nullFormat =
         configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
             .toString();

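The copy loop above is behaviourally identical to the usual one-liner, shown here only
as a readability note (it would require an additional import of java.util.Arrays):

    ArrayList<String> complexDelimiters =
        new ArrayList<>(Arrays.asList(tempComplexDelimiters));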
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index c9adcdf..1241504 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -184,6 +184,7 @@ public class CarbonWriterBuilder {
           !option.equalsIgnoreCase("timestampformat") &&
           !option.equalsIgnoreCase("complex_delimiter_level_1") &&
           !option.equalsIgnoreCase("complex_delimiter_level_2") &&
+          !option.equalsIgnoreCase("complex_delimiter_level_3") &&
           !option.equalsIgnoreCase("quotechar") &&
           !option.equalsIgnoreCase("escapechar")) {
         throw new IllegalArgumentException("Unsupported option:" + option

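With the whitelist extended, SDK users can override all three delimiter levels through
load options. A hedged usage sketch follows: the builder entry points used here
(CarbonWriter.builder(), outputPath, withLoadOptions) are assumed from the SDK of this
era, the output path is hypothetical, and the rest of the writer setup (schema, build,
write) is omitted:

    import java.util.HashMap;
    import java.util.Map;

    Map<String, String> options = new HashMap<>();
    options.put("complex_delimiter_level_1", "$");
    options.put("complex_delimiter_level_2", ":");
    options.put("complex_delimiter_level_3", "@"); // accepted now that it is whitelisted

    CarbonWriterBuilder builder = CarbonWriter.builder()
        .outputPath("/tmp/carbon_sdk_out")         // hypothetical path
        .withLoadOptions(options);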
http://git-wip-us.apache.org/repos/asf/carbondata/blob/90f63a0c/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index 5a888ef..4dcb3ce 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -41,6 +41,7 @@ class RowStreamParserImp extends CarbonStreamParser {
   var dateFormat: SimpleDateFormat = null
   var complexDelimiterLevel1: String = null
   var complexDelimiterLevel2: String = null
+  var complexDelimiterLevel3: String = null
   var serializationNullFormat: String = null
 
   override def initialize(configuration: Configuration, structType: StructType): Unit = {
@@ -54,6 +55,7 @@ class RowStreamParserImp extends CarbonStreamParser {
       this.configuration.get(CarbonCommonConstants.CARBON_DATE_FORMAT))
     this.complexDelimiterLevel1 = this.configuration.get("carbon_complex_delimiter_level_1")
     this.complexDelimiterLevel2 = this.configuration.get("carbon_complex_delimiter_level_2")
+    this.complexDelimiterLevel3 = this.configuration.get("carbon_complex_delimiter_level_3")
     this.serializationNullFormat =
       this.configuration.get(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
   }