Posted to commits@carbondata.apache.org by qi...@apache.org on 2019/09/06 07:48:14 UTC
[carbondata] branch master updated: [CARBONDATA-3497] Support to write long string for streaming table
This is an automated email from the ASF dual-hosted git repository.
qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new a42f2e4 [CARBONDATA-3497] Support to write long string for streaming table
a42f2e4 is described below
commit a42f2e42017a04bf76f9cded0c929c29dec41abe
Author: Zhang Zhichao <44...@qq.com>
AuthorDate: Tue Aug 27 11:32:48 2019 +0800
[CARBONDATA-3497] Support to write long string for streaming table
This closes #3366
---
.../hadoop/stream/StreamRecordReader.java | 19 +-
.../resources/streamSample_with_long_string.csv | 6 +
.../streaming/CarbonAppendableStreamSink.scala | 19 +-
.../converter/SparkDataTypeConverterImpl.java | 6 +-
.../TestStreamingTableWithLongString.scala | 649 +++++++++++++++++++++
.../streaming/CarbonStreamRecordWriter.java | 11 +-
.../streaming/parser/CSVStreamParserImp.java | 5 +-
.../streaming/parser/CarbonStreamParser.java | 3 +-
.../streaming/parser/RowStreamParserImp.scala | 11 +-
9 files changed, 715 insertions(+), 14 deletions(-)
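The core of the change is the on-disk row format of streaming segments: no-dictionary dimension values are length-prefixed, and a column declared in LONG_STRING_COLUMNS (VARCHAR internally) can exceed the 32,767 bytes a 2-byte prefix can describe, so writer and reader switch to a 4-byte prefix for those columns. A minimal sketch of the rule, using plain java.io types rather than the CarbonData stream classes (the names below are illustrative, not part of the CarbonData API):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    final class LongStringLengthCodec {

      // Write the byte length of a no-dictionary dimension value.
      // VARCHAR (long string) columns get a 4-byte prefix because the value
      // may be longer than Short.MAX_VALUE (32767) bytes.
      static void writeLength(DataOutput out, byte[] value, boolean isVarchar)
          throws IOException {
        if (isVarchar) {
          out.writeInt(value.length);
        } else {
          out.writeShort(value.length);
        }
      }

      // Read the length back with the same rule; the caller then reads that
      // many bytes as the column value.
      static int readLength(DataInput in, boolean isVarchar) throws IOException {
        return isVarchar ? in.readInt() : in.readShort();
      }
    }

The reader change in StreamRecordReader.java and the writer change in CarbonStreamRecordWriter.java below are the two halves of this rule; they stay in sync through a per-dimension boolean map built once from the table schema.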
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
index 75e36be..1e40baa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
@@ -81,6 +81,7 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
protected CarbonTable carbonTable;
private CarbonColumn[] storageColumns;
private boolean[] isRequired;
+ private boolean[] dimensionsIsVarcharTypeMap;
private DataType[] measureDataTypes;
private int dimensionCount;
private int measureCount;
@@ -163,6 +164,10 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
.getDirectDictionaryGenerator(storageColumns[i].getDataType());
}
}
+ dimensionsIsVarcharTypeMap = new boolean[dimensionCount];
+ for (int i = 0; i < dimensionCount; i++) {
+ dimensionsIsVarcharTypeMap[i] = storageColumns[i].getDataType() == DataTypes.VARCHAR;
+ }
measureDataTypes = new DataType[measureCount];
for (int i = 0; i < measureCount; i++) {
measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
@@ -387,7 +392,12 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
}
} else {
if (isNoDictColumn[colCount]) {
- int v = input.readShort();
+ int v = 0;
+ if (dimensionsIsVarcharTypeMap[colCount]) {
+ v = input.readInt();
+ } else {
+ v = input.readShort();
+ }
if (isRequired[colCount]) {
byte[] b = input.readBytes(v);
if (isFilterRequired[colCount]) {
@@ -561,7 +571,12 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
} else {
if (isNoDictColumn[colCount]) {
- int v = input.readShort();
+ int v = 0;
+ if (dimensionsIsVarcharTypeMap[colCount]) {
+ v = input.readInt();
+ } else {
+ v = input.readShort();
+ }
outputValues[colCount] = input.readBytes(v);
} else {
outputValues[colCount] = input.readInt();
diff --git a/integration/spark-common-test/src/test/resources/streamSample_with_long_string.csv b/integration/spark-common-test/src/test/resources/streamSample_with_long_string.csv
new file mode 100644
index 0000000..b010c07
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/streamSample_with_long_string.csv
@@ -0,0 +1,6 @@
+id,name,city,salary,tax,percent,birthday,register,updated,longstr,file
+100000001,batch_1,city_1,0.1,0.01,80.01,1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01,1abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...]
+100000002,batch_2,city_2,0.2,0.02,80.02,1990-01-02,2010-01-02 10:01:01,2010-01-02 10:01:01,2abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...]
+100000003,batch_3,city_3,0.3,0.03,80.03,1990-01-03,2010-01-03 10:01:01,2010-01-03 10:01:01,3abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...]
+100000004,batch_4,city_4,0.4,0.04,80.04,1990-01-04,2010-01-04 10:01:01,2010-01-04 10:01:01,4abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...]
+100000005,batch_5,city_5,0.5,0.05,80.05,1990-01-05,2010-01-05 10:01:01,2010-01-05 10:01:01,5abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...]
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 90132ff..4440e3a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.stats.QueryStatistic
import org.apache.carbondata.core.util.CarbonProperties
@@ -261,6 +262,15 @@ object CarbonAppendableStreamSink {
}
val rowSchema = queryExecution.analyzed.schema
+ val isVarcharTypeMapping = {
+ val col2VarcharType = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ .getCreateOrderColumn(carbonLoadModel.getTableName).asScala
+ .map(c => c.getColName -> (c.getDataType == DataTypes.VARCHAR)).toMap
+ rowSchema.fieldNames.map(c => {
+ val r = col2VarcharType.get(c.toLowerCase)
+ r.isDefined && r.get
+ })
+ }
// write data file
result = sparkSession.sparkContext.runJob(queryExecution.toRdd,
(taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
@@ -272,7 +282,8 @@ object CarbonAppendableStreamSink {
sparkAttemptNumber = taskContext.attemptNumber(),
committer,
iterator,
- rowSchema
+ rowSchema,
+ isVarcharTypeMapping
)
})
@@ -319,7 +330,8 @@ object CarbonAppendableStreamSink {
sparkAttemptNumber: Int,
committer: FileCommitProtocol,
iterator: Iterator[InternalRow],
- rowSchema: StructType): (TaskCommitMessage, StreamFileIndex) = {
+ rowSchema: StructType,
+ isVarcharTypeMapping: Array[Boolean]): (TaskCommitMessage, StreamFileIndex) = {
val jobId = CarbonInputFormatUtil.getJobId(new Date, sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -350,7 +362,8 @@ object CarbonAppendableStreamSink {
val streamParser =
Class.forName(parserName).newInstance.asInstanceOf[CarbonStreamParser]
- streamParser.initialize(taskAttemptContext.getConfiguration, rowSchema)
+ streamParser.initialize(taskAttemptContext.getConfiguration,
+ rowSchema, isVarcharTypeMapping)
blockIndex = StreamSegment.appendBatchData(new InputIterator(iterator, streamParser),
taskAttemptContext, carbonLoadModel)
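The sink computes the varchar flags once per write and hands them to each task's parser, keyed by the position of each field in the query's output schema. A rough Java equivalent of that alignment, assuming the caller has already built a map from lower-cased column name to whether the column is VARCHAR (class and method names are made up for illustration):

    import java.util.Locale;
    import java.util.Map;

    import org.apache.spark.sql.types.StructType;

    final class VarcharMappingBuilder {

      // The i-th flag corresponds to the i-th field of the streaming query's
      // row schema, so the parser can look it up by column index.
      static boolean[] build(StructType rowSchema, Map<String, Boolean> columnIsVarchar) {
        String[] fieldNames = rowSchema.fieldNames();
        boolean[] mapping = new boolean[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
          // Carbon column names are stored lower-cased, hence the normalisation.
          Boolean isVarchar = columnIsVarchar.get(fieldNames[i].toLowerCase(Locale.ROOT));
          mapping[i] = isVarchar != null && isVarchar;
        }
        return mapping;
      }
    }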
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
index 41b378d..4db1154 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
@@ -114,7 +114,8 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri
private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
DataType carbonDataType) {
- if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
+ if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING
+ || carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.VARCHAR) {
return DataTypes.StringType;
} else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
return DataTypes.ShortType;
@@ -170,7 +171,8 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri
|| dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT
|| dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT
|| dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG
- || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BINARY) {
+ || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BINARY
+ || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.VARCHAR) {
fields[i] = new StructField(carbonColumn.getColName(),
convertCarbonToSparkDataType(dataType), true, null);
} else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithLongString.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithLongString.scala
new file mode 100644
index 0000000..521b241
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithLongString.scala
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.{File, PrintWriter}
+import java.math.BigDecimal
+import java.net.{BindException, ServerSocket}
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+case class StreamLongStrData(id: Integer, name: String, city: String, salary: java.lang.Float,
+ tax: BigDecimal, percent: java.lang.Double, birthday: String,
+ register: String, updated: String, longStr: String,
+ file: FileElement)
+
+class TestStreamingTableWithLongString extends QueryTest with BeforeAndAfterAll {
+
+ private val spark = sqlContext.sparkSession
+ private val dataFilePath = s"$resourcesPath/streamSample_with_long_string.csv"
+ private val csvDataDir = integrationPath + "/spark2/target/csvdata_longstr"
+ private val longStrValue = "abc" * 12000
+
+ override def beforeAll {
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+ sql("DROP DATABASE IF EXISTS streaming_longstr CASCADE")
+ sql("CREATE DATABASE streaming_longstr")
+ sql("USE streaming_longstr")
+
+ dropTable()
+
+ // 1. streaming table with long string field
+ // socket source
+ createTable(tableName = "stream_table_longstr", streaming = true, withBatchLoad = true)
+
+ // 2. streaming table with long string field
+ // file source
+ createTable(tableName = "stream_table_longstr_file", streaming = true, withBatchLoad = true)
+
+ // 3. streaming table with long string and complex field
+ createTableWithComplexType(
+ tableName = "stream_table_longstr_complex", streaming = true, withBatchLoad = true)
+ }
+
+ override def afterAll {
+ dropTable()
+ sql("USE default")
+ sql("DROP DATABASE IF EXISTS streaming_longstr CASCADE")
+ new File(csvDataDir).delete()
+ }
+
+ def dropTable(): Unit = {
+ sql("drop table if exists streaming_longstr.stream_table_longstr")
+ sql("drop table if exists streaming_longstr.stream_table_longstr_file")
+ sql("drop table if exists streaming_longstr.stream_table_longstr_complex")
+ }
+
+ // input source: file
+ test("[CARBONDATA-3497] Support to write long string for streaming table: ingest from file source") {
+ val identifier = new TableIdentifier("stream_table_longstr_file", Option("streaming_longstr"))
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetaStore.lookupRelation(identifier)(spark)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable
+ // streaming ingest 10 rows
+ generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
+ val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1,
+ identifier)
+ thread.start()
+ Thread.sleep(3000)
+ generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir)
+ Thread.sleep(5000)
+ thread.interrupt()
+ checkAnswer(
+ sql("select count(*) from streaming_longstr.stream_table_longstr_file"),
+ Seq(Row(25))
+ )
+
+ val row = sql("select * from streaming_longstr.stream_table_longstr_file order by id").head()
+ val expectedRow = Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), "10" + longStrValue)
+ assertResult(expectedRow)(row)
+ new File(csvDataDir).delete()
+ }
+
+ test("[CARBONDATA-3497] Support to write long string for streaming table") {
+ executeStreamingIngest(
+ tableName = "stream_table_longstr",
+ batchNums = 2,
+ rowNumsEachBatch = 25,
+ intervalOfSource = 5,
+ intervalOfIngest = 5,
+ continueSeconds = 20,
+ handoffSize = 51200,
+ autoHandoff = false
+ )
+
+ var result = sql("select * from streaming_longstr.stream_table_longstr order by id, name").collect()
+ assert(result != null)
+ assert(result.length == 55)
+ // check one row of streaming data
+ assert(result(1).getString(1) == "name_2")
+ assert(result(1).getString(9) == ("2" + longStrValue))
+ // check one row of batch loading
+ assert(result(50).getInt(0) == 100000001)
+ assert(result(50).getString(1) == "batch_1")
+ assert(result(50).getString(9) == ("1" + longStrValue))
+
+ checkAnswer(
+ sql("select * from streaming_longstr.stream_table_longstr where id = 1"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+ checkAnswer(
+ sql("select * from streaming_longstr.stream_table_longstr where id > 49 and id < 100000002"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+ checkAnswer(
+ sql("select * from streaming_longstr.stream_table_longstr where id between 50 and 100000001"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+ sql("show segments for table streaming_longstr.stream_table_longstr").show(20, false)
+ sql("alter table streaming_longstr.stream_table_longstr finish streaming")
+ sql("alter table streaming_longstr.stream_table_longstr compact 'streaming'")
+ sql("show segments for table streaming_longstr.stream_table_longstr").show(20, false)
+ Thread.sleep(5000)
+
+ result = sql("select * from streaming_longstr.stream_table_longstr order by id, name").collect()
+ assert(result != null)
+ assert(result.length == 55)
+ // check one row of streaming data
+ assert(result(2).getString(1) == "name_3")
+ assert(result(2).getString(9) == ("3" + longStrValue))
+ // check one row of batch loading
+ assert(result(51).getInt(0) == 100000002)
+ assert(result(51).getString(1) == "batch_2")
+ assert(result(51).getString(9) == ("2" + longStrValue))
+
+ checkAnswer(
+ sql("select * from streaming_longstr.stream_table_longstr where id = 1"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+ checkAnswer(
+ sql("select * from streaming_longstr.stream_table_longstr where id > 49 and id < 100000002"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+ checkAnswer(
+ sql("select * from streaming_longstr.stream_table_longstr where id between 50 and 100000001"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+ sql("alter table streaming_longstr.stream_table_longstr compact 'major'")
+ sql("show segments for table streaming_longstr.stream_table_longstr").show(20, false)
+ Thread.sleep(5000)
+
+ result = sql("select * from streaming_longstr.stream_table_longstr order by id, name").collect()
+ assert(result != null)
+ assert(result.length == 55)
+ // check one row of streaming data
+ assert(result(3).getString(1) == "name_4")
+ assert(result(3).getString(9) == ("4" + longStrValue))
+ // check one row of batch loading
+ assert(result(52).getInt(0) == 100000003)
+ assert(result(52).getString(1) == "batch_3")
+ assert(result(52).getString(9) == ("3" + longStrValue))
+
+ checkAnswer(
+ sql("select * from streaming_longstr.stream_table_longstr where id = 1"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+ checkAnswer(
+ sql("select * from streaming_longstr.stream_table_longstr where id > 49 and id < 100000002"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+
+ checkAnswer(
+ sql("select * from streaming_longstr.stream_table_longstr where id between 50 and 100000001"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue))))
+ }
+
+ test("[CARBONDATA-3497] Support to write long string for streaming table: include complex column") {
+ executeStreamingIngest(
+ tableName = "stream_table_longstr_complex",
+ batchNums = 2,
+ rowNumsEachBatch = 25,
+ intervalOfSource = 5,
+ intervalOfIngest = 5,
+ continueSeconds = 20,
+ handoffSize = 51200,
+ autoHandoff = false
+ )
+
+ // non-filter
+ val result = sql("select * from streaming_longstr.stream_table_longstr_complex order by id, name").collect()
+ assert(result != null)
+ assert(result.length == 55)
+ // check one row of streaming data
+ assert(result(3).getString(1) == "name_4")
+ assert(result(3).getString(9) == ("4" + longStrValue))
+ // check one row of batch loading
+ assert(result(52).getInt(0) == 100000003)
+ assert(result(52).getString(1) == "batch_3")
+ assert(result(52).getString(9) == ("3" + longStrValue))
+ assert(result(52).getStruct(10).getInt(1) == 40)
+
+ // filter
+ checkAnswer(
+ sql("select * from streaming_longstr.stream_table_longstr_complex where id = 1"),
+ Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 1))))
+
+ checkAnswer(
+ sql("select * from streaming_longstr.stream_table_longstr_complex where id > 49 and id < 100000002"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue), Row(wrap(Array("school_50", "school_5050")), 50)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 20))))
+
+ checkAnswer(
+ sql("select * from streaming_longstr.stream_table_longstr_complex where id between 50 and 100000001"),
+ Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue), Row(wrap(Array("school_50", "school_5050")), 50)),
+ Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 20))))
+ }
+
+ test("[CARBONDATA-3497] Support to write long string for streaming table: StreamSQL") {
+ sql("DROP TABLE IF EXISTS source")
+ sql("DROP TABLE IF EXISTS sink")
+
+ var rows = sql("SHOW STREAMS").collect()
+ assertResult(0)(rows.length)
+
+ val csvDataDir = integrationPath + "/spark2/target/streamSql_longstr"
+ // streaming ingest 10 rows
+ generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
+
+ sql(
+ s"""
+ |CREATE TABLE source(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT,
+ | tax DECIMAL(8,2),
+ | percent double,
+ | birthday DATE,
+ | register TIMESTAMP,
+ | updated TIMESTAMP,
+ | longstr STRING
+ |)
+ |STORED AS carbondata
+ |TBLPROPERTIES (
+ | 'streaming'='source',
+ | 'format'='csv',
+ | 'path'='$csvDataDir'
+ |)
+ """.stripMargin)
+
+ sql(
+ s"""
+ |CREATE TABLE sink(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT,
+ | tax DECIMAL(8,2),
+ | percent double,
+ | birthday DATE,
+ | register TIMESTAMP,
+ | updated TIMESTAMP,
+ | longstr STRING
+ | )
+ |STORED AS carbondata
+ |TBLPROPERTIES('streaming'='sink', 'LONG_STRING_COLUMNS'='longstr')
+ """.stripMargin)
+
+ sql(
+ """
+ |CREATE STREAM stream123 ON TABLE sink
+ |STMPROPERTIES(
+ | 'trigger'='ProcessingTime',
+ | 'interval'='5 seconds')
+ |AS
+ | SELECT *
+ | FROM source
+ | WHERE id % 2 = 1
+ """.stripMargin).show(false)
+
+ Thread.sleep(200)
+ sql("select * from sink").show
+
+ generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append)
+ Thread.sleep(7000)
+
+ // after 2 mini-batches, 10 rows should have been added (filter condition: id % 2 = 1)
+ checkAnswer(sql("select count(*) from sink"), Seq(Row(10)))
+
+ val row = sql("select * from sink order by id").head()
+ val expectedRow = Row(11, "name_11", "city_11", 110000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("11" + longStrValue))
+ assertResult(expectedRow)(row)
+
+ sql("SHOW STREAMS").show(false)
+
+ rows = sql("SHOW STREAMS").collect()
+ assertResult(1)(rows.length)
+ assertResult("stream123")(rows.head.getString(0))
+ assertResult("RUNNING")(rows.head.getString(2))
+ assertResult("streaming_longstr.source")(rows.head.getString(3))
+ assertResult("streaming_longstr.sink")(rows.head.getString(4))
+
+ rows = sql("SHOW STREAMS ON TABLE sink").collect()
+ assertResult(1)(rows.length)
+ assertResult("stream123")(rows.head.getString(0))
+ assertResult("RUNNING")(rows.head.getString(2))
+ assertResult("streaming_longstr.source")(rows.head.getString(3))
+ assertResult("streaming_longstr.sink")(rows.head.getString(4))
+
+ sql("DROP STREAM stream123")
+ sql("DROP STREAM IF EXISTS stream123")
+
+ rows = sql("SHOW STREAMS").collect()
+ assertResult(0)(rows.length)
+
+ sql("DROP TABLE IF EXISTS source")
+ sql("DROP TABLE IF EXISTS sink")
+ new File(csvDataDir).delete()
+ }
+
+ def createWriteSocketThread(
+ serverSocket: ServerSocket,
+ writeNums: Int,
+ rowNums: Int,
+ intervalSecond: Int): Thread = {
+ new Thread() {
+ override def run(): Unit = {
+ // wait for the client connection request and accept it
+ val clientSocket = serverSocket.accept()
+ val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+ var index = 0
+ for (_ <- 1 to writeNums) {
+ // write `rowNums` records per iteration
+ val stringBuilder = new StringBuilder()
+ for (_ <- 1 to rowNums) {
+ index = index + 1
+ stringBuilder.append(index.toString + ",name_" + index
+ + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" +
+ ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01," +
+ index.toString() + ("abc" * 12000) +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ stringBuilder.append("\n")
+ }
+ socketWriter.append(stringBuilder.toString())
+ socketWriter.flush()
+ Thread.sleep(1000 * intervalSecond)
+ }
+ socketWriter.close()
+ }
+ }
+ }
+
+ def createSocketStreamingThread(
+ spark: SparkSession,
+ port: Int,
+ carbonTable: CarbonTable,
+ tableIdentifier: TableIdentifier,
+ intervalSecond: Int = 2,
+ handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+ autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+ ): Thread = {
+ new Thread() {
+ override def run(): Unit = {
+ var qry: StreamingQuery = null
+ try {
+ import spark.implicits._
+ val readSocketDF = spark.readStream
+ .format("socket")
+ .option("host", "localhost")
+ .option("port", port)
+ .load().as[String]
+ .map(_.split(","))
+ .map { fields => {
+ val tmp = fields(10).split("\\$")
+ val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+ StreamLongStrData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat,
+ BigDecimal.valueOf(fields(4).toDouble), fields(5).toDouble,
+ fields(6), fields(7), fields(8), fields(9), file)
+ } }
+
+ // Write data from socket stream to carbondata file
+ // repartition to simulate an empty partition when readSocketDF has only one row
+ qry = readSocketDF.repartition(2).writeStream
+ .format("carbondata")
+ .trigger(ProcessingTime(s"$intervalSecond seconds"))
+ .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+ .option("dbName", tableIdentifier.database.get)
+ .option("tableName", tableIdentifier.table)
+ .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
+ .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
+ .start()
+ qry.awaitTermination()
+ } catch {
+ case ex: Throwable =>
+ LOGGER.error(ex.getMessage)
+ throw new Exception(ex.getMessage, ex)
+ } finally {
+ if (null != qry) {
+ qry.stop()
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * start ingestion thread: write `rowNumsEachBatch` rows repeatedly for `batchNums` times.
+ */
+ def executeStreamingIngest(
+ tableName: String,
+ batchNums: Int,
+ rowNumsEachBatch: Int,
+ intervalOfSource: Int,
+ intervalOfIngest: Int,
+ continueSeconds: Int,
+ handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+ autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+ ): Unit = {
+ val identifier = new TableIdentifier(tableName, Option("streaming_longstr"))
+ val carbonTable = CarbonEnv.getInstance(spark).carbonMetaStore.lookupRelation(identifier)(spark)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable
+ var server: ServerSocket = null
+ try {
+ server = getServerSocket()
+ val thread1 = createWriteSocketThread(
+ serverSocket = server,
+ writeNums = batchNums,
+ rowNums = rowNumsEachBatch,
+ intervalSecond = intervalOfSource)
+ val thread2 = createSocketStreamingThread(
+ spark = spark,
+ port = server.getLocalPort,
+ carbonTable = carbonTable,
+ tableIdentifier = identifier,
+ intervalSecond = intervalOfIngest,
+ handoffSize = handoffSize,
+ autoHandoff = autoHandoff)
+ thread1.start()
+ thread2.start()
+ Thread.sleep(continueSeconds * 1000)
+ thread2.interrupt()
+ thread1.interrupt()
+ } finally {
+ if (null != server) {
+ server.close()
+ }
+ }
+ }
+
+ def generateCSVDataFile(
+ spark: SparkSession,
+ idStart: Int,
+ rowNums: Int,
+ csvDirPath: String,
+ saveMode: SaveMode = SaveMode.Overwrite): Unit = {
+ // Create a DataFrame and write it out as CSV files
+ val csvDataDF = {
+ // generate data with dimension columns (name and city)
+ val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums)
+ .map { id =>
+ (id,
+ "name_" + id,
+ "city_" + id,
+ 10000.00 * id,
+ BigDecimal.valueOf(0.01),
+ 80.01,
+ "1990-01-01",
+ "2010-01-01 10:01:01",
+ "2010-01-01 10:01:01",
+ id.toString() + ("abc" * 12000),
+ "school_" + id + "\002school_" + id + id + "\001" + id)
+ }
+ spark.createDataFrame(csvRDD).toDF(
+ "id", "name", "city", "salary", "tax", "percent", "birthday", "register", "updated", "longstr", "file")
+ }
+
+ csvDataDF.write
+ .option("header", "false")
+ .mode(saveMode)
+ .csv(csvDirPath)
+ }
+
+ def createFileStreamingThread(
+ spark: SparkSession,
+ carbonTable: CarbonTable,
+ csvDataDir: String,
+ intervalSecond: Int,
+ tableIdentifier: TableIdentifier): Thread = {
+ new Thread() {
+ override def run(): Unit = {
+ var qry: StreamingQuery = null
+ try {
+ val readSocketDF = spark.readStream.text(csvDataDir)
+
+ // Write data from the file stream to carbondata files
+ qry = readSocketDF.writeStream
+ .format("carbondata")
+ .trigger(ProcessingTime(s"${ intervalSecond } seconds"))
+ .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+ .option("dbName", tableIdentifier.database.get)
+ .option("tableName", tableIdentifier.table)
+ .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+ CarbonStreamParser.CARBON_STREAM_PARSER_CSV)
+ .start()
+
+ qry.awaitTermination()
+ } catch {
+ case _: InterruptedException =>
+ println("Done reading and writing streaming data")
+ } finally {
+ if (qry != null) {
+ qry.stop()
+ }
+ }
+ }
+ }
+ }
+
+ def createTable(tableName: String, streaming: Boolean, withBatchLoad: Boolean): Unit = {
+ sql(
+ s"""
+ | CREATE TABLE streaming_longstr.$tableName(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT,
+ | tax DECIMAL(8,2),
+ | percent double,
+ | birthday DATE,
+ | register TIMESTAMP,
+ | updated TIMESTAMP,
+ | longstr STRING
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+ | 'sort_columns'='name', 'dictionary_include'='city,register', 'LONG_STRING_COLUMNS'='longstr')
+ | """.stripMargin)
+
+ if (withBatchLoad) {
+ // batch loading 5 rows
+ executeBatchLoad(tableName)
+ }
+ }
+
+ def createTableWithComplexType(
+ tableName: String,
+ streaming: Boolean,
+ withBatchLoad: Boolean): Unit = {
+ sql(
+ s"""
+ | CREATE TABLE streaming_longstr.$tableName(
+ | id INT,
+ | name STRING,
+ | city STRING,
+ | salary FLOAT,
+ | tax DECIMAL(8,2),
+ | percent double,
+ | birthday DATE,
+ | register TIMESTAMP,
+ | updated TIMESTAMP,
+ | longstr STRING,
+ | file struct<school:array<string>, age:int>
+ | )
+ | STORED BY 'carbondata'
+ | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+ | 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated', 'LONG_STRING_COLUMNS'='longstr')
+ | """.stripMargin)
+
+ if (withBatchLoad) {
+ // batch loading 5 rows
+ executeBatchLoad(tableName)
+ }
+ }
+
+ def executeBatchLoad(tableName: String): Unit = {
+ sql(
+ s"LOAD DATA LOCAL INPATH '$dataFilePath' INTO TABLE streaming_longstr.$tableName OPTIONS" +
+ "('HEADER'='true','COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
+ }
+
+ def wrap(array: Array[String]) = {
+ new mutable.WrappedArray.ofRef(array)
+ }
+
+ /**
+ * get a ServerSocket
+ * if the address is already in use, retry with a new port number.
+ *
+ * @return ServerSocket
+ */
+ def getServerSocket(): ServerSocket = {
+ var port = 7071
+ var serverSocket: ServerSocket = null
+ var retry = false
+ do {
+ try {
+ retry = false
+ serverSocket = new ServerSocket(port)
+ } catch {
+ case ex: BindException =>
+ retry = true
+ port = port + 2
+ if (port >= 65535) {
+ throw ex
+ }
+ }
+ } while (retry)
+ serverSocket
+ }
+}
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 5ef5ab9..1f642e5 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -91,6 +91,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
private boolean[] isNoDictionaryDimensionColumn;
private int dimensionWithComplexCount;
private int measureCount;
+ private boolean[] dimensionsIsVarcharTypeMap;
private DataType[] measureDataTypes;
private StreamBlockletWriter output = null;
private String compressorName;
@@ -147,6 +148,10 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
dimensionWithComplexCount = configuration.getDimensionCount();
measureCount = configuration.getMeasureCount();
dataFields = configuration.getDataFields();
+ dimensionsIsVarcharTypeMap = new boolean[dimensionWithComplexCount];
+ for (int i = 0; i < dimensionWithComplexCount; i++) {
+ dimensionsIsVarcharTypeMap[i] = dataFields[i].getColumn().getDataType() == DataTypes.VARCHAR;
+ }
measureDataTypes = new DataType[measureCount];
for (int i = 0; i < measureCount; i++) {
measureDataTypes[i] =
@@ -234,7 +239,11 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
if (null != columnValue) {
if (isNoDictionaryDimensionColumn[dimCount]) {
byte[] col = (byte[]) columnValue;
- output.writeShort(col.length);
+ if (dimensionsIsVarcharTypeMap[dimCount]) {
+ output.writeInt(col.length);
+ } else {
+ output.writeShort(col.length);
+ }
output.writeBytes(col);
output.dimStatsCollectors[dimCount].update(col);
} else {
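Why the short prefix is not enough: a long string column exists precisely so a single value can be far larger than 32,767 bytes, and pushing such a length through writeShort would overflow the prefix and desynchronise the reader. A self-contained round trip in plain java.io that mirrors what the writer/reader pair now does for a VARCHAR column (an assumed layout for illustration, not the actual blocklet format):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    final class LongStringRoundTrip {
      public static void main(String[] args) throws IOException {
        // 40,000 bytes: longer than Short.MAX_VALUE, so a 2-byte prefix would overflow.
        byte[] longValue = new String(new char[40_000]).replace('\0', 'a')
            .getBytes(StandardCharsets.US_ASCII);

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
          out.writeInt(longValue.length);  // VARCHAR column: 4-byte length prefix
          out.write(longValue);
        }

        try (DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buffer.toByteArray()))) {
          int length = in.readInt();       // reader applies the same rule
          byte[] read = new byte[length];
          in.readFully(read);
          System.out.println(Arrays.equals(read, longValue));  // true
        }
      }
    }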
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
index 00d06b6..bf2c460 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
@@ -21,6 +21,7 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
@@ -32,7 +33,9 @@ public class CSVStreamParserImp implements CarbonStreamParser {
private CsvParser csvParser;
- @Override public void initialize(Configuration configuration, StructType structType) {
+ @Override public void initialize(
+ Configuration configuration,
+ StructType structType, boolean[] isVarcharTypeMapping) {
CsvParserSettings settings = CSVInputFormat.extractCsvParserSettings(configuration);
csvParser = new CsvParser(settings);
}
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
index 94f0307..e68117c 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
@@ -36,7 +36,8 @@ public interface CarbonStreamParser {
String CARBON_STREAM_PARSER_DEFAULT = CARBON_STREAM_PARSER_ROW_PARSER;
- void initialize(Configuration configuration, StructType structType);
+ void initialize(Configuration configuration,
+ StructType structType, boolean[] isVarcharTypeMapping);
Object[] parserRow(InternalRow value);
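The interface change is source-incompatible: any custom parser registered through the CARBON_STREAM_PARSER option must be recompiled against the three-argument initialize. The CSV parser above simply ignores the extra argument, while the row parser below uses it to pick the right string conversion per column. A skeleton of an updated third-party implementation (MyStreamParser is purely illustrative, and close() is assumed from the row parser's override below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.sql.catalyst.InternalRow;
    import org.apache.spark.sql.types.StructType;

    import org.apache.carbondata.streaming.parser.CarbonStreamParser;

    public class MyStreamParser implements CarbonStreamParser {

      // One flag per field of the row schema; true means the column is VARCHAR.
      private boolean[] isVarcharTypeMapping;

      @Override
      public void initialize(Configuration configuration, StructType structType,
          boolean[] isVarcharTypeMapping) {
        this.isVarcharTypeMapping = isVarcharTypeMapping;
      }

      @Override
      public Object[] parserRow(InternalRow row) {
        // Convert the internal row into the Object[] expected by the stream writer.
        return new Object[0];
      }

      @Override
      public void close() {
        // nothing to release in this sketch
      }
    }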
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index cb12bb6..16e7258 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConst
class RowStreamParserImp extends CarbonStreamParser {
var configuration: Configuration = null
+ var isVarcharTypeMapping: Array[Boolean] = null
var structType: StructType = null
var encoder: ExpressionEncoder[Row] = null
@@ -44,10 +45,12 @@ class RowStreamParserImp extends CarbonStreamParser {
var complexDelimiters: util.ArrayList[String] = new util.ArrayList[String]()
var serializationNullFormat: String = null
- override def initialize(configuration: Configuration, structType: StructType): Unit = {
+ override def initialize(configuration: Configuration,
+ structType: StructType, isVarcharTypeMapping: Array[Boolean]): Unit = {
this.configuration = configuration
this.structType = structType
this.encoder = RowEncoder.apply(this.structType).resolveAndBind()
+ this.isVarcharTypeMapping = isVarcharTypeMapping
this.timeStampFormat = new SimpleDateFormat(
this.configuration.get(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT))
@@ -62,12 +65,12 @@ class RowStreamParserImp extends CarbonStreamParser {
}
override def parserRow(value: InternalRow): Array[Object] = {
- this.encoder.fromRow(value).toSeq.map { x => {
+ this.encoder.fromRow(value).toSeq.zipWithIndex.map { case (x, i) =>
FieldConverter.objectToString(
x, serializationNullFormat, complexDelimiters,
- timeStampFormat, dateFormat)
+ timeStampFormat, dateFormat,
+ isVarcharType = i < this.isVarcharTypeMapping.length && this.isVarcharTypeMapping(i))
} }.toArray
- }
override def close(): Unit = {
}