Posted to dev@carbondata.apache.org by foryou2030 <gi...@git.apache.org> on 2016/09/02 09:24:38 UTC

[GitHub] incubator-carbondata pull request #122: [CARBONDATA-202] Handled exception t...

GitHub user foryou2030 opened a pull request:

    https://github.com/apache/incubator-carbondata/pull/122

    [CARBONDATA-202] Handled exception thrown in beeline for all dictionary 

    # Why raise this PR?
    When a dictionary file's content is not in the correct format, the exception thrown in Beeline during data loading is not appropriate.
    # How to solve it?
    Use an accumulator as a flag: record the bad records found in the dictionary files, then throw an exception.
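
The approach described above can be sketched in plain Scala, without Spark. This is only an illustration under stated assumptions: `DictionaryRecordCheck`, `columns`, `parse` and `load` are hypothetical names, not part of CarbonData, and a simple count stands in for the Spark accumulator. The validity rules loosely follow the `parseRecord` diff quoted later in this thread.

```scala
// Hypothetical stand-in for the PR's parseRecord; all names are illustrative.
object DictionaryRecordCheck {
  // Column names indexed by position, standing in for csvFileColumns.
  val columns: Array[String] = Array("id", "name")

  // Returns Some((columnName, value)) for a well-formed "index,value" line,
  // or None when the line cannot be parsed.
  def parse(line: String): Option[(String, String)] = {
    // Java's split drops trailing empty strings, so "," yields an empty array
    // and "1," yields a single token.
    val tokens = line.split(",")
    if (tokens.isEmpty || !line.contains(",")) None
    else scala.util.Try {
      // A non-numeric or out-of-range index throws and is treated as bad.
      (columns(tokens(0).toInt), if (tokens.length > 1) tokens(1) else "")
    }.toOption
  }

  // Counts the bad lines first, then fails once with a single clear message.
  def load(lines: Seq[String]): Seq[(String, String)] = {
    val parsed = lines.map(parse)
    val bad = parsed.count(_.isEmpty)
    if (bad > 0) {
      throw new IllegalArgumentException(
        s"dictionary file contains $bad malformed record(s)")
    }
    parsed.flatten
  }
}
```

Counting first and throwing once at the end means the user sees one load failure rather than a stream of per-record errors.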


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/foryou2030/incubator-carbondata dictionary_ex

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-carbondata/pull/122.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #122
    
----
commit 56b3a85ef8e371c302c94354d121ff267ddae329
Author: foryou2030 <fo...@126.com>
Date:   2016-09-02T09:18:37Z

    handled all dictionary exception

commit 6e99d68d3d616f5fd3e8a2b1a9dc07eb7edbb22d
Author: foryou2030 <fo...@126.com>
Date:   2016-09-02T09:19:33Z

    add testcase

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-carbondata pull request #122: [CARBONDATA-202] Handled exception t...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/122#discussion_r77335816
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
    @@ -791,16 +819,23 @@ object GlobalDictionaryUtil extends Logging {
               if (requireDimension.nonEmpty) {
                 val model = createDictionaryLoadModel(carbonLoadModel, table, requireDimension,
                   hdfsLocation, dictfolderPath, false)
    +            // check if dictionary files contains bad record
    +            val accumulator = sqlContext.sparkContext.accumulator(0)
                 // read local dictionary file, and group by key
                 val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers,
    -              requireColumnNames, allDictionaryPath)
    +              requireColumnNames, allDictionaryPath, accumulator)
                 // read exist dictionary and combine
                 val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)
                   .partitionBy(new ColumnPartitioner(model.primDimensions.length))
                 // generate global dictionary files
                 val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
                 // check result status
                 checkStatus(carbonLoadModel, sqlContext, model, statusList)
    +            // if the dictionary contains wrong format record, throw ex
    +            if (accumulator.value > 0) {
    +              throw new DataLoadingException("Data Loading failure, the dictionary file " +
    --- End diff --
    
    I think the term "content" is too generic; we could say "dictionary values" instead. What is your opinion? It is better to use a more meaningful term related to that particular file, such as "dictionary values".



[GitHub] incubator-carbondata pull request #122: [CARBONDATA-202] Handled exception t...

Posted by foryou2030 <gi...@git.apache.org>.
Github user foryou2030 commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/122#discussion_r77333560
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
    @@ -588,30 +588,59 @@ object GlobalDictionaryUtil extends Logging {
                                          allDictionaryPath: String) = {
         var allDictionaryRdd: RDD[(String, Iterable[String])] = null
         try {
    -      // read local dictionary file, and spilt (columnIndex, columnValue)
    -      val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
    -        .map(x => {
    +      // parse record and validate record
    +      def parseRecord(x: String, accum: Accumulator[Int]) : (String, String) = {
             val tokens = x.split("" + CSVWriter.DEFAULT_SEPARATOR)
    -        if (tokens.size != 2) {
    -          logError("Read a bad dictionary record: " + x)
    -        }
    -        var columnName: String = CarbonCommonConstants.DEFAULT_COLUMN_NAME
    +        var columnName: String = ""
             var value: String = ""
    -        try {
    -          columnName = csvFileColumns(tokens(0).toInt)
    -          value = tokens(1)
    -        } catch {
    -          case ex: Exception =>
    -            logError("Reset bad dictionary record as default value")
    +        // such as "," , "", throw ex
    +        if (tokens.size == 0) {
    +          logError("Read a bad dictionary record: " + x)
    +          accum += 1
    +        } else if (tokens.size == 1) {
    +          // such as "1", "jone", throw ex
    +          if (x.contains(",") == false) {
    +            accum += 1
    +          } else {
    +            try {
    +              columnName = csvFileColumns(tokens(0).toInt)
    +            } catch {
    +              case ex: Exception =>
    +                logError("Read a bad dictionary record: " + x)
    +                accum += 1
    +            }
    +          }
    +        } else {
    +          try {
    +            columnName = csvFileColumns(tokens(0).toInt)
    +            value = tokens(1)
    +          } catch {
    +            case ex: Exception =>
    +              logError("Read a bad dictionary record: " + x)
    +              accum += 1
    +          }
             }
             (columnName, value)
    -      })
    +      }
     
    +      val accumulator = sqlContext.sparkContext.accumulator(0)
    --- End diff --
    
    Fixed.



[GitHub] incubator-carbondata pull request #122: [CARBONDATA-202] Handled exception t...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/122#discussion_r77332668
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
    @@ -588,30 +588,59 @@ object GlobalDictionaryUtil extends Logging {
                                          allDictionaryPath: String) = {
         var allDictionaryRdd: RDD[(String, Iterable[String])] = null
         try {
    -      // read local dictionary file, and spilt (columnIndex, columnValue)
    -      val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
    -        .map(x => {
    +      // parse record and validate record
    +      def parseRecord(x: String, accum: Accumulator[Int]) : (String, String) = {
             val tokens = x.split("" + CSVWriter.DEFAULT_SEPARATOR)
    -        if (tokens.size != 2) {
    -          logError("Read a bad dictionary record: " + x)
    -        }
    -        var columnName: String = CarbonCommonConstants.DEFAULT_COLUMN_NAME
    +        var columnName: String = ""
             var value: String = ""
    -        try {
    -          columnName = csvFileColumns(tokens(0).toInt)
    -          value = tokens(1)
    -        } catch {
    -          case ex: Exception =>
    -            logError("Reset bad dictionary record as default value")
    +        // such as "," , "", throw ex
    +        if (tokens.size == 0) {
    +          logError("Read a bad dictionary record: " + x)
    +          accum += 1
    +        } else if (tokens.size == 1) {
    +          // such as "1", "jone", throw ex
    +          if (x.contains(",") == false) {
    +            accum += 1
    +          } else {
    +            try {
    +              columnName = csvFileColumns(tokens(0).toInt)
    +            } catch {
    +              case ex: Exception =>
    +                logError("Read a bad dictionary record: " + x)
    +                accum += 1
    +            }
    +          }
    +        } else {
    +          try {
    +            columnName = csvFileColumns(tokens(0).toInt)
    +            value = tokens(1)
    +          } catch {
    +            case ex: Exception =>
    +              logError("Read a bad dictionary record: " + x)
    +              accum += 1
    +          }
             }
             (columnName, value)
    -      })
    +      }
     
    +      val accumulator = sqlContext.sparkContext.accumulator(0)
    +      // read local dictionary file, and spilt (columnIndex, columnValue)
    +      val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
    +        .map(x => parseRecord(x, accumulator)).persist()
    +      // for accumulator updates performed inside actions only
    +      basicRdd.count()
    --- End diff --
    
    Again, we are calling an action here, which is not necessary.
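
The point about the extra action can be illustrated with a plain Scala analogy. This is not Spark code: a lazy `Iterator.map` stands in for an RDD transformation, a local variable stands in for the accumulator, and `LazyCounterSketch` is a hypothetical name. The sketch assumes, as the quoted diff context suggests, that a later action such as `collect()` evaluates the mapped data anyway.

```scala
object LazyCounterSketch {
  // Returns (bad-record count before forcing, count after forcing).
  def run(lines: Seq[String]): (Int, Int) = {
    var bad = 0 // plain local counter standing in for the Spark accumulator
    // Iterator.map is lazy, like an RDD transformation: nothing runs yet.
    val parsed = lines.iterator.map { line =>
      if (!line.contains(",")) bad += 1
      line
    }
    val before = bad
    // Consuming the iterator plays the role of the later action.
    parsed.foreach(_ => ())
    (before, bad)
  }
}
```

The counter is only populated once something consumes the data, so a dedicated `count()` is redundant if a later action already evaluates the same data before the counter is read. In Spark itself, updates made inside transformations may also be applied more than once if tasks are re-executed.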



[GitHub] incubator-carbondata pull request #122: [CARBONDATA-202] Handled exception t...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-carbondata/pull/122



[GitHub] incubator-carbondata pull request #122: [CARBONDATA-202] Handled exception t...

Posted by foryou2030 <gi...@git.apache.org>.
Github user foryou2030 commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/122#discussion_r77336083
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
    @@ -791,16 +819,23 @@ object GlobalDictionaryUtil extends Logging {
               if (requireDimension.nonEmpty) {
                 val model = createDictionaryLoadModel(carbonLoadModel, table, requireDimension,
                   hdfsLocation, dictfolderPath, false)
    +            // check if dictionary files contains bad record
    +            val accumulator = sqlContext.sparkContext.accumulator(0)
                 // read local dictionary file, and group by key
                 val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers,
    -              requireColumnNames, allDictionaryPath)
    +              requireColumnNames, allDictionaryPath, accumulator)
                 // read exist dictionary and combine
                 val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)
                   .partitionBy(new ColumnPartitioner(model.primDimensions.length))
                 // generate global dictionary files
                 val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
                 // check result status
                 checkStatus(carbonLoadModel, sqlContext, model, statusList)
    +            // if the dictionary contains wrong format record, throw ex
    +            if (accumulator.value > 0) {
    +              throw new DataLoadingException("Data Loading failure, the dictionary file " +
    --- End diff --
    
    You are right. Handled.



[GitHub] incubator-carbondata pull request #122: [CARBONDATA-202] Handled exception t...

Posted by sujith71955 <gi...@git.apache.org>.
Github user sujith71955 commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/122#discussion_r77332892
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
    @@ -588,30 +588,59 @@ object GlobalDictionaryUtil extends Logging {
                                          allDictionaryPath: String) = {
         var allDictionaryRdd: RDD[(String, Iterable[String])] = null
         try {
    -      // read local dictionary file, and spilt (columnIndex, columnValue)
    -      val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
    -        .map(x => {
    +      // parse record and validate record
    +      def parseRecord(x: String, accum: Accumulator[Int]) : (String, String) = {
             val tokens = x.split("" + CSVWriter.DEFAULT_SEPARATOR)
    -        if (tokens.size != 2) {
    -          logError("Read a bad dictionary record: " + x)
    -        }
    -        var columnName: String = CarbonCommonConstants.DEFAULT_COLUMN_NAME
    +        var columnName: String = ""
             var value: String = ""
    -        try {
    -          columnName = csvFileColumns(tokens(0).toInt)
    -          value = tokens(1)
    -        } catch {
    -          case ex: Exception =>
    -            logError("Reset bad dictionary record as default value")
    +        // such as "," , "", throw ex
    +        if (tokens.size == 0) {
    +          logError("Read a bad dictionary record: " + x)
    +          accum += 1
    +        } else if (tokens.size == 1) {
    +          // such as "1", "jone", throw ex
    +          if (x.contains(",") == false) {
    +            accum += 1
    +          } else {
    +            try {
    +              columnName = csvFileColumns(tokens(0).toInt)
    +            } catch {
    +              case ex: Exception =>
    +                logError("Read a bad dictionary record: " + x)
    +                accum += 1
    +            }
    +          }
    +        } else {
    +          try {
    +            columnName = csvFileColumns(tokens(0).toInt)
    +            value = tokens(1)
    +          } catch {
    +            case ex: Exception =>
    +              logError("Read a bad dictionary record: " + x)
    +              accum += 1
    +          }
             }
             (columnName, value)
    -      })
    +      }
     
    +      val accumulator = sqlContext.sparkContext.accumulator(0)
    --- End diff --
    
    Update the accumulator and return it, and let the caller validate it and throw the exception.
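
A minimal sketch of this suggestion, in plain Scala and under stated assumptions: `Counter` is a hypothetical stand-in for Spark's `Accumulator[Int]`, and `readAll` and `generate` are illustrative names rather than the actual CarbonData methods.

```scala
object CallerValidatesSketch {
  // Hypothetical counter type standing in for Spark's Accumulator[Int].
  final class Counter { var value: Int = 0 }

  // The reader only records bad lines in the counter; it never throws.
  def readAll(lines: Seq[String], badRecords: Counter): Seq[String] =
    lines.filter { line =>
      val ok = line.contains(",")
      if (!ok) badRecords.value += 1
      ok
    }

  // The caller owns the counter, inspects it after the work is done,
  // and decides whether to fail the load.
  def generate(lines: Seq[String]): Seq[String] = {
    val badRecords = new Counter
    val good = readAll(lines, badRecords)
    if (badRecords.value > 0) {
      throw new IllegalStateException(
        s"${badRecords.value} bad dictionary record(s) found")
    }
    good
  }
}
```

Keeping the throw in the caller leaves the reading function free of control-flow decisions, which appears to be the shape the later revision of the diff takes when it passes the accumulator into `readAllDictionaryFiles`.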



[GitHub] incubator-carbondata pull request #122: [CARBONDATA-202] Handled exception t...

Posted by foryou2030 <gi...@git.apache.org>.
Github user foryou2030 commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/122#discussion_r77333578
  
    --- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---
    @@ -588,30 +588,59 @@ object GlobalDictionaryUtil extends Logging {
                                          allDictionaryPath: String) = {
         var allDictionaryRdd: RDD[(String, Iterable[String])] = null
         try {
    -      // read local dictionary file, and spilt (columnIndex, columnValue)
    -      val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
    -        .map(x => {
    +      // parse record and validate record
    +      def parseRecord(x: String, accum: Accumulator[Int]) : (String, String) = {
             val tokens = x.split("" + CSVWriter.DEFAULT_SEPARATOR)
    -        if (tokens.size != 2) {
    -          logError("Read a bad dictionary record: " + x)
    -        }
    -        var columnName: String = CarbonCommonConstants.DEFAULT_COLUMN_NAME
    +        var columnName: String = ""
             var value: String = ""
    -        try {
    -          columnName = csvFileColumns(tokens(0).toInt)
    -          value = tokens(1)
    -        } catch {
    -          case ex: Exception =>
    -            logError("Reset bad dictionary record as default value")
    +        // such as "," , "", throw ex
    +        if (tokens.size == 0) {
    +          logError("Read a bad dictionary record: " + x)
    +          accum += 1
    +        } else if (tokens.size == 1) {
    +          // such as "1", "jone", throw ex
    +          if (x.contains(",") == false) {
    +            accum += 1
    +          } else {
    +            try {
    +              columnName = csvFileColumns(tokens(0).toInt)
    +            } catch {
    +              case ex: Exception =>
    +                logError("Read a bad dictionary record: " + x)
    +                accum += 1
    +            }
    +          }
    +        } else {
    +          try {
    +            columnName = csvFileColumns(tokens(0).toInt)
    +            value = tokens(1)
    +          } catch {
    +            case ex: Exception =>
    +              logError("Read a bad dictionary record: " + x)
    +              accum += 1
    +          }
             }
             (columnName, value)
    -      })
    +      }
     
    +      val accumulator = sqlContext.sparkContext.accumulator(0)
    +      // read local dictionary file, and spilt (columnIndex, columnValue)
    +      val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
    +        .map(x => parseRecord(x, accumulator)).persist()
    +      // for accumulator updates performed inside actions only
    +      basicRdd.count()
    --- End diff --
    
    Fixed.

