Posted to issues@carbondata.apache.org by jackylk <gi...@git.apache.org> on 2017/12/28 14:59:08 UTC

[GitHub] carbondata pull request #1729: [CARBONDATA-1936][PARTITION] Corrected bad re...

Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1729#discussion_r158954562
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -484,161 +485,147 @@ case class CarbonLoadDataCommand(
     // converted to hive standard format to let spark understand the data to partition.
         val serializationNullFormat =
           carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
    -    val failAction =
    -      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase(
    -        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
    -    val ignoreAction =
    -      carbonLoadModel.getBadRecordsAction.split(",")(1).equalsIgnoreCase("ignore")
    -    val query: LogicalPlan = if (dataFrame.isDefined) {
    -      var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
    -      val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    -      var dateFormatString = CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
    -      val dateFormat = new SimpleDateFormat(dateFormatString)
    -      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
    -      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
    -      val serializationNullFormat =
    -      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
    -      val attributes =
    -        StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
    -      val len = attributes.length
    -      val rdd = dataFrame.get.rdd.map { f =>
    -        val data = new Array[Any](len)
    -        var i = 0
    -        while (i < len) {
    -          data(i) =
    -            UTF8String.fromString(
    -              CarbonScalaUtil.getString(f.get(i),
    -                serializationNullFormat,
    -                delimiterLevel1,
    -                delimiterLevel2,
    -                timeStampFormat,
    -                dateFormat))
    -          i = i + 1
    +    val badRecordAction =
    +      carbonLoadModel.getBadRecordsAction.split(",")(1)
    +    var timeStampformatString = carbonLoadModel.getTimestampformat
    +    if (timeStampformatString.isEmpty) {
    +      timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
    +    }
    +    val timeStampFormat = new SimpleDateFormat(timeStampformatString)
    +    var dateFormatString = carbonLoadModel.getDateFormat
    +    if (dateFormatString.isEmpty) {
    +      dateFormatString = carbonLoadModel.getDefaultDateFormat
    +    }
    +    val dateFormat = new SimpleDateFormat(dateFormatString)
    +    CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString)
    +    CarbonSession.threadSet(
    +      CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
    +      timeStampformatString)
    +    CarbonSession.threadSet(
    +      CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
    +      serializationNullFormat)
    +    CarbonSession.threadSet(
    +      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
    +      badRecordAction)
    +    CarbonSession.threadSet(
    +      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
    +      carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1))
    +    try {
    +      val query: LogicalPlan = if (dataFrame.isDefined) {
    +        val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
    +        val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
    +        val attributes =
    +          StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
    +        val len = attributes.length
    +        val rdd = dataFrame.get.rdd.map { f =>
    +          val data = new Array[Any](len)
    +          var i = 0
    +          while (i < len) {
    +            data(i) =
    +              UTF8String.fromString(
    +                CarbonScalaUtil.getString(f.get(i),
    +                  serializationNullFormat,
    +                  delimiterLevel1,
    +                  delimiterLevel2,
    +                  timeStampFormat,
    +                  dateFormat))
    +            i = i + 1
    +          }
    +          InternalRow.fromSeq(data)
             }
    -        InternalRow.fromSeq(data)
    -      }
    -      if (updateModel.isDefined) {
    -        sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
    -        // In case of update, we don't need the segment id column in case of partitioning
    -        val dropAttributes = attributes.dropRight(1)
    -        val finalOutput = catalogTable.schema.map { attr =>
    -          dropAttributes.find { d =>
    -            val index = d.name.lastIndexOf("-updatedColumn")
    -            if (index > 0) {
    -              d.name.substring(0, index).equalsIgnoreCase(attr.name)
    -            } else {
    -              d.name.equalsIgnoreCase(attr.name)
    -            }
    -          }.get
    +        if (updateModel.isDefined) {
    --- End diff --
    
    It is better to split this loading function into two functions: one for loading and another for update. As written it is hard to read because it mixes the two flows; a sketch of the split follows.
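
    A minimal sketch of the suggested split, assuming the imports and fields already
    present in CarbonLoadDataCommand.scala; the helper names `convertToInternalRows`,
    `buildLoadQuery` and `buildUpdateQuery` are hypothetical, and the bodies mirror
    the diff above rather than a verified implementation:

        // Shared conversion: serialize every field to its string form,
        // exactly as the rdd.map block in the diff does.
        private def convertToInternalRows(
            dataFrame: DataFrame,
            carbonLoadModel: CarbonLoadModel,
            timeStampFormat: SimpleDateFormat,
            dateFormat: SimpleDateFormat): RDD[InternalRow] = {
          val serializationNullFormat =
            carbonLoadModel.getSerializationNullFormat
              .split(CarbonCommonConstants.COMMA, 2)(1)
          val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
          val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
          val len = dataFrame.schema.length
          dataFrame.rdd.map { row =>
            val data = new Array[Any](len)
            var i = 0
            while (i < len) {
              data(i) = UTF8String.fromString(
                CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
                  delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat))
              i += 1
            }
            InternalRow.fromSeq(data)
          }
        }

        // Plain-load flow: no column renaming, just the converted rows.
        private def buildLoadQuery(
            rdd: RDD[InternalRow],
            attributes: Seq[AttributeReference])
            (sparkSession: SparkSession): LogicalPlan = {
          LogicalRDD(attributes, rdd)(sparkSession)
        }

        // Update flow: drop the trailing segment id column and map the
        // "-updatedColumn" names back onto the table schema, as the
        // removed block above does.
        private def buildUpdateQuery(
            rdd: RDD[InternalRow],
            attributes: Seq[AttributeReference],
            catalogTable: CatalogTable)
            (sparkSession: SparkSession): LogicalPlan = {
          val dropAttributes = attributes.dropRight(1)
          val finalOutput = catalogTable.schema.map { attr =>
            dropAttributes.find { d =>
              val index = d.name.lastIndexOf("-updatedColumn")
              if (index > 0) {
                d.name.substring(0, index).equalsIgnoreCase(attr.name)
              } else {
                d.name.equalsIgnoreCase(attr.name)
              }
            }.get
          }
          Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
        }

    The call site would then pick one builder per flow instead of branching inside a
    single long method, which keeps each path readable on its own.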


---