Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/12/20 17:05:16 UTC
carbondata git commit: [CARBONDATA-3179] Map Data Load Failure and Struct Projection Pushdown Issue
Repository: carbondata
Updated Branches:
refs/heads/master 34923db0e -> 96b2ea364
[CARBONDATA-3179] Map Data Load Failure and Struct Projection Pushdown Issue
Problem 1: Data load fails for an INSERT INTO ... SELECT from the same table when the table contains a Map datatype.
Solution: Map type was not handled in this scenario; it is handled now.
Problem 2: Projection pushdown is not supported for a table containing a Struct of Map.
Solution: Push down only the parent column for projection when the table contains a MapType.
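For reference, a minimal repro of both problems (a sketch mirroring the tests added
below; the table names and the CSV path here are illustrative):

    // Problem 1: insert-into-select on a table with a Map column failed before this fix.
    sql("CREATE TABLE carbon(mapField map<INT,STRING>) STORED BY 'carbondata'")
    sql("LOAD DATA LOCAL INPATH '/tmp/maps.csv' INTO TABLE carbon OPTIONS('header'='false')")
    sql("INSERT INTO carbon SELECT * FROM carbon")

    // Problem 2: projecting a primitive field out of a struct that also contains a map.
    sql("CREATE TABLE carbon2(structField struct<intVal:INT,map1:MAP<STRING,STRING>>) " +
      "STORED BY 'carbondata'")
    sql("SELECT structField.intVal FROM carbon2")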
This closes #2993
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/96b2ea36
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/96b2ea36
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/96b2ea36
Branch: refs/heads/master
Commit: 96b2ea3646a2a768133880bb2e4c1318d366b482
Parents: 34923db
Author: manishnalla1994 <ma...@gmail.com>
Authored: Fri Dec 14 17:20:15 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Thu Dec 20 22:33:58 2018 +0530
----------------------------------------------------------------------
.../TestCreateDDLForComplexMapType.scala | 71 +++++++++++++++++++-
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 6 +-
.../spark/rdd/NewCarbonDataLoadRDD.scala | 12 ++--
.../carbondata/spark/util/CarbonScalaUtil.scala | 25 ++++---
.../spark/rdd/CarbonDataRDDFactory.scala | 5 +-
.../sql/CarbonDatasourceHadoopRelation.scala | 37 ++++++----
.../streaming/parser/FieldConverter.scala | 44 ++++++------
.../streaming/parser/RowStreamParserImp.scala | 16 +++--
8 files changed, 150 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
index 09f23e5..9006b61 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateDDLForComplexMapType.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk
-
import scala.collection.JavaConversions._
class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll {
@@ -471,4 +470,74 @@ class TestCreateDDLForComplexMapType extends QueryTest with BeforeAndAfterAll {
"sort_columns is unsupported for map datatype column: mapfield"))
}
+ test("Data Load Fail Issue") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<INT,STRING>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$path'
+ | INTO TABLE carbon OPTIONS(
+ | 'header' = 'false')
+ """.stripMargin)
+ sql("INSERT INTO carbon SELECT * FROM carbon")
+ checkAnswer(sql("select * from carbon"), Seq(
+ Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar")),
+ Row(Map(1 -> "Nalla", 2 -> "Singh", 4 -> "Kumar")),
+ Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar")),
+ Row(Map(10 -> "Nallaa", 20 -> "Sissngh", 100 -> "Gusspta", 40 -> "Kumar"))
+ ))
+ }
+
+ test("Struct inside map") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<INT,struct<kk:STRING,mm:STRING>>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ sql("INSERT INTO carbon values('1\002man\003nan\0012\002kands\003dsnknd')")
+ sql("INSERT INTO carbon SELECT * FROM carbon")
+ checkAnswer(sql("SELECT * FROM carbon limit 1"),
+ Seq(Row(Map(1 -> Row("man", "nan"), (2 -> Row("kands", "dsnknd"))))))
+ }
+
+ test("Struct inside map pushdown") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | mapField map<INT,struct<kk:STRING,mm:STRING>>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ sql("INSERT INTO carbon values('1\002man\003nan\0012\002kands\003dsnknd')")
+ checkAnswer(sql("SELECT mapField[1].kk FROM carbon"), Row("man"))
+ }
+
+ test("Map inside struct") {
+ sql("DROP TABLE IF EXISTS carbon")
+ sql(
+ s"""
+ | CREATE TABLE carbon(
+ | structField struct<intVal:INT,map1:MAP<STRING,STRING>>
+ | )
+ | STORED BY 'carbondata'
+ | """
+ .stripMargin)
+ sql("INSERT INTO carbon values('1\001man\003nan\002kands\003dsnknd')")
+ val res = sql("SELECT structField.intVal FROM carbon").show(false)
+ checkAnswer(sql("SELECT structField.intVal FROM carbon"), Seq(Row(1)))
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 8574e66..b67fc71 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.rdd
import java.io.{DataInputStream, InputStreamReader}
import java.nio.charset.Charset
import java.text.SimpleDateFormat
+import java.util
import java.util.regex.Pattern
import scala.collection.mutable
@@ -293,11 +294,12 @@ class CarbonBlockDistinctValuesCombineRDD(
row = rddIter.next()
if (row != null) {
rowCount += 1
+ val complexDelimiters = new util.ArrayList[String]
+ model.delimiters.foreach(x => complexDelimiters.add(x))
for (i <- 0 until dimNum) {
dimensionParsers(i).parseString(CarbonScalaUtil.getString(row.get(i),
model.serializationNullFormat,
- model.delimiters(0),
- model.delimiters(1),
+ complexDelimiters,
timeStampFormat,
dateFormat))
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index f76a8d9..f44bb8d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -341,9 +341,7 @@ class NewRddIterator(rddIter: Iterator[Row],
private val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
private val dateFormat = new SimpleDateFormat(dateFormatString)
- private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
- private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
- private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2)
+ private val complexDelimiters = carbonLoadModel.getComplexDelimiters
private val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
import scala.collection.JavaConverters._
@@ -357,7 +355,7 @@ class NewRddIterator(rddIter: Iterator[Row],
val columns = new Array[AnyRef](row.length)
for (i <- 0 until columns.length) {
columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
- delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
+ complexDelimiters, timeStampFormat, dateFormat,
isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
}
columns
@@ -391,9 +389,7 @@ class LazyRddIterator(serializer: SerializerInstance,
.getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
private val dateFormat = new SimpleDateFormat(dateFormatString)
- private val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
- private val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
- private val delimiterLevel3 = carbonLoadModel.getComplexDelimiters.get(2)
+ private val complexDelimiters = carbonLoadModel.getComplexDelimiters
private val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
// the order of fields in dataframe and createTable may be different, here we need to know whether
@@ -431,7 +427,7 @@ class LazyRddIterator(serializer: SerializerInstance,
val columns = new Array[AnyRef](row.length)
for (i <- 0 until columns.length) {
columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
- delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
+ complexDelimiters, timeStampFormat, dateFormat,
isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
}
columns
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index ca9b4af..626d92f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -62,22 +62,22 @@ object CarbonScalaUtil {
def getString(value: Any,
serializationNullFormat: String,
- delimiterLevel1: String,
- delimiterLevel2: String,
+ complexDelimiters: util.ArrayList[String],
timeStampFormat: SimpleDateFormat,
dateFormat: SimpleDateFormat,
isVarcharType: Boolean = false,
- level: Int = 1): String = {
- FieldConverter.objectToString(value, serializationNullFormat, delimiterLevel1,
- delimiterLevel2, timeStampFormat, dateFormat, isVarcharType = isVarcharType, level)
+ level: Int = 0): String = {
+ FieldConverter.objectToString(value, serializationNullFormat, complexDelimiters,
+ timeStampFormat, dateFormat, isVarcharType = isVarcharType, level)
}
/**
* Converts incoming value to String after converting data as per the data type.
- * @param value Input value to convert
- * @param dataType Datatype to convert and then convert to String
+ *
+ * @param value Input value to convert
+ * @param dataType Datatype to convert and then convert to String
* @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
- * @param dateFormat DataFormat to convert in case of DateType datatype
+ * @param dateFormat DataFormat to convert in case of DateType datatype
* @return converted String
*/
def convertToDateAndTimeFormats(
@@ -126,7 +126,8 @@ object CarbonScalaUtil {
/**
* Converts incoming value to String after converting data as per the data type.
- * @param value Input value to convert
+ *
+ * @param value Input value to convert
* @param column column to which the value belongs
* @return converted String
*/
@@ -183,7 +184,8 @@ object CarbonScalaUtil {
/**
* Converts incoming value to String after converting data as per the data type.
- * @param value Input value to convert
+ *
+ * @param value Input value to convert
* @param column column to which the value belongs
* @return converted String
*/
@@ -238,6 +240,7 @@ object CarbonScalaUtil {
/**
* Update partition values as per the right date and time format
+ *
* @return updated partition spec
*/
def updatePartitions(partitionSpec: mutable.LinkedHashMap[String, String],
@@ -466,7 +469,7 @@ object CarbonScalaUtil {
}
} catch {
case e: Exception =>
- // ignore it
+ // ignore it
}
}
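A sketch of a call through the new getString signature (the list-typed parameter and the
level default of 0 come from this diff; the concrete delimiter values are an assumption
based on the insert-flow data in the tests above):

    import java.text.SimpleDateFormat
    import java.util
    import org.apache.carbondata.spark.util.CarbonScalaUtil

    // Assumed delimiter list, one entry per nesting level.
    val complexDelimiters = new util.ArrayList[String](
      util.Arrays.asList("\001", "\002", "\003", "\004"))
    val timeStampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    // A top-level map uses the level-0 entry delimiter and the level-1
    // key-value delimiter; 'level' now defaults to 0.
    val s = CarbonScalaUtil.getString(Map(1 -> "man", 2 -> "kands"),
      "\\N", complexDelimiters, timeStampFormat, dateFormat)
    // s == "1\002man\0012\002kands"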
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 3b69f9e..5d03026 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -983,15 +983,14 @@ object CarbonDataRDDFactory {
// generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
// input data from DataFrame
- val delimiterLevel1 = carbonLoadModel.getComplexDelimiters.get(0)
- val delimiterLevel2 = carbonLoadModel.getComplexDelimiters.get(1)
+ val complexDelimiters = carbonLoadModel.getComplexDelimiters
val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
dataFrame.get.rdd.map { row =>
if (null != row && row.length > partitionColumnIndex &&
null != row.get(partitionColumnIndex)) {
(CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
- delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+ complexDelimiters, timeStampFormat, dateFormat), row)
} else {
(null, row)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index f848ae1..672508f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -24,7 +24,7 @@ import scala.util.control.Breaks._
import org.apache.spark.CarbonInputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GetArrayItem, GetStructField, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GetArrayItem, GetMapValue, GetStructField, NamedExpression}
import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.optimizer.CarbonFilters
@@ -101,32 +101,41 @@ case class CarbonDatasourceHadoopRelation(
if (!complexFilterExists.exists(f => f.contains(true))) {
var parentColumn = new ListBuffer[String]
// In case of Struct or StructofStruct Complex type, get the project column for given
- // parent/child field and pushdown the corresponding project column. In case of Array,
- // ArrayofStruct or StructofArray, pushdown parent column
+ // parent/child field and pushdown the corresponding project column. In case of Array, Map,
+ // ArrayofStruct, StructofArray, MapOfStruct or StructOfMap, pushdown parent column
var reqColumns = projects.map {
case a@Alias(s: GetStructField, name) =>
- var arrayTypeExists = false
- var ifGetArrayItemExists = s
+ var arrayOrMapTypeExists = false
+ var ifGetArrayOrMapItemExists = s
breakable({
- while (ifGetArrayItemExists.containsChild != null) {
- if (ifGetArrayItemExists.childSchema.toString().contains("ArrayType")) {
- arrayTypeExists = ifGetArrayItemExists.childSchema.toString().contains("ArrayType")
+ while (ifGetArrayOrMapItemExists.containsChild != null) {
+ if (ifGetArrayOrMapItemExists.childSchema.toString().contains("ArrayType") ||
+ ifGetArrayOrMapItemExists.childSchema.toString().contains("MapType")) {
+ arrayOrMapTypeExists = true
break
}
- if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {
- arrayTypeExists = s.childSchema.toString().contains("ArrayType")
+ if (ifGetArrayOrMapItemExists.child.isInstanceOf[AttributeReference]) {
+ arrayOrMapTypeExists = s.childSchema.toString().contains("ArrayType") ||
+ s.childSchema.toString().contains("MapType")
break
} else {
- if (ifGetArrayItemExists.child.isInstanceOf[GetArrayItem]) {
- arrayTypeExists = true
+ if (ifGetArrayOrMapItemExists.child.isInstanceOf[GetArrayItem] ||
+ ifGetArrayOrMapItemExists.child.isInstanceOf[GetMapValue]) {
+ arrayOrMapTypeExists = true
break
} else {
- ifGetArrayItemExists = ifGetArrayItemExists.child.asInstanceOf[GetStructField]
+ if (ifGetArrayOrMapItemExists.child.isInstanceOf[GetStructField]) {
+ ifGetArrayOrMapItemExists = ifGetArrayOrMapItemExists.child
+ .asInstanceOf[GetStructField]
+ } else {
+ arrayOrMapTypeExists = true
+ break
+ }
}
}
}
})
- if (!arrayTypeExists) {
+ if (!arrayOrMapTypeExists) {
parentColumn += s.toString().split("\\.")(0).replaceAll("#.*", "").toLowerCase
parentColumn = parentColumn.distinct
s.toString().replaceAll("#[0-9]*", "").toLowerCase
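In short, the loop walks from the outermost GetStructField down to the base attribute; if
an Array or Map appears anywhere on that path, only the parent column is pushed down and
Spark extracts the child field afterwards. A toy model of that decision (stand-in case
classes, not the Catalyst expressions):

    sealed trait Expr
    case class Attr(schemaString: String) extends Expr
    case class StructFieldAccess(child: Expr, childSchema: String) extends Expr
    case class ArrayItemAccess(child: Expr) extends Expr
    case class MapValueAccess(child: Expr) extends Expr

    def pushdownParentOnly(e: Expr): Boolean = e match {
      case StructFieldAccess(child, schema) =>
        schema.contains("ArrayType") || schema.contains("MapType") ||
          pushdownParentOnly(child)
      case Attr(schema) =>
        schema.contains("ArrayType") || schema.contains("MapType")
      case _: ArrayItemAccess | _: MapValueAccess => true
    }

    // structField.intVal where structField is struct<intVal:int, map1:map<string,string>>:
    // the base column's schema string mentions MapType, so only "structField" is pushed
    // down (the StructOfMap case this commit adds).
    val expr = StructFieldAccess(
      Attr("struct<intVal:int,map1:MapType(StringType,StringType)>"), "intVal:int")
    println(pushdownParentOnly(expr))  // true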
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
index cfafd40..5c67dfb 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.streaming.parser
import java.nio.charset.Charset
import java.text.SimpleDateFormat
+import java.util
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -28,8 +29,7 @@ object FieldConverter {
* Return a String representation of the input value
* @param value input value
* @param serializationNullFormat string for null value
- * @param delimiterLevel1 level 1 delimiter for complex type
- * @param delimiterLevel2 level 2 delimiter for complex type
+ * @param complexDelimiters list of complex type delimiters, ordered by nesting level
* @param timeStampFormat timestamp format
* @param dateFormat date format
* @param isVarcharType whether it is varchar type. A varchar type has no string length limit
@@ -38,12 +38,11 @@ object FieldConverter {
def objectToString(
value: Any,
serializationNullFormat: String,
- delimiterLevel1: String,
- delimiterLevel2: String,
+ complexDelimiters: util.ArrayList[String],
timeStampFormat: SimpleDateFormat,
dateFormat: SimpleDateFormat,
isVarcharType: Boolean = false,
- level: Int = 1): String = {
+ level: Int = 0): String = {
if (value == null) {
serializationNullFormat
} else {
@@ -66,30 +65,35 @@ object FieldConverter {
case bs: Array[Byte] => new String(bs,
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
case s: scala.collection.Seq[Any] =>
- val delimiter = if (level == 1) {
- delimiterLevel1
- } else {
- delimiterLevel2
- }
+ val delimiter = complexDelimiters.get(level)
val builder = new StringBuilder()
s.foreach { x =>
- builder.append(objectToString(x, serializationNullFormat, delimiterLevel1,
- delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
+ builder.append(objectToString(x, serializationNullFormat, complexDelimiters,
+ timeStampFormat, dateFormat, isVarcharType, level + 1))
.append(delimiter)
}
builder.substring(0, builder.length - delimiter.length())
+ // First convert the 'key' of the Map, then append the keyValueDelimiter, then
+ // convert the 'value' of the Map and append the entry delimiter
case m: scala.collection.Map[_, _] =>
- throw new Exception("Unsupported data type: Map")
- case r: org.apache.spark.sql.Row =>
- val delimiter = if (level == 1) {
- delimiterLevel1
- } else {
- delimiterLevel2
+ val delimiter = complexDelimiters.get(level)
+ val keyValueDelimiter = complexDelimiters.get(level + 1)
+ val builder = new StringBuilder()
+ m.foreach { x =>
+ builder.append(objectToString(x._1, serializationNullFormat, complexDelimiters,
+ timeStampFormat, dateFormat, isVarcharType, level + 2))
+ .append(keyValueDelimiter)
+ builder.append(objectToString(x._2, serializationNullFormat, complexDelimiters,
+ timeStampFormat, dateFormat, isVarcharType, level + 2))
+ .append(delimiter)
}
+ builder.substring(0, builder.length - delimiter.length())
+ case r: org.apache.spark.sql.Row =>
+ val delimiter = complexDelimiters.get(level)
val builder = new StringBuilder()
for (i <- 0 until r.length) {
- builder.append(objectToString(r(i), serializationNullFormat, delimiterLevel1,
- delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
+ builder.append(objectToString(r(i), serializationNullFormat, complexDelimiters,
+ timeStampFormat, dateFormat, isVarcharType, level + 1))
.append(delimiter)
}
builder.substring(0, builder.length - delimiter.length())
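To make the level arithmetic concrete, here is a standalone sketch of the new map branch
(simplified: the real method also handles nulls, primitives, dates and varchar). It assumes
the insert-flow delimiters \001..\004 at levels 0..3, which matches the test data added
above:

    import java.util

    val complexDelimiters = new util.ArrayList[String](
      util.Arrays.asList("\001", "\002", "\003", "\004"))

    def objectToString(value: Any, level: Int): String = value match {
      case m: scala.collection.Map[_, _] =>
        val delimiter = complexDelimiters.get(level)             // separates entries
        val keyValueDelimiter = complexDelimiters.get(level + 1) // separates key from value
        m.map { case (k, v) =>
          // key and value are each serialized two levels deeper
          objectToString(k, level + 2) + keyValueDelimiter + objectToString(v, level + 2)
        }.mkString(delimiter)
      case s: Seq[_] => // stands in for the struct and array branches
        s.map(objectToString(_, level + 1)).mkString(complexDelimiters.get(level))
      case other => other.toString
    }

    // map<int,struct<kk,mm>>, with the struct modeled as a Seq:
    val row = Map(1 -> Seq("man", "nan"), 2 -> Seq("kands", "dsnknd"))
    // prints 1\002man\003nan\0012\002kands\003dsnknd, exactly the string that the
    // "Struct inside map" test inserts above
    println(objectToString(row, level = 0))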
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96b2ea36/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index 4dcb3ce..e279caf 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -18,6 +18,7 @@
package org.apache.carbondata.streaming.parser
import java.text.SimpleDateFormat
+import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
@@ -26,6 +27,8 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.processing.loading
+import org.apache.carbondata.processing.loading.ComplexDelimitersEnum
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
/**
@@ -39,9 +42,7 @@ class RowStreamParserImp extends CarbonStreamParser {
var timeStampFormat: SimpleDateFormat = null
var dateFormat: SimpleDateFormat = null
- var complexDelimiterLevel1: String = null
- var complexDelimiterLevel2: String = null
- var complexDelimiterLevel3: String = null
+ var complexDelimiters: util.ArrayList[String] = new util.ArrayList[String]()
var serializationNullFormat: String = null
override def initialize(configuration: Configuration, structType: StructType): Unit = {
@@ -53,9 +54,10 @@ class RowStreamParserImp extends CarbonStreamParser {
this.configuration.get(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT))
this.dateFormat = new SimpleDateFormat(
this.configuration.get(CarbonCommonConstants.CARBON_DATE_FORMAT))
- this.complexDelimiterLevel1 = this.configuration.get("carbon_complex_delimiter_level_1")
- this.complexDelimiterLevel2 = this.configuration.get("carbon_complex_delimiter_level_2")
- this.complexDelimiterLevel3 = this.configuration.get("carbon_complex_delimiter_level_3")
+ this.complexDelimiters.add(this.configuration.get("carbon_complex_delimiter_level_1"))
+ this.complexDelimiters.add(this.configuration.get("carbon_complex_delimiter_level_2"))
+ this.complexDelimiters.add(this.configuration.get("carbon_complex_delimiter_level_3"))
+ this.complexDelimiters.add(ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4.value())
this.serializationNullFormat =
this.configuration.get(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
}
@@ -63,7 +65,7 @@ class RowStreamParserImp extends CarbonStreamParser {
override def parserRow(value: InternalRow): Array[Object] = {
this.encoder.fromRow(value).toSeq.map { x => {
FieldConverter.objectToString(
- x, serializationNullFormat, complexDelimiterLevel1, complexDelimiterLevel2,
+ x, serializationNullFormat, complexDelimiters,
timeStampFormat, dateFormat)
} }.toArray
}
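For context, the streaming parser now assembles the same level-ordered list: levels 1-3
from the job configuration plus a fourth from ComplexDelimitersEnum, so a map nested at
the deepest configured level still has a key-value delimiter available one level down.
A sketch of an equivalent setup (the configuration keys appear in the diff above; the
concrete delimiter values are assumptions):

    import java.util
    import org.apache.hadoop.conf.Configuration

    val configuration = new Configuration()
    configuration.set("carbon_complex_delimiter_level_1", "\001")  // assumed value
    configuration.set("carbon_complex_delimiter_level_2", "\002")  // assumed value
    configuration.set("carbon_complex_delimiter_level_3", "\003")  // assumed value

    val complexDelimiters = new util.ArrayList[String]()
    complexDelimiters.add(configuration.get("carbon_complex_delimiter_level_1"))
    complexDelimiters.add(configuration.get("carbon_complex_delimiter_level_2"))
    complexDelimiters.add(configuration.get("carbon_complex_delimiter_level_3"))
    // In the parser the fourth entry comes from
    // ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4; "\004" is assumed here.
    complexDelimiters.add("\004")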