Posted to commits@carbondata.apache.org by qi...@apache.org on 2019/09/06 07:48:14 UTC

[carbondata] branch master updated: [CARBONDATA-3497] Support to write long string for streaming table

This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new a42f2e4  [CARBONDATA-3497] Support to write long string for streaming table
a42f2e4 is described below

commit a42f2e42017a04bf76f9cded0c929c29dec41abe
Author: Zhang Zhichao <44...@qq.com>
AuthorDate: Tue Aug 27 11:32:48 2019 +0800

    [CARBONDATA-3497] Support to write long string for streaming table
    
    This closes #3366
---
 .../hadoop/stream/StreamRecordReader.java          |  19 +-
 .../resources/streamSample_with_long_string.csv    |   6 +
 .../streaming/CarbonAppendableStreamSink.scala     |  19 +-
 .../converter/SparkDataTypeConverterImpl.java      |   6 +-
 .../TestStreamingTableWithLongString.scala         | 649 +++++++++++++++++++++
 .../streaming/CarbonStreamRecordWriter.java        |  11 +-
 .../streaming/parser/CSVStreamParserImp.java       |   5 +-
 .../streaming/parser/CarbonStreamParser.java       |   3 +-
 .../streaming/parser/RowStreamParserImp.scala      |  11 +-
 9 files changed, 715 insertions(+), 14 deletions(-)
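
Before the per-file diffs: condensed from the test added in this commit, a rough sketch of the usage this change enables. A long string column is declared through the LONG_STRING_COLUMNS table property and can then be fed from Spark Structured Streaming. In this sketch, `df` (any streaming DataFrame matching the table schema) and `checkpointDir` are placeholders; the table name and writeStream options are the ones used in the new test below.

    // Streaming table with a long string column (condensed from the new test).
    sql(
      """
        | CREATE TABLE streaming_longstr.stream_table_longstr(
        |   id INT, name STRING, longstr STRING
        | )
        | STORED BY 'carbondata'
        | TBLPROPERTIES('streaming'='true', 'LONG_STRING_COLUMNS'='longstr')
      """.stripMargin)

    // Ingest with Structured Streaming; longstr values longer than 32,767 bytes are now accepted.
    val query = df.writeStream
      .format("carbondata")
      .option("dbName", "streaming_longstr")
      .option("tableName", "stream_table_longstr")
      .option("checkpointLocation", checkpointDir)  // placeholder path
      .start()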

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
index 75e36be..1e40baa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
@@ -81,6 +81,7 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
   protected CarbonTable carbonTable;
   private CarbonColumn[] storageColumns;
   private boolean[] isRequired;
+  private boolean[] dimensionsIsVarcharTypeMap;
   private DataType[] measureDataTypes;
   private int dimensionCount;
   private int measureCount;
@@ -163,6 +164,10 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
             .getDirectDictionaryGenerator(storageColumns[i].getDataType());
       }
     }
+    dimensionsIsVarcharTypeMap = new boolean[dimensionCount];
+    for (int i = 0; i < dimensionCount; i++) {
+      dimensionsIsVarcharTypeMap[i] = storageColumns[i].getDataType() == DataTypes.VARCHAR;
+    }
     measureDataTypes = new DataType[measureCount];
     for (int i = 0; i < measureCount; i++) {
       measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
@@ -387,7 +392,12 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
         }
       } else {
         if (isNoDictColumn[colCount]) {
-          int v = input.readShort();
+          int v = 0;
+          if (dimensionsIsVarcharTypeMap[colCount]) {
+            v = input.readInt();
+          } else {
+            v = input.readShort();
+          }
           if (isRequired[colCount]) {
             byte[] b = input.readBytes(v);
             if (isFilterRequired[colCount]) {
@@ -561,7 +571,12 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
         outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
       } else {
         if (isNoDictColumn[colCount]) {
-          int v = input.readShort();
+          int v = 0;
+          if (dimensionsIsVarcharTypeMap[colCount]) {
+            v = input.readInt();
+          } else {
+            v = input.readShort();
+          }
           outputValues[colCount] = input.readBytes(v);
         } else {
           outputValues[colCount] = input.readInt();
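
The hunk above is the read side of the format change: for a no-dictionary VARCHAR (long string) column the value is now length-prefixed with a 4-byte int instead of a 2-byte short, since a long string can exceed 32,767 bytes. A standalone sketch of that framing, with a plain java.io stream standing in for Carbon's internal row buffer:

    import java.io.DataInputStream

    // Sketch only: read one no-dictionary dimension value; only VARCHAR columns use the int prefix.
    def readNoDictValue(in: DataInputStream, isVarchar: Boolean): Array[Byte] = {
      val len = if (isVarchar) in.readInt() else in.readShort().toInt
      val bytes = new Array[Byte](len)
      in.readFully(bytes)
      bytes
    }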
diff --git a/integration/spark-common-test/src/test/resources/streamSample_with_long_string.csv b/integration/spark-common-test/src/test/resources/streamSample_with_long_string.csv
new file mode 100644
index 0000000..b010c07
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/streamSample_with_long_string.csv
@@ -0,0 +1,6 @@
+id,name,city,salary,tax,percent,birthday,register,updated,longstr,file
+100000001,batch_1,city_1,0.1,0.01,80.01,1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01,1abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...]
+100000002,batch_2,city_2,0.2,0.02,80.02,1990-01-02,2010-01-02 10:01:01,2010-01-02 10:01:01,2abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...]
+100000003,batch_3,city_3,0.3,0.03,80.03,1990-01-03,2010-01-03 10:01:01,2010-01-03 10:01:01,3abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...]
+100000004,batch_4,city_4,0.4,0.04,80.04,1990-01-04,2010-01-04 10:01:01,2010-01-04 10:01:01,4abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...]
+100000005,batch_5,city_5,0.5,0.05,80.05,1990-01-05,2010-01-05 10:01:01,2010-01-05 10:01:01,5abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...]
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 90132ff..4440e3a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.stats.QueryStatistic
 import org.apache.carbondata.core.util.CarbonProperties
@@ -261,6 +262,15 @@ object CarbonAppendableStreamSink {
         }
 
         val rowSchema = queryExecution.analyzed.schema
+        val isVarcharTypeMapping = {
+          val col2VarcharType = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+            .getCreateOrderColumn(carbonLoadModel.getTableName).asScala
+            .map(c => c.getColName -> (c.getDataType == DataTypes.VARCHAR)).toMap
+          rowSchema.fieldNames.map(c => {
+            val r = col2VarcharType.get(c.toLowerCase)
+            r.isDefined && r.get
+          })
+        }
         // write data file
         result = sparkSession.sparkContext.runJob(queryExecution.toRdd,
           (taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
@@ -272,7 +282,8 @@ object CarbonAppendableStreamSink {
               sparkAttemptNumber = taskContext.attemptNumber(),
               committer,
               iterator,
-              rowSchema
+              rowSchema,
+              isVarcharTypeMapping
             )
           })
 
@@ -319,7 +330,8 @@ object CarbonAppendableStreamSink {
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[InternalRow],
-      rowSchema: StructType): (TaskCommitMessage, StreamFileIndex) = {
+      rowSchema: StructType,
+      isVarcharTypeMapping: Array[Boolean]): (TaskCommitMessage, StreamFileIndex) = {
 
     val jobId = CarbonInputFormatUtil.getJobId(new Date, sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -350,7 +362,8 @@ object CarbonAppendableStreamSink {
 
         val streamParser =
           Class.forName(parserName).newInstance.asInstanceOf[CarbonStreamParser]
-        streamParser.initialize(taskAttemptContext.getConfiguration, rowSchema)
+        streamParser.initialize(taskAttemptContext.getConfiguration,
+            rowSchema, isVarcharTypeMapping)
 
         blockIndex = StreamSegment.appendBatchData(new InputIterator(iterator, streamParser),
           taskAttemptContext, carbonLoadModel)
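
The sink change above computes, once per write job, which fields of the incoming row schema correspond to VARCHAR columns of the Carbon table (matching column names case-insensitively) and passes that boolean array down to the stream parser. The same idea in isolation; this helper is illustrative only, not Carbon API:

    // Illustrative helper: flag each row-schema field that is a long string (VARCHAR)
    // column, given the set of lower-cased VARCHAR column names from the table schema.
    def varcharFlags(rowFieldNames: Seq[String], varcharColumns: Set[String]): Array[Boolean] =
      rowFieldNames.map(name => varcharColumns.contains(name.toLowerCase)).toArray

    // varcharFlags(Seq("id", "name", "longstr"), Set("longstr")) yields Array(false, false, true)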
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
index 41b378d..4db1154 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
@@ -114,7 +114,8 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri
 
   private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
       DataType carbonDataType) {
-    if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
+    if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING
+        || carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.VARCHAR) {
       return DataTypes.StringType;
     } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
       return DataTypes.ShortType;
@@ -170,7 +171,8 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri
             || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT
             || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT
             || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG
-            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BINARY) {
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BINARY
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.VARCHAR) {
           fields[i] = new StructField(carbonColumn.getColName(),
               convertCarbonToSparkDataType(dataType), true, null);
         } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
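
The converter change above means a Carbon VARCHAR (long string) column is exposed to Spark exactly like STRING, so query schemas are unchanged. A minimal check one could run against the table created in the test below (table and column names are taken from that test):

    import org.apache.spark.sql.types.StringType

    val df = spark.table("streaming_longstr.stream_table_longstr")
    assert(df.schema("longstr").dataType == StringType)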
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithLongString.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithLongString.scala
new file mode 100644
index 0000000..521b241
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithLongString.scala
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.{File, PrintWriter}
+import java.math.BigDecimal
+import java.net.{BindException, ServerSocket}
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+case class StreamLongStrData(id: Integer, name: String, city: String, salary: java.lang.Float,
+    tax: BigDecimal, percent: java.lang.Double, birthday: String,
+    register: String, updated: String, longStr: String,
+    file: FileElement)
+
+class TestStreamingTableWithLongString extends QueryTest with BeforeAndAfterAll {
+
+  private val spark = sqlContext.sparkSession
+  private val dataFilePath = s"$resourcesPath/streamSample_with_long_string.csv"
+  private val csvDataDir = integrationPath + "/spark2/target/csvdata_longstr"
+  private val longStrValue = "abc" * 12000
+
+  override def beforeAll {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+    sql("DROP DATABASE IF EXISTS streaming_longstr CASCADE")
+    sql("CREATE DATABASE streaming_longstr")
+    sql("USE streaming_longstr")
+
+    dropTable()
+
+    // 1. streaming table with long string field
+    // socket source
+    createTable(tableName = "stream_table_longstr", streaming = true, withBatchLoad = true)
+
+    // 2. streaming table with long string field
+    // file source
+    createTable(tableName = "stream_table_longstr_file", streaming = true, withBatchLoad = true)
+
+    // 3. streaming table with long string and complex field
+    createTableWithComplexType(
+      tableName = "stream_table_longstr_complex", streaming = true, withBatchLoad = true)
+  }
+
+  override def afterAll {
+    dropTable()
+    sql("USE default")
+    sql("DROP DATABASE IF EXISTS streaming_longstr CASCADE")
+    new File(csvDataDir).delete()
+  }
+
+  def dropTable(): Unit = {
+    sql("drop table if exists streaming_longstr.stream_table_longstr")
+    sql("drop table if exists streaming_longstr.stream_table_longstr_file")
+    sql("drop table if exists streaming_longstr.stream_table_longstr_complex")
+  }
+
+  // input source: file
+  test("[CARBONDATA-3497] Support to write long string for streaming table: ingest from file source") {
+    val identifier = new TableIdentifier("stream_table_longstr_file", Option("streaming_longstr"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetaStore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
+    val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+      identifier)
+    thread.start()
+    Thread.sleep(3000)
+    generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir)
+    Thread.sleep(5000)
+    thread.interrupt()
+    checkAnswer(
+      sql("select count(*) from streaming_longstr.stream_table_longstr_file"),
+      Seq(Row(25))
+    )
+
+    val row = sql("select * from streaming_longstr.stream_table_longstr_file order by id").head()
+    val expectedRow = Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), "10" + longStrValue)
+    assertResult(expectedRow)(row)
+    new File(csvDataDir).delete()
+  }
+
+  test("[CARBONDATA-3497] Support to write long string for streaming table") {
+    executeStreamingIngest(
+      tableName = "stream_table_longstr",
+      batchNums = 2,
+      rowNumsEachBatch = 25,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      handoffSize = 51200,
+      autoHandoff = false
+    )
+
+    var result = sql("select * from streaming_longstr.stream_table_longstr order by id, name").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(1).getString(1) == "name_2")
+    assert(result(1).getString(9) == ("2" + longStrValue))
+    // check one row of batch loading
+    assert(result(50).getInt(0) == 100000001)
+    assert(result(50).getString(1) == "batch_1")
+    assert(result(50).getString(9) == ("1" + longStrValue))
+
+    checkAnswer(
+      sql("select * from streaming_longstr.stream_table_longstr where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+    checkAnswer(
+      sql("select * from streaming_longstr.stream_table_longstr where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+    checkAnswer(
+      sql("select * from streaming_longstr.stream_table_longstr where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+    sql("show segments for table streaming_longstr.stream_table_longstr").show(20, false)
+    sql("alter table streaming_longstr.stream_table_longstr finish streaming")
+    sql("alter table streaming_longstr.stream_table_longstr compact 'streaming'")
+    sql("show segments for table streaming_longstr.stream_table_longstr").show(20, false)
+    Thread.sleep(5000)
+
+    result = sql("select * from streaming_longstr.stream_table_longstr order by id, name").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(2).getString(1) == "name_3")
+    assert(result(2).getString(9) == ("3" + longStrValue))
+    // check one row of batch loading
+    assert(result(51).getInt(0) == 100000002)
+    assert(result(51).getString(1) == "batch_2")
+    assert(result(51).getString(9) == ("2" + longStrValue))
+
+    checkAnswer(
+      sql("select * from streaming_longstr.stream_table_longstr where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+    checkAnswer(
+      sql("select * from streaming_longstr.stream_table_longstr where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+    checkAnswer(
+      sql("select * from streaming_longstr.stream_table_longstr where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+    sql("alter table streaming_longstr.stream_table_longstr compact 'major'")
+    sql("show segments for table streaming_longstr.stream_table_longstr").show(20, false)
+    Thread.sleep(5000)
+
+    result = sql("select * from streaming_longstr.stream_table_longstr order by id, name").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(3).getString(1) == "name_4")
+    assert(result(3).getString(9) == ("4" + longStrValue))
+    // check one row of batch loading
+    assert(result(52).getInt(0) == 100000003)
+    assert(result(52).getString(1) == "batch_3")
+    assert(result(52).getString(9) == ("3" + longStrValue))
+
+    checkAnswer(
+      sql("select * from streaming_longstr.stream_table_longstr where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+    checkAnswer(
+      sql("select * from streaming_longstr.stream_table_longstr where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+    checkAnswer(
+      sql("select * from streaming_longstr.stream_table_longstr where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+  }
+
+  test("[CARBONDATA-3497] Support to write long string for streaming table: include complex column") {
+    executeStreamingIngest(
+      tableName = "stream_table_longstr_complex",
+      batchNums = 2,
+      rowNumsEachBatch = 25,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      handoffSize = 51200,
+      autoHandoff = false
+    )
+
+    // non-filter
+    val result = sql("select * from streaming_longstr.stream_table_longstr_complex order by id, name").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(3).getString(1) == "name_4")
+    assert(result(3).getString(9) == ("4" + longStrValue))
+    // check one row of batch loading
+    assert(result(52).getInt(0) == 100000003)
+    assert(result(52).getString(1) == "batch_3")
+    assert(result(52).getString(9) == ("3" + longStrValue))
+    assert(result(52).getStruct(10).getInt(1) == 40)
+
+    // filter
+    checkAnswer(
+      sql("select * from streaming_longstr.stream_table_longstr_complex where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 1))))
+
+    checkAnswer(
+      sql("select * from streaming_longstr.stream_table_longstr_complex where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue), Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from streaming_longstr.stream_table_longstr_complex where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue), Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 20))))
+  }
+
+  test("[CARBONDATA-3497] Support to write long string for streaming table: StreamSQL") {
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+
+    var rows = sql("SHOW STREAMS").collect()
+    assertResult(0)(rows.length)
+
+    val csvDataDir = integrationPath + "/spark2/target/streamSql_longstr"
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
+
+    sql(
+      s"""
+         |CREATE TABLE source(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP,
+         | longstr STRING
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES (
+         | 'streaming'='source',
+         | 'format'='csv',
+         | 'path'='$csvDataDir'
+         |)
+      """.stripMargin)
+
+    sql(
+      s"""
+         |CREATE TABLE sink(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP,
+         | longstr STRING
+         | )
+         |STORED AS carbondata
+         |TBLPROPERTIES('streaming'='sink', 'LONG_STRING_COLUMNS'='longstr')
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE STREAM stream123 ON TABLE sink
+        |STMPROPERTIES(
+        |  'trigger'='ProcessingTime',
+        |  'interval'='5 seconds')
+        |AS
+        |  SELECT *
+        |  FROM source
+        |  WHERE id % 2 = 1
+      """.stripMargin).show(false)
+
+    Thread.sleep(200)
+    sql("select * from sink").show
+
+    generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append)
+    Thread.sleep(7000)
+
+    // after 2 mini-batches, there should be 10 rows added (filter condition: id % 2 = 1)
+    checkAnswer(sql("select count(*) from sink"), Seq(Row(10)))
+
+    val row = sql("select * from sink order by id").head()
+    val expectedRow = Row(11, "name_11", "city_11", 110000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("11" + longStrValue))
+    assertResult(expectedRow)(row)
+
+    sql("SHOW STREAMS").show(false)
+
+    rows = sql("SHOW STREAMS").collect()
+    assertResult(1)(rows.length)
+    assertResult("stream123")(rows.head.getString(0))
+    assertResult("RUNNING")(rows.head.getString(2))
+    assertResult("streaming_longstr.source")(rows.head.getString(3))
+    assertResult("streaming_longstr.sink")(rows.head.getString(4))
+
+    rows = sql("SHOW STREAMS ON TABLE sink").collect()
+    assertResult(1)(rows.length)
+    assertResult("stream123")(rows.head.getString(0))
+    assertResult("RUNNING")(rows.head.getString(2))
+    assertResult("streaming_longstr.source")(rows.head.getString(3))
+    assertResult("streaming_longstr.sink")(rows.head.getString(4))
+
+    sql("DROP STREAM stream123")
+    sql("DROP STREAM IF EXISTS stream123")
+
+    rows = sql("SHOW STREAMS").collect()
+    assertResult(0)(rows.length)
+
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+    new File(csvDataDir).delete()
+  }
+
+  def createWriteSocketThread(
+      serverSocket: ServerSocket,
+      writeNums: Int,
+      rowNums: Int,
+      intervalSecond: Int): Thread = {
+    new Thread() {
+      override def run(): Unit = {
+        // wait for the client's connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to writeNums) {
+          // write rowNums records per iteration
+          val stringBuilder = new StringBuilder()
+          for (_ <- 1 to rowNums) {
+            index = index + 1
+            stringBuilder.append(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+                                 ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01," +
+                                 index.toString() + ("abc" * 12000) +
+                                 ",school_" + index + ":school_" + index + index + "$" + index)
+            stringBuilder.append("\n")
+          }
+          socketWriter.append(stringBuilder.toString())
+          socketWriter.flush()
+          Thread.sleep(1000 * intervalSecond)
+        }
+        socketWriter.close()
+      }
+    }
+  }
+
+  def createSocketStreamingThread(
+      spark: SparkSession,
+      port: Int,
+      carbonTable: CarbonTable,
+      tableIdentifier: TableIdentifier,
+      intervalSecond: Int = 2,
+      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+  ): Thread = {
+    new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          import spark.implicits._
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", port)
+            .load().as[String]
+            .map(_.split(","))
+            .map { fields => {
+              val tmp = fields(10).split("\\$")
+              val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+              StreamLongStrData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat,
+                  BigDecimal.valueOf(fields(4).toDouble), fields(5).toDouble,
+                  fields(6), fields(7), fields(8), fields(9), file)
+            } }
+
+          // Write data from socket stream to carbondata file
+          // repartition to simulate an empty partition when readSocketDF has only one row
+          qry = readSocketDF.repartition(2).writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime(s"$intervalSecond seconds"))
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+            .option("dbName", tableIdentifier.database.get)
+            .option("tableName", tableIdentifier.table)
+            .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
+            .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+            .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
+            .start()
+          qry.awaitTermination()
+        } catch {
+          case ex: Throwable =>
+            LOGGER.error(ex.getMessage)
+            throw new Exception(ex.getMessage, ex)
+        } finally {
+          if (null != qry) {
+            qry.stop()
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * start ingestion thread: write `rowNumsEachBatch` rows repeatedly, `batchNums` times.
+   */
+  def executeStreamingIngest(
+      tableName: String,
+      batchNums: Int,
+      rowNumsEachBatch: Int,
+      intervalOfSource: Int,
+      intervalOfIngest: Int,
+      continueSeconds: Int,
+      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+  ): Unit = {
+    val identifier = new TableIdentifier(tableName, Option("streaming_longstr"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetaStore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    var server: ServerSocket = null
+    try {
+      server = getServerSocket()
+      val thread1 = createWriteSocketThread(
+        serverSocket = server,
+        writeNums = batchNums,
+        rowNums = rowNumsEachBatch,
+        intervalSecond = intervalOfSource)
+      val thread2 = createSocketStreamingThread(
+        spark = spark,
+        port = server.getLocalPort,
+        carbonTable = carbonTable,
+        tableIdentifier = identifier,
+        intervalSecond = intervalOfIngest,
+        handoffSize = handoffSize,
+        autoHandoff = autoHandoff)
+      thread1.start()
+      thread2.start()
+      Thread.sleep(continueSeconds * 1000)
+      thread2.interrupt()
+      thread1.interrupt()
+    } finally {
+      if (null != server) {
+        server.close()
+      }
+    }
+  }
+
+  def generateCSVDataFile(
+      spark: SparkSession,
+      idStart: Int,
+      rowNums: Int,
+      csvDirPath: String,
+      saveMode: SaveMode = SaveMode.Overwrite): Unit = {
+    // Create csv data frame file
+    val csvDataDF = {
+      // generate data with dimension columns (name and city)
+      val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums)
+        .map { id =>
+          (id,
+            "name_" + id,
+            "city_" + id,
+            10000.00 * id,
+            BigDecimal.valueOf(0.01),
+            80.01,
+            "1990-01-01",
+            "2010-01-01 10:01:01",
+            "2010-01-01 10:01:01",
+            id.toString() + ("abc" * 12000),
+            "school_" + id + "\002school_" + id + id + "\001" + id)
+        }
+      spark.createDataFrame(csvRDD).toDF(
+        "id", "name", "city", "salary", "tax", "percent", "birthday", "register", "updated", "longstr", "file")
+    }
+
+    csvDataDF.write
+      .option("header", "false")
+      .mode(saveMode)
+      .csv(csvDirPath)
+  }
+
+  def createFileStreamingThread(
+      spark: SparkSession,
+      carbonTable: CarbonTable,
+      csvDataDir: String,
+      intervalSecond: Int,
+      tableIdentifier: TableIdentifier): Thread = {
+    new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          val readCsvDF = spark.readStream.text(csvDataDir)
+
+          // Write data from the file stream to the carbondata table
+          qry = readCsvDF.writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime(s"${ intervalSecond } seconds"))
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+            .option("dbName", tableIdentifier.database.get)
+            .option("tableName", tableIdentifier.table)
+            .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+            .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+              CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
+            .start()
+
+          qry.awaitTermination()
+        } catch {
+          case _: InterruptedException =>
+            println("Done reading and writing streaming data")
+        } finally {
+          if (qry != null) {
+            qry.stop()
+          }
+        }
+      }
+    }
+  }
+
+  def createTable(tableName: String, streaming: Boolean, withBatchLoad: Boolean): Unit = {
+    sql(
+      s"""
+         | CREATE TABLE streaming_longstr.$tableName(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP,
+         | longstr STRING
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+         | 'sort_columns'='name', 'dictionary_include'='city,register', 'LONG_STRING_COLUMNS'='longstr')
+         | """.stripMargin)
+
+    if (withBatchLoad) {
+      // batch loading 5 rows
+      executeBatchLoad(tableName)
+    }
+  }
+
+  def createTableWithComplexType(
+      tableName: String,
+      streaming: Boolean,
+      withBatchLoad: Boolean): Unit = {
+    sql(
+      s"""
+         | CREATE TABLE streaming_longstr.$tableName(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP,
+         | longstr STRING,
+         | file struct<school:array<string>, age:int>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+         | 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated', 'LONG_STRING_COLUMNS'='longstr')
+         | """.stripMargin)
+
+    if (withBatchLoad) {
+      // batch loading 5 rows
+      executeBatchLoad(tableName)
+    }
+  }
+
+  def executeBatchLoad(tableName: String): Unit = {
+    sql(
+      s"LOAD DATA LOCAL INPATH '$dataFilePath' INTO TABLE streaming_longstr.$tableName OPTIONS" +
+      "('HEADER'='true','COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
+  }
+
+  def wrap(array: Array[String]) = {
+    new mutable.WrappedArray.ofRef(array)
+  }
+
+  /**
+   * get a ServerSocket
+   * if the address is already in use, it retries with a new port number.
+   *
+   * @return ServerSocket
+   */
+  def getServerSocket(): ServerSocket = {
+    var port = 7071
+    var serverSocket: ServerSocket = null
+    var retry = false
+    do {
+      try {
+        retry = false
+        serverSocket = new ServerSocket(port)
+      } catch {
+        case ex: BindException =>
+          retry = true
+          port = port + 2
+          if (port >= 65535) {
+            throw ex
+          }
+      }
+    } while (retry)
+    serverSocket
+  }
+}
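
A note on the magnitude used by the test above: longStrValue is "abc" repeated 12,000 times, i.e. 36,000 characters, which already overflows a signed 2-byte length prefix; that is exactly why the writer and reader switch to a 4-byte int for VARCHAR columns.

    // Sanity check of the test's long string length against the old short-based prefix.
    val longStrValue = "abc" * 12000
    assert(longStrValue.length == 36000)
    assert(longStrValue.length > Short.MaxValue)  // Short.MaxValue == 32767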
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 5ef5ab9..1f642e5 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -91,6 +91,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
   private boolean[] isNoDictionaryDimensionColumn;
   private int dimensionWithComplexCount;
   private int measureCount;
+  private boolean[] dimensionsIsVarcharTypeMap;
   private DataType[] measureDataTypes;
   private StreamBlockletWriter output = null;
   private String compressorName;
@@ -147,6 +148,10 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
     dimensionWithComplexCount = configuration.getDimensionCount();
     measureCount = configuration.getMeasureCount();
     dataFields = configuration.getDataFields();
+    dimensionsIsVarcharTypeMap = new boolean[dimensionWithComplexCount];
+    for (int i = 0; i < dimensionWithComplexCount; i++) {
+      dimensionsIsVarcharTypeMap[i] = dataFields[i].getColumn().getDataType() == DataTypes.VARCHAR;
+    }
     measureDataTypes = new DataType[measureCount];
     for (int i = 0; i < measureCount; i++) {
       measureDataTypes[i] =
@@ -234,7 +239,11 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
         if (null != columnValue) {
           if (isNoDictionaryDimensionColumn[dimCount]) {
             byte[] col = (byte[]) columnValue;
-            output.writeShort(col.length);
+            if (dimensionsIsVarcharTypeMap[dimCount]) {
+              output.writeInt(col.length);
+            } else {
+              output.writeShort(col.length);
+            }
             output.writeBytes(col);
             output.dimStatsCollectors[dimCount].update(col);
           } else {
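
And the matching write side from the hunk above, again as a plain java.io sketch rather than Carbon's StreamBlockletWriter:

    import java.io.DataOutputStream

    // Sketch only: int length prefix for VARCHAR values, short prefix otherwise.
    def writeNoDictValue(out: DataOutputStream, value: Array[Byte], isVarchar: Boolean): Unit = {
      if (isVarchar) out.writeInt(value.length) else out.writeShort(value.length)
      out.write(value)
    }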
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
index 00d06b6..bf2c460 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
@@ -21,6 +21,7 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
 
 import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
@@ -32,7 +33,9 @@ public class CSVStreamParserImp implements CarbonStreamParser {
 
   private CsvParser csvParser;
 
-  @Override public void initialize(Configuration configuration, StructType structType) {
+  @Override public void initialize(
+      Configuration configuration,
+      StructType structType, boolean[] isVarcharTypeMapping) {
     CsvParserSettings settings = CSVInputFormat.extractCsvParserSettings(configuration);
     csvParser = new CsvParser(settings);
   }
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
index 94f0307..e68117c 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
@@ -36,7 +36,8 @@ public interface CarbonStreamParser {
 
   String CARBON_STREAM_PARSER_DEFAULT = CARBON_STREAM_PARSER_ROW_PARSER;
 
-  void initialize(Configuration configuration, StructType structType);
+  void initialize(Configuration configuration,
+      StructType structType, boolean[] isVarcharTypeMapping);
 
   Object[] parserRow(InternalRow value);
 
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index cb12bb6..16e7258 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConst
 class RowStreamParserImp extends CarbonStreamParser {
 
   var configuration: Configuration = null
+  var isVarcharTypeMapping: Array[Boolean] = null
   var structType: StructType = null
   var encoder: ExpressionEncoder[Row] = null
 
@@ -44,10 +45,12 @@ class RowStreamParserImp extends CarbonStreamParser {
   var complexDelimiters: util.ArrayList[String] = new util.ArrayList[String]()
   var serializationNullFormat: String = null
 
-  override def initialize(configuration: Configuration, structType: StructType): Unit = {
+  override def initialize(configuration: Configuration,
+      structType: StructType, isVarcharTypeMapping: Array[Boolean]): Unit = {
     this.configuration = configuration
     this.structType = structType
     this.encoder = RowEncoder.apply(this.structType).resolveAndBind()
+    this.isVarcharTypeMapping = isVarcharTypeMapping
 
     this.timeStampFormat = new SimpleDateFormat(
       this.configuration.get(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT))
@@ -62,12 +65,12 @@ class RowStreamParserImp extends CarbonStreamParser {
   }
 
   override def parserRow(value: InternalRow): Array[Object] = {
-    this.encoder.fromRow(value).toSeq.map { x => {
+    this.encoder.fromRow(value).toSeq.zipWithIndex.map { case (x, i) =>
       FieldConverter.objectToString(
         x, serializationNullFormat, complexDelimiters,
-        timeStampFormat, dateFormat)
+        timeStampFormat, dateFormat,
+        isVarcharType = i < this.isVarcharTypeMapping.length && this.isVarcharTypeMapping(i))
     } }.toArray
-  }
 
   override def close(): Unit = {
   }
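
Finally, because CarbonStreamParser.initialize gained a third argument, any custom parser implementation outside this repository has to be updated as well. A minimal hypothetical parser against the new signature (the pass-through conversion is naive and for illustration only; a real parser would honour dates, nulls and complex types the way RowStreamParserImp does):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.StructType

    import org.apache.carbondata.streaming.parser.CarbonStreamParser

    class PassThroughStreamParser extends CarbonStreamParser {
      private var structType: StructType = _
      private var isVarcharTypeMapping: Array[Boolean] = Array.empty

      override def initialize(configuration: Configuration, structType: StructType,
          isVarcharTypeMapping: Array[Boolean]): Unit = {
        this.structType = structType
        this.isVarcharTypeMapping = isVarcharTypeMapping
      }

      override def parserRow(row: InternalRow): Array[Object] = {
        // naive: stringify every field; long string columns need no special width handling here
        row.toSeq(structType).map(v => if (v == null) null else v.toString).toArray[Object]
      }

      override def close(): Unit = {}
    }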