You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by jackylk <gi...@git.apache.org> on 2017/12/28 14:59:08 UTC
[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1729#discussion_r158954562
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
// converted to hive standard fomat to let spark understand the data to partition.
val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
- val failAction =
- carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
- val ignoreAction =
- carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
- val query: LogicalPlan = if (dataFrame.isDefined) {
- var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
- val timeStampFormat = new SimpleDateFormat(timeStampformatString)
- var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
- val dateFormat = new SimpleDateFormat(dateFormatString)
- val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
- val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
- val serializationNullFormat =
- carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
- val attributes =
- StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
- val len = attributes.length
- val rdd = dataFrame.get.rdd.map { f =>
- val data = new Array[Any](len)
- var i = 0
- while (i < len) {
- data(i) =
- UTF8String.fromString(
- CarbonScalaUtil.getString(f.get(i),
- serializationNullFormat,
- delimiterLevel1,
- delimiterLevel2,
- timeStampFormat,
- dateFormat))
- i = i + 1
+ val badRecordAction =
+ carbonLoadModel.getBadRecordsAction.split(",")(1)
+ var timeStampformatString = carbonLoadModel.getTimestampformat
+ if (timeStampformatString.isEmpty) {
+ timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
+ }
+ val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+ var dateFormatString = carbonLoadModel.getDateFormat
+ if (dateFormatString.isEmpty) {
+ dateFormatString = carbonLoadModel.getDefaultDateFormat
+ }
+ val dateFormat = new SimpleDateFormat(dateFormatString)
+ CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString)
+ CarbonSession.threadSet(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+ timeStampformatString)
+ CarbonSession.threadSet(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
+ serializationNullFormat)
+ CarbonSession.threadSet(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+ badRecordAction)
+ CarbonSession.threadSet(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+ carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
+ try {
+ val query: LogicalPlan = if (dataFrame.isDefined) {
+ val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+ val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+ val attributes =
+ StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
+ val len = attributes.length
+ val rdd = dataFrame.get.rdd.map { f =>
+ val data = new Array[Any](len)
+ var i = 0
+ while (i < len) {
+ data(i) =
+ UTF8String.fromString(
+ CarbonScalaUtil.getString(f.get(i),
+ serializationNullFormat,
+ delimiterLevel1,
+ delimiterLevel2,
+ timeStampFormat,
+ dateFormat))
+ i = i + 1
+ }
+ InternalRow.fromSeq(data)
}
- InternalRow.fromSeq(data)
- }
- if (updateModel.isDefined) {
- sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
- // In case of update, we don't need the segmrntid column in case of partitioning
- val dropAttributes = attributes.dropRight(1)
- val finalOutput = catalogTable.schema.map { attr =>
- dropAttributes.find { d =>
- val index = d.name.lastIndexOf("-updatedColumn")
- if (index > 0) {
- d.name.substring(0, index).equalsIgnoreCase(attr.name)
- } else {
- d.name.equalsIgnoreCase(attr.name)
- }
- }.get
+ if (updateModel.isDefined) {
--- End diff --
It would be better to split this loading function into two functions: one for loading and another for update. It is hard to read because it mixes two flows.
---