You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by maropu <gi...@git.apache.org> on 2017/02/14 15:28:34 UTC

[GitHub] spark pull request #16928: [SPARK-18699][SQL] Fill NULL in a field when dete...

GitHub user maropu opened a pull request:

    https://github.com/apache/spark/pull/16928

    [SPARK-18699][SQL] Fill NULL in a field when detecting a malformed token

    ## What changes were proposed in this pull request?
    This pr added a logic to fill NULL when detecting malformed tokens in case of permissive modes.
    In the current master, if the CSV parser hits these malformed tokens, it throws an exception below (and then a job fails);
    ```
    Caused by: java.lang.IllegalArgumentException
    	at java.sql.Date.valueOf(Date.java:143)
    	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137)
    	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply$mcJ$sp(CSVInferSchema.scala:272)
    	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272)
    	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272)
    	at scala.util.Try.getOrElse(Try.scala:79)
    	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:269)
    	at 
    ```
    In case that users load large CSV-formatted data, the job failure caused by a few malformed tokens makes users get some confused. So, this fix puts NULL for the tokens.
    
    ## How was this patch tested?
    Added tests in `CSVSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maropu/spark SPARK-18699-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16928.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16928
    
----
commit 266819a8e597d74be5ff2001afb638ee72bcdac6
Author: Takeshi Yamamuro <ya...@apache.org>
Date:   2017-02-14T13:26:05Z

    Fill NULL in a field when detecting a malformed token

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73173 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73173/testReport)** for PR 16928 at commit [`2a29b4a`](https://github.com/apache/spark/commit/2a29b4ab4b8defe3ced8112c17d47367e167de8e).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102644680
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +45,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType)
    +    require(schema(corrFieldIndex).nullable)
    +  }
    +
    +  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    --- End diff --
    
    json doesn't do this, why is this difference? cc @HyukjinKwon 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102683691
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +212,41 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && dataSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive) {
    +        // If a length of parsed tokens is not equal to expected one, it makes the length the same
    +        // with the expected. If the length is shorter, it adds extra tokens in the tail.
    +        // If longer, it drops extra tokens.
    +        val lengthSafeTokens = if (dataSchema.length > tokens.length) {
    +          tokens ++ new Array[String](dataSchema.length - tokens.length)
    +        } else if (dataSchema.length < tokens.length) {
    +          tokens.take(dataSchema.length)
    +        } else {
    +          tokens
    +        }
    +
    +        // If we need to handle corrupt fields, it adds an extra token to skip a field for malformed
    +        // strings when loading parsed tokens into a resulting `row`.
    +        corruptFieldIndex.map { corrFieldIndex =>
    +          val (front, back) = lengthSafeTokens.splitAt(corrFieldIndex)
    +          front ++ new Array[String](1) ++ back
    --- End diff --
    
    We value on the committer's opinion. I am fine if we revert. I personally prefer 1) revert this change then if this sounds not good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r120023340
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                          set, it uses the default value, ``PERMISSIVE``.
     
    -                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    -                  record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    +                 record, and puts the malformed string into a field configured by \
    +                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
    +                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
    +                 schema. If a schema does not have the field, it drops corrupt records during \
    +                 parsing. When inferring a schema, it implicitly adds a \
    +                 ``columnNameOfCorruptRecord`` field in an output schema.
    --- End diff --
    
    (Sorry for interrupting) yea, it should be consistent and we probably should change. Probably, we should also consider the records with tokens less or more than the schema as malformed records in PERMISSIVE mode rafher than filling some of it. @cloud-fan raised this issue before and I had a talk with some data analysists. It looked some agree and others do not. So, I just decided to not change the current behaviour for now.
    
    To cut it short, the reason (I assume) is I could not imagine a simple common case that fails to parse CSV (not during conversion) for the current implementation. If there are, we should match the behaviour. 
    
    I am currently outside and this is my phone. I will double check this when I get to my computer but this will be correct if I haven't missed some changes in this code path.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73348 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73348/testReport)** for PR 16928 at commit [`3d514e5`](https://github.com/apache/spark/commit/3d514e546a77a3c73b8c7b2668932a90c1d92fd8).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73168 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73168/testReport)** for PR 16928 at commit [`fde8cc7`](https://github.com/apache/spark/commit/fde8cc78b1ad8019f6d13002a944bdec34a808cc).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102645010
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +45,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType)
    +    require(schema(corrFieldIndex).nullable)
    +  }
    +
    +  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    +
       private val valueConverters =
    -    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
    +    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
     
       private val parser = new CsvParser(options.asParserSettings)
     
       private var numMalformedRecords = 0
     
       private val row = new GenericInternalRow(requiredSchema.length)
     
    -  private val indexArr: Array[Int] = {
    +  // This parser loads an `indexArr._1`-th position value in input tokens,
    +  // then put the value in `row(indexArr._2)`.
    +  private val indexArr: Array[(Int, Int)] = {
         val fields = if (options.dropMalformed) {
           // If `dropMalformed` is enabled, then it needs to parse all the values
           // so that we can decide which row is malformed.
           requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
         } else {
           requiredSchema
         }
    -    fields.map(schema.indexOf(_: StructField)).toArray
    +    val fieldsWithIndexes = fields.zipWithIndex
    +    corruptFieldIndex.map { case corrFieldIndex =>
    +      fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex }
    +    }.getOrElse {
    +      fieldsWithIndexes
    +    }.map { case (f, i) =>
    +      (inputSchema.indexOf(f), i)
    +    }.toArray
    --- End diff --
    
    SGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #72991 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72991/testReport)** for PR 16928 at commit [`df39e39`](https://github.com/apache/spark/commit/df39e3934b2f3948847c0e9177155f940be949b5).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #72993 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72993/testReport)** for PR 16928 at commit [`df39e39`](https://github.com/apache/spark/commit/df39e3934b2f3948847c0e9177155f940be949b5).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102366449
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -147,8 +165,6 @@ private[csv] class UnivocityParser(
     
         case udt: UserDefinedType[_] => (datum: String) =>
           makeConverter(name, udt.sqlType, nullable, options)
    -
    -    case _ => throw new RuntimeException(s"Unsupported type: ${dataType.typeName}")
    --- End diff --
    
    Hm, wouldn't removing it cause `MatchError`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102658185
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -193,8 +193,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
     
                     *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
                       record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                 ``columnNameOfCorruptRecord``. An user-defined schema can include \
    +                 a string type field named ``columnNameOfCorruptRecord`` for corrupt records. \
    +                 When a schema is set by user, it sets ``null`` for extra fields.
    --- End diff --
    
    does json have similar behavior?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73353/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r120024721
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                          set, it uses the default value, ``PERMISSIVE``.
     
    -                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    -                  record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    +                 record, and puts the malformed string into a field configured by \
    +                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
    +                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
    +                 schema. If a schema does not have the field, it drops corrupt records during \
    +                 parsing. When inferring a schema, it implicitly adds a \
    +                 ``columnNameOfCorruptRecord`` field in an output schema.
    --- End diff --
    
    It has more than one issue here. The default of `columnNameOfCorruptRecord` does not respect the session conf `spark.sql.columnNameOfCorruptRecord`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Fill NULL in a field when detecting a...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72877/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r120044161
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                          set, it uses the default value, ``PERMISSIVE``.
     
    -                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    -                  record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    +                 record, and puts the malformed string into a field configured by \
    +                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
    +                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
    +                 schema. If a schema does not have the field, it drops corrupt records during \
    +                 parsing. When inferring a schema, it implicitly adds a \
    +                 ``columnNameOfCorruptRecord`` field in an output schema.
    --- End diff --
    
    Sorry for my late response. yea, I also think these behaviour should be the same. But, I tried though in this pr though, I couldn't because (both you already noticed this...) we couldn't easily add a new column in the CSV code path. So, I think we probably need some refactoring to make this behaviour consistent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Fill NULL in a field when detecting a...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    @HyukjinKwon Could you check this and give me any insight before committers do?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r120027093
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                          set, it uses the default value, ``PERMISSIVE``.
     
    -                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    -                  record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    +                 record, and puts the malformed string into a field configured by \
    +                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
    +                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
    +                 schema. If a schema does not have the field, it drops corrupt records during \
    +                 parsing. When inferring a schema, it implicitly adds a \
    +                 ``columnNameOfCorruptRecord`` field in an output schema.
    --- End diff --
    
    @gatorsmile, I just got to my laptop.
    
    I checked when the length of tokens are more than the schema it fills the malformed column. with the data below:
    
    ```
    a,a
    ```
    
    (BTW, it looks respecting `spark.sql.columnNameOfCorruptRecord` ?)
    
    ```scala
    scala> spark.read.schema("a string, _corrupt_record string").csv("test.csv").show()
    +---+---------------+
    |  a|_corrupt_record|
    +---+---------------+
    |  a|            a,a|
    +---+---------------+
    ```
    
    ```scala
    scala> spark.conf.set("spark.sql.columnNameOfCorruptRecord", "abc")
    
    scala> spark.read.schema("a string, abc string").csv("test.csv").show()
    +---+---+
    |  a|abc|
    +---+---+
    |  a|a,a|
    +---+---+
    ```
    
    And, I found another bug (when the length is less then the schema):
    
    with data
    
    ```
    a
    a
    a
    a
    a
    ```
    
    ```scala
    scala> spark.read.schema("a string, b string, _corrupt_record string").csv("test.csv").show()
    ```
    
    prints ... 
    
    ```
    17/06/05 09:45:26 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
    java.lang.NullPointerException
    	at scala.collection.immutable.StringLike$class.stripLineEnd(StringLike.scala:89)
    	at scala.collection.immutable.StringOps.stripLineEnd(StringOps.scala:29)
    	at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$getCurrentInput(UnivocityParser.scala:56)
    	at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert$1.apply(UnivocityParser.scala:211)
    	at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$org$apache$spark$sql$execution$datasources$csv$UnivocityParser$$convert$1.apply(UnivocityParser.scala:211)
    	at org.apache.spark.sql.execution.datasources.FailureSafeParser$$anonfun$2.apply(FailureSafeParser.scala:50)
    	at org.apache.spark.sql.execution.datasources.FailureSafeParser$$anonfun$2.apply(FailureSafeParser.scala:43)
    	at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:64)
    	at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$parseIterator$1.apply(UnivocityParser.scala:312)
    	at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$parseIterator$1.apply(UnivocityParser.scala:312)
    	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
    	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
    	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
    	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:236)
    	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:230)
    	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    	at org.apache.spark.scheduler.Task.run(Task.scala:108)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:745)
    ```
    
    It looks `getCurrentInput` produces `null` as the input is all parsed.
    
    Another thing I would like to leave is, JSON produces `null` in the columns and put the contents in the malformed column:
    With the input:
    
    ```json
    {"a": 1, "b": "a"}
    ```
    
    ```scala
    scala> spark.read.json("test.json").show()
    +---+---+
    |  a|  b|
    +---+---+
    |  1|  a|
    +---+---+
    ```
    
    
    ```scala
    scala> spark.read.schema("a string, b int, _corrupt_record string").json("test.json").show()
    +----+----+------------------+
    |   a|   b|   _corrupt_record|
    +----+----+------------------+
    |null|null|{"a": 1, "b": "a"}|
    +----+----+------------------+
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r120024037
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                          set, it uses the default value, ``PERMISSIVE``.
     
    -                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    -                  record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    +                 record, and puts the malformed string into a field configured by \
    +                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
    +                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
    +                 schema. If a schema does not have the field, it drops corrupt records during \
    +                 parsing. When inferring a schema, it implicitly adds a \
    +                 ``columnNameOfCorruptRecord`` field in an output schema.
    --- End diff --
    
    The records with tokens less or more than the schema are already viewed as malformed records in (at least) 2.2. I did not check the previous versions.
    
    I think we need to implicitly add the column `columnNameOfCorruptRecord ` during the schema inference too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73331/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102657214
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +212,41 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && dataSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive) {
    +        // If a length of parsed tokens is not equal to expected one, it makes the length the same
    +        // with the expected. If the length is shorter, it adds extra tokens in the tail.
    +        // If longer, it drops extra tokens.
    --- End diff --
    
    we should revisit this in the future. If the token length doesn't match the expected schema, we should treat it as a malformed record. cc @HyukjinKwon 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73249/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102490842
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala ---
    @@ -958,4 +959,59 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
           checkAnswer(df, Row(1, null))
         }
       }
    +
    +  test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") {
    +    val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
    +    val df1 = spark
    +      .read
    +      .option("mode", "PERMISSIVE")
    +      .schema(schema)
    +      .csv(testFile(valueMalformedFile))
    +    checkAnswer(df1,
    +      Row(null, null) ::
    +      Row(1, java.sql.Date.valueOf("1983-08-04")) ::
    +      Nil)
    +
    +    // If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records
    +    val columnNameOfCorruptRecord = "_unparsed"
    +    val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType)
    +    val df2 = spark
    +      .read
    +      .option("mode", "PERMISSIVE")
    +      .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
    +      .schema(schemaWithCorrField1)
    +      .csv(testFile(valueMalformedFile))
    +    checkAnswer(df2,
    +      Row(null, null, "0,2013-111-11 12:13:14") ::
    +      Row(1, java.sql.Date.valueOf("1983-08-04"), null) ::
    +      Nil)
    +
    +    // We put a `columnNameOfCorruptRecord` field in the middle of a schema
    +    new StructType
    --- End diff --
    
    It seems mistakenly added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73172 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73172/testReport)** for PR 16928 at commit [`2fd9275`](https://github.com/apache/spark/commit/2fd9275893ca9a688dcde88abfbe75dee2b2cf58).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73331 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73331/testReport)** for PR 16928 at commit [`512fb42`](https://github.com/apache/spark/commit/512fb42404fee1c702bc9e18ad36f15da9e0b273).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72971/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Fill NULL in a field when detecting a...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102363585
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +46,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType,
    +      "A field for corrupt records must have a string type")
    --- End diff --
    
    this is a sanity check as it should be guaranteed at analysis phase, we don't need to specify an error message here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73168/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Fill NULL in a field when detecting a...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #72877 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72877/testReport)** for PR 16928 at commit [`266819a`](https://github.com/apache/spark/commit/266819a8e597d74be5ff2001afb638ee72bcdac6).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73172 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73172/testReport)** for PR 16928 at commit [`2fd9275`](https://github.com/apache/spark/commit/2fd9275893ca9a688dcde88abfbe75dee2b2cf58).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102656966
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -193,8 +193,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
     
                     *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
                       record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                 ``columnNameOfCorruptRecord``. An user-defined schema can include \
    +                 a string type field named ``columnNameOfCorruptRecord`` for corrupt records. \
    +                 When a schema is set by user, it sets ``null`` for extra fields.
    --- End diff --
    
    No. The two mode does not set null. In `failFast` mode, it fails a job. In `dropMalformed` mode, it drops the malformed lines whose length is shorter or longer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72969/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73250/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73168 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73168/testReport)** for PR 16928 at commit [`fde8cc7`](https://github.com/apache/spark/commit/fde8cc78b1ad8019f6d13002a944bdec34a808cc).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101910256
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
    @@ -101,6 +101,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         val broadcastedHadoopConf =
           sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     
    +    val parsedOptions = new CSVOptions(
    +      options,
    +      sparkSession.sessionState.conf.sessionLocalTimeZone,
    +      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
    --- End diff --
    
    (It seems `CSVOptions` is created twice above :)).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102638266
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -304,7 +304,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                 comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
                 ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
                 negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
    -            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
    +            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None,
    +            columnNameOfCorruptRecord=None):
    --- End diff --
    
    okay, I'll check soon


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Fill NULL in a field when detecting a...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Aha, looks good to me. Just a sec, and I'll modify the code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73335 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73335/testReport)** for PR 16928 at commit [`512fb42`](https://github.com/apache/spark/commit/512fb42404fee1c702bc9e18ad36f15da9e0b273).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102660065
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,6 +45,14 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType)
    +    require(schema(corrFieldIndex).nullable)
    +  }
    +
    +  private val dataSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    --- End diff --
    
    I just realised now we only use the length of `dataSchema` now. Could we just use the length if more commits should be pushed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101911095
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +214,30 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && inputSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      if (options.permissive && shouldHandleCorruptRecord) {
    +        row.setNullAt(corruptIndex)
    +      }
    +      val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) {
    +        tokens ++ new Array[String](inputSchema.length - tokens.length)
    +      } else if (options.permissive && inputSchema.length < tokens.length) {
    +        tokens.take(inputSchema.length)
           } else {
             tokens
           }
     
           try {
             Some(convert(checkedTokens))
           } catch {
    +        // We only catch exceptions about malformed values here and pass over other exceptions
    +        // (e.g., SparkException about unsupported types).
    +        case _: NumberFormatException | _: IllegalArgumentException
    +            if options.permissive && shouldHandleCorruptRecord =>
    +          row(corruptIndex) = UTF8String.fromString(tokens.mkString(options.delimiter.toString))
    --- End diff --
    
    Ah, so it had to be manually concatenated. I think it is okay to rename `convertWithParseMode` to `parseWithMode` and make it take `intput: String` as an argument. I worry if the original `input` is different with what is stored in the corrupt column.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    @cloud-fan okay, so I'll make this pr pending for now. Then, I'll make a new pr to fix the json behaivour.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102029272
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---
    @@ -95,6 +104,9 @@ private[csv] class CSVOptions(
       val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
       val permissive = ParseModes.isPermissiveMode(parseMode)
     
    +  val columnNameOfCorruptRecord =
    +    parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
    --- End diff --
    
    Added doc descriptions in `readwriter.py`, `DataFrameReader`, and `DataStreamReader `.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r120027395
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                          set, it uses the default value, ``PERMISSIVE``.
     
    -                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    -                  record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    +                 record, and puts the malformed string into a field configured by \
    +                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
    +                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
    +                 schema. If a schema does not have the field, it drops corrupt records during \
    +                 parsing. When inferring a schema, it implicitly adds a \
    +                 ``columnNameOfCorruptRecord`` field in an output schema.
    --- End diff --
    
    Let me give a shot to fix the bug I found above (second case). I think this can be easily fixed (but I am pretty sure the behaviour could be arguable). I will open a PR and cc you to show what it looks like.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102648114
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +45,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType)
    +    require(schema(corrFieldIndex).nullable)
    +  }
    +
    +  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    +
       private val valueConverters =
    -    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
    +    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
     
       private val parser = new CsvParser(options.asParserSettings)
     
       private var numMalformedRecords = 0
     
       private val row = new GenericInternalRow(requiredSchema.length)
     
    -  private val indexArr: Array[Int] = {
    +  // This parser loads an `indexArr._1`-th position value in input tokens,
    +  // then put the value in `row(indexArr._2)`.
    +  private val indexArr: Array[(Int, Int)] = {
         val fields = if (options.dropMalformed) {
           // If `dropMalformed` is enabled, then it needs to parse all the values
           // so that we can decide which row is malformed.
           requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
         } else {
           requiredSchema
         }
    -    fields.map(schema.indexOf(_: StructField)).toArray
    +    val fieldsWithIndexes = fields.zipWithIndex
    +    corruptFieldIndex.map { case corrFieldIndex =>
    +      fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex }
    +    }.getOrElse {
    +      fieldsWithIndexes
    +    }.map { case (f, i) =>
    +      (inputSchema.indexOf(f), i)
    +    }.toArray
    --- End diff --
    
    okay, I'll update


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102661203
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -193,8 +193,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
     
                     *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
                       record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                 ``columnNameOfCorruptRecord``. An user-defined schema can include \
    --- End diff --
    
    okay, I'll brush up


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73170 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73170/testReport)** for PR 16928 at commit [`1956c63`](https://github.com/apache/spark/commit/1956c630b1e73e14376804b4bbf6adbf265bd086).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Fill NULL in a field when detecting a...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #72877 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72877/testReport)** for PR 16928 at commit [`266819a`](https://github.com/apache/spark/commit/266819a8e597d74be5ff2001afb638ee72bcdac6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r120025689
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                          set, it uses the default value, ``PERMISSIVE``.
     
    -                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    -                  record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    +                 record, and puts the malformed string into a field configured by \
    +                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
    +                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
    +                 schema. If a schema does not have the field, it drops corrupt records during \
    +                 parsing. When inferring a schema, it implicitly adds a \
    +                 ``columnNameOfCorruptRecord`` field in an output schema.
    --- End diff --
    
    Now, users have to manually add the column `columnNameOfCorruptRecord` for seeing these malformed records.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102646058
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -190,8 +208,9 @@ private[csv] class UnivocityParser(
       }
     
       private def convertWithParseMode(
    -      tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = {
    -    if (options.dropMalformed && schema.length != tokens.length) {
    +      input: String)(convert: Array[String] => InternalRow): Option[InternalRow] = {
    +    val tokens = parser.parseLine(input)
    +    if (options.dropMalformed && inputSchema.length != tokens.length) {
    --- End diff --
    
    I think it's more readable to write `options.dropMalformed && corruptFieldIndex.isDefiend`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102366908
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
    @@ -96,31 +96,44 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
           filters: Seq[Filter],
           options: Map[String, String],
           hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
    -    val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
    -
    +    CSVUtils.verifySchema(dataSchema)
         val broadcastedHadoopConf =
           sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     
    +    val parsedOptions = new CSVOptions(
    +      options,
    +      sparkSession.sessionState.conf.sessionLocalTimeZone,
    +      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
    +
    +    // Check a field requirement for corrupt records here to throw an exception in a driver side
    +    dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).map { corruptFieldIndex =>
    +      val f = dataSchema(corruptFieldIndex)
    +      if (f.dataType != StringType || !f.nullable) {
    --- End diff --
    
    This check seems duplicated with https://github.com/apache/spark/pull/16928/files/4eed4a4bea927c6365dac2e2734c895a0a1a0026#diff-d19881aceddcaa5c60620fdcda99b4c4R51


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102653357
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +212,41 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && dataSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive) {
    +        // If a length of parsed tokens is not equal to expected one, it makes the length the same
    +        // with the expected. If the length is shorter, it adds extra tokens in the tail.
    +        // If longer, it drops extra tokens.
    +        val lengthSafeTokens = if (dataSchema.length > tokens.length) {
    +          tokens ++ new Array[String](dataSchema.length - tokens.length)
    +        } else if (dataSchema.length < tokens.length) {
    +          tokens.take(dataSchema.length)
    +        } else {
    +          tokens
    +        }
    +
    +        // If we need to handle corrupt fields, it adds an extra token to skip a field for malformed
    --- End diff --
    
    @HyukjinKwon This fix satisfies your intention? I slightly modified code based on your code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Fill NULL in a field when detecting a...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73170/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102372757
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +46,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType,
    +      "A field for corrupt records must have a string type")
    +    require(schema(corrFieldIndex).nullable, "A field for corrupt must be nullable")
    +  }
    +
    +  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    +  CSVUtils.verifySchema(inputSchema)
    +
       private val valueConverters =
    -    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
    +    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
     
       private val parser = new CsvParser(options.asParserSettings)
     
       private var numMalformedRecords = 0
     
       private val row = new GenericInternalRow(requiredSchema.length)
     
    -  private val indexArr: Array[Int] = {
    +  private val indexArr: Array[(Int, Int)] = {
    --- End diff --
    
    @HyukjinKwon Sorry, but my bad. we use the second index here: https://github.com/apache/spark/pull/16928/files#diff-d19881aceddcaa5c60620fdcda99b4c4R202.
    I added a test for this case in the latest commit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101894565
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---
    @@ -26,7 +26,7 @@ import org.apache.commons.lang3.time.FastDateFormat
     import org.apache.spark.internal.Logging
     import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}
     
    -private[csv] class CSVOptions(
    +private[sql] class CSVOptions(
    --- End diff --
    
    `execution` package itself is meant to be a private according to https://github.com/apache/spark/commit/511f52f8423e151b0d0133baf040d34a0af3d422 if this should be accessible other than `csv` package.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73322 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73322/testReport)** for PR 16928 at commit [`c86febe`](https://github.com/apache/spark/commit/c86febe6b018faafa62e0bf6444f8cd4326fb021).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101894427
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -222,12 +250,6 @@ private[csv] class UnivocityParser(
                 logWarning("Parse exception. " +
                   s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}")
               }
    -          if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) {
    --- End diff --
    
    Maybe, I missed something. Do you mind if I ask elaborate why it is removed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101911173
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +214,30 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && inputSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      if (options.permissive && shouldHandleCorruptRecord) {
    +        row.setNullAt(corruptIndex)
    +      }
    +      val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) {
    +        tokens ++ new Array[String](inputSchema.length - tokens.length)
    +      } else if (options.permissive && inputSchema.length < tokens.length) {
    +        tokens.take(inputSchema.length)
           } else {
             tokens
           }
     
           try {
             Some(convert(checkedTokens))
           } catch {
    +        // We only catch exceptions about malformed values here and pass over other exceptions
    +        // (e.g., SparkException about unsupported types).
    +        case _: NumberFormatException | _: IllegalArgumentException
    +            if options.permissive && shouldHandleCorruptRecord =>
    +          row(corruptIndex) = UTF8String.fromString(tokens.mkString(options.delimiter.toString))
    --- End diff --
    
    Then,.. we would be able to do as below:
    
    ```scala
    case NonFatal(e) if options.permissive =>
        val row = new GenericInternalRow(requiredSchema.length)
        corruptFieldIndex.foreach { idx =>
          row(idx) = UTF8String.fromString(input)
        }
    ```
    
    For unsupported type, could we utilize `CSVUtils.verifySchema`? I proposed related change in https://github.com/apache/spark/pull/15751/files#diff-a549ac2e19ee7486911e2e6403444d9d
    
    If you add the change here, I will take this out in my PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Thanks, let me review further within tomorrow.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    @HyukjinKwon okay, thanks! I'll check soon


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102367601
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -147,8 +165,6 @@ private[csv] class UnivocityParser(
     
         case udt: UserDefinedType[_] => (datum: String) =>
           makeConverter(name, udt.sqlType, nullable, options)
    -
    -    case _ => throw new RuntimeException(s"Unsupported type: ${dataType.typeName}")
    --- End diff --
    
    I think we should keep it, although theoretically we won't hit it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r120027228
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                          set, it uses the default value, ``PERMISSIVE``.
     
    -                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    -                  record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    +                 record, and puts the malformed string into a field configured by \
    +                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
    +                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
    +                 schema. If a schema does not have the field, it drops corrupt records during \
    +                 parsing. When inferring a schema, it implicitly adds a \
    +                 ``columnNameOfCorruptRecord`` field in an output schema.
    --- End diff --
    
    Oh.. I was writing the comments before seeing your comments ...  Yes, I agree with your comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r120021921
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                          set, it uses the default value, ``PERMISSIVE``.
     
    -                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    -                  record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    +                 record, and puts the malformed string into a field configured by \
    +                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
    +                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
    +                 schema. If a schema does not have the field, it drops corrupt records during \
    +                 parsing. When inferring a schema, it implicitly adds a \
    +                 ``columnNameOfCorruptRecord`` field in an output schema.
    --- End diff --
    
    @maropu For JSON, we implicitly adds the ``columnNameOfCorruptRecord`` field in schema inference. What is the reason we are not doing the same thing for CSV? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73251 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73251/testReport)** for PR 16928 at commit [`619094a`](https://github.com/apache/spark/commit/619094a4dbb0e400daac0d94905b40df07b650b6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101894380
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
    @@ -101,6 +101,12 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         val broadcastedHadoopConf =
           sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     
    +    val columnNameOfCorruptRecord = csvOptions.columnNameOfCorruptRecord.getOrElse(sparkSession
    +      .sessionState.conf.columnNameOfCorruptRecord)
    --- End diff --
    
    Maybe, we could put `columnNameOfCorruptRecord` as an argument in `CSVOptions` consistently with `JSONOptions`. I guess then it won't require the changes between 129L - 133L as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73167 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73167/testReport)** for PR 16928 at commit [`4df4bc6`](https://github.com/apache/spark/commit/4df4bc6ed6614b622ea1fefb6427d46aa82d0d83).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #72993 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72993/testReport)** for PR 16928 at commit [`df39e39`](https://github.com/apache/spark/commit/df39e3934b2f3948847c0e9177155f940be949b5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73328 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73328/testReport)** for PR 16928 at commit [`8d9386a`](https://github.com/apache/spark/commit/8d9386abea0941a40a89fd4860c5568ec55d7d95).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73322/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73322 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73322/testReport)** for PR 16928 at commit [`c86febe`](https://github.com/apache/spark/commit/c86febe6b018faafa62e0bf6444f8cd4326fb021).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102643961
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -367,10 +368,18 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                              If None is set, it uses the default value, session local timezone.
     
                     * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
    -                    When a schema is set by user, it sets ``null`` for extra fields.
    +                    If users set a string type field named ``columnNameOfCorruptRecord`` in a
    --- End diff --
    
    okay


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    The CSV behavior makes more sense, we should send a new PR for json to fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #72991 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72991/testReport)** for PR 16928 at commit [`df39e39`](https://github.com/apache/spark/commit/df39e3934b2f3948847c0e9177155f940be949b5).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102636720
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +45,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType)
    +    require(schema(corrFieldIndex).nullable)
    +  }
    +
    +  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    +
       private val valueConverters =
    -    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
    +    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
     
       private val parser = new CsvParser(options.asParserSettings)
     
       private var numMalformedRecords = 0
     
       private val row = new GenericInternalRow(requiredSchema.length)
     
    -  private val indexArr: Array[Int] = {
    +  // This parser loads an `indexArr._1`-th position value in input tokens,
    +  // then put the value in `row(indexArr._2)`.
    +  private val indexArr: Array[(Int, Int)] = {
         val fields = if (options.dropMalformed) {
           // If `dropMalformed` is enabled, then it needs to parse all the values
           // so that we can decide which row is malformed.
           requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
         } else {
           requiredSchema
         }
    -    fields.map(schema.indexOf(_: StructField)).toArray
    +    val fieldsWithIndexes = fields.zipWithIndex
    +    corruptFieldIndex.map { case corrFieldIndex =>
    +      fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex }
    +    }.getOrElse {
    +      fieldsWithIndexes
    +    }.map { case (f, i) =>
    +      (inputSchema.indexOf(f), i)
    +    }.toArray
    --- End diff --
    
    @maropu, I understand it is nice to avoid per-record computation. However, I feel I really want to clean this part up (as I had a hard time to clean this path..). I am not confident enough for the current status here but I don't have a better idea to deal only with this in this way.
    
    How about putting this into `convertWithParseMode`? I tried a rough change for the suggestion to refer above - https://github.com/apache/spark/compare/619094a...HyukjinKwon:another-suggestion?expand=1
    
    Actually, we can avoid more per-record computation there in making safe tokens as well (such as if-condition with parse mode). 
    
    We could add it there for now and maybe I can resolve all things related with per-record computation in another PR. 
    
    Do you think this makes sense? WDYT @cloud-fan?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Fill NULL in a field when detecting a...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #72971 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72971/testReport)** for PR 16928 at commit [`f09a899`](https://github.com/apache/spark/commit/f09a8992d631694915bef3cec657713c0da4ea32).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102644140
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +45,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType)
    +    require(schema(corrFieldIndex).nullable)
    +  }
    +
    +  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    +
       private val valueConverters =
    -    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
    +    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
     
       private val parser = new CsvParser(options.asParserSettings)
     
       private var numMalformedRecords = 0
     
       private val row = new GenericInternalRow(requiredSchema.length)
     
    -  private val indexArr: Array[Int] = {
    +  // This parser loads an `indexArr._1`-th position value in input tokens,
    +  // then put the value in `row(indexArr._2)`.
    +  private val indexArr: Array[(Int, Int)] = {
         val fields = if (options.dropMalformed) {
           // If `dropMalformed` is enabled, then it needs to parse all the values
           // so that we can decide which row is malformed.
           requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
         } else {
           requiredSchema
         }
    -    fields.map(schema.indexOf(_: StructField)).toArray
    +    val fieldsWithIndexes = fields.zipWithIndex
    +    corruptFieldIndex.map { case corrFieldIndex =>
    +      fieldsWithIndexes.filter { case (_, i) => i != corrFieldIndex }
    +    }.getOrElse {
    +      fieldsWithIndexes
    +    }.map { case (f, i) =>
    +      (inputSchema.indexOf(f), i)
    +    }.toArray
    --- End diff --
    
    Per 
    
    > a string converter in a corrupt field a bit looks weird,
    
    We are already filling up the tokens in permissive modes. We could make this NOOP too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102648089
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +221,25 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && inputSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) {
    +        tokens ++ new Array[String](inputSchema.length - tokens.length)
    +      } else if (options.permissive && inputSchema.length < tokens.length) {
    +        tokens.take(inputSchema.length)
           } else {
             tokens
           }
     
           try {
             Some(convert(checkedTokens))
           } catch {
    +        case NonFatal(e) if options.permissive =>
    +          val row = new GenericInternalRow(requiredSchema.length)
    +          corruptFieldIndex.map(row(_) = UTF8String.fromString(input))
    --- End diff --
    
    BTW, I would like to note that this change is actually an important point that we should avoid. This could cause some bugs that are hard to figure out. (see [SPARK-16694](https://issues.apache.org/jira/browse/SPARK-16694))


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101910249
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
    @@ -101,6 +101,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         val broadcastedHadoopConf =
           sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     
    +    val parsedOptions = new CSVOptions(
    +      options,
    +      sparkSession.sessionState.conf.sessionLocalTimeZone,
    +      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
    --- End diff --
    
    (It seems `CSVOptions` is created twice above :)).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102656121
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -193,8 +193,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
     
                     *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
                       record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                 ``columnNameOfCorruptRecord``. An user-defined schema can include \
    +                 a string type field named ``columnNameOfCorruptRecord`` for corrupt records. \
    +                 When a schema is set by user, it sets ``null`` for extra fields.
    --- End diff --
    
    what about the other 2 modes? do they also set null for extra fields?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Fill NULL in a field when detecting a...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73249 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73249/testReport)** for PR 16928 at commit [`8e83522`](https://github.com/apache/spark/commit/8e8352259d9eac1eb4ac973811379af7dfd6ed83).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73170 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73170/testReport)** for PR 16928 at commit [`1956c63`](https://github.com/apache/spark/commit/1956c630b1e73e14376804b4bbf6adbf265bd086).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102364069
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +46,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType,
    +      "A field for corrupt records must have a string type")
    +    require(schema(corrFieldIndex).nullable, "A field for corrupt must be nullable")
    +  }
    +
    +  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    +  CSVUtils.verifySchema(inputSchema)
    +
       private val valueConverters =
    -    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
    +    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
     
       private val parser = new CsvParser(options.asParserSettings)
     
       private var numMalformedRecords = 0
     
       private val row = new GenericInternalRow(requiredSchema.length)
     
    -  private val indexArr: Array[Int] = {
    +  private val indexArr: Array[(Int, Int)] = {
         val fields = if (options.dropMalformed) {
           // If `dropMalformed` is enabled, then it needs to parse all the values
           // so that we can decide which row is malformed.
           requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
         } else {
           requiredSchema
         }
    -    fields.map(schema.indexOf(_: StructField)).toArray
    +    val fieldsWithIndexes = fields.zipWithIndex
    --- End diff --
    
    isn't it just `fields.map(inputSchema.indexOf(_: StructField)).toArray`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102362652
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -303,8 +303,9 @@ def text(self, paths):
         def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
                 comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
                 ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
    -            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
    -            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
    +            negativeInf=None, columnNameOfCorruptRecord=None, dateFormat=None,
    --- End diff --
    
    okay, I'll fix soon


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73348 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73348/testReport)** for PR 16928 at commit [`3d514e5`](https://github.com/apache/spark/commit/3d514e546a77a3c73b8c7b2668932a90c1d92fd8).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101911057
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +45,39 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val shouldHandleCorruptRecord = options.permissive && requiredSchema.exists { f =>
    +    f.name == options.columnNameOfCorruptRecord && f.dataType == StringType && f.nullable
    +  }
    +
    +  private val inputSchema = if (shouldHandleCorruptRecord) {
    +    StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    +  } else {
    +    schema
    +  }
    +
       private val valueConverters =
    -    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
    +    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
     
       private val parser = new CsvParser(options.asParserSettings)
     
       private var numMalformedRecords = 0
     
       private val row = new GenericInternalRow(requiredSchema.length)
     
    -  private val indexArr: Array[Int] = {
    +  private val corruptIndex =
    +    requiredSchema.getFieldIndex(options.columnNameOfCorruptRecord).getOrElse(-1)
    --- End diff --
    
    So, as to https://github.com/apache/spark/pull/16928/files#r101911027, I suggest to change this to something like ...
    
    ```scala
    private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    corruptFieldIndex.foreach { idx =>
      require(schema(idx).dataType == StringType && schema(idx).nullable)
      require(schema(idx).nullable)
      // Corrupted schema should be added at the last.
      require(idx == schema.length - 1)
    }
    ```
    
    Then, it would not require the changes 48L - 57L, 78L - 80L, 191L & 186 and etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102667789
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +212,41 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && dataSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive) {
    +        // If a length of parsed tokens is not equal to expected one, it makes the length the same
    +        // with the expected. If the length is shorter, it adds extra tokens in the tail.
    +        // If longer, it drops extra tokens.
    +        val lengthSafeTokens = if (dataSchema.length > tokens.length) {
    +          tokens ++ new Array[String](dataSchema.length - tokens.length)
    +        } else if (dataSchema.length < tokens.length) {
    +          tokens.take(dataSchema.length)
    +        } else {
    +          tokens
    +        }
    +
    +        // If we need to handle corrupt fields, it adds an extra token to skip a field for malformed
    +        // strings when loading parsed tokens into a resulting `row`.
    +        corruptFieldIndex.map { corrFieldIndex =>
    +          val (front, back) = lengthSafeTokens.splitAt(corrFieldIndex)
    +          front ++ new Array[String](1) ++ back
    --- End diff --
    
    We have two options; 1) we just revert this part, or (2) modify this part to avoid the allocation based on this code. cc: @HyukjinKwon 
    
    e.x.) 
    This is just an example and it seems to be a little hard to understand.
    ```
      val parsedTokens = new Array[String](schema.length)
    
            ...
            // If we need to handle corrupt fields, it adds an extra token to skip a field for malformed
            // strings when loading parsed tokens into a resulting `row`.
            corruptFieldIndex.map { corrFieldIndex =>
              lengthSafeTokens.splitAt(corrFieldIndex) match { case (front, back) =>
                  front.zipWithIndex.foreach { case (s, i) =>
                    parsedTokens(i) = s
                  }
                  back.zipWithIndex.foreach { case (s, i) =>
                    parsedTokens(schema.length - back.length + i) = s
                  }
              }
              parsedTokens
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73250 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73250/testReport)** for PR 16928 at commit [`448e6fe`](https://github.com/apache/spark/commit/448e6fe9f20f11c1171bcaeebf27620fd2f93ac3).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102645047
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +45,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType)
    +    require(schema(corrFieldIndex).nullable)
    +  }
    +
    +  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    --- End diff --
    
    It is because parsing CSV is dependent on the order of schema and tokens. In case of JSON, this can be just mapped by its key but for CSV it depends on the order of schema.
    
    So, it seems this filters the corrupt field out in order to match the data schema with parsed tokens.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73169 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73169/testReport)** for PR 16928 at commit [`9b9d043`](https://github.com/apache/spark/commit/9b9d0436c509bd93f7629ae2459326e5a87059cb).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    I also keep considering other ways to fix this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102366771
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +46,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType,
    +      "A field for corrupt records must have a string type")
    +    require(schema(corrFieldIndex).nullable, "A field for corrupt must be nullable")
    +  }
    +
    +  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    +  CSVUtils.verifySchema(inputSchema)
    --- End diff --
    
    It seems `verifySchema` is being called duplicatedly (https://github.com/apache/spark/pull/16928/files/4eed4a4bea927c6365dac2e2734c895a0a1a0026#diff-a549ac2e19ee7486911e2e6403444d9dR99)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Let me try to review further at my best once it is decided.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101894462
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +46,35 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val inputSchema = columnNameOfCorruptRecord.map { fn =>
    +    StructType(schema.filter(_.name != fn))
    +  }.getOrElse(schema)
    +
       private val valueConverters =
    -    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
    +    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
     
       private val parser = new CsvParser(options.asParserSettings)
     
       private var numMalformedRecords = 0
     
       private val row = new GenericInternalRow(requiredSchema.length)
     
    -  private val indexArr: Array[Int] = {
    +  private val shouldHandleCorruptRecord = columnNameOfCorruptRecord.isDefined
    +  private val corruptIndex = columnNameOfCorruptRecord.flatMap { fn =>
    --- End diff --
    
    It is a trivial but maybe
    
    ```scala
    private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    ```
    
    if possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102017077
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +214,30 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && inputSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      if (options.permissive && shouldHandleCorruptRecord) {
    +        row.setNullAt(corruptIndex)
    +      }
    +      val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) {
    +        tokens ++ new Array[String](inputSchema.length - tokens.length)
    +      } else if (options.permissive && inputSchema.length < tokens.length) {
    +        tokens.take(inputSchema.length)
           } else {
             tokens
           }
     
           try {
             Some(convert(checkedTokens))
           } catch {
    +        // We only catch exceptions about malformed values here and pass over other exceptions
    +        // (e.g., SparkException about unsupported types).
    +        case _: NumberFormatException | _: IllegalArgumentException
    +            if options.permissive && shouldHandleCorruptRecord =>
    +          row(corruptIndex) = UTF8String.fromString(tokens.mkString(options.delimiter.toString))
    --- End diff --
    
    I added `CSVUtils.verifySchema` in the head of `buildReader`, then modified the code above to handle all the `NonFatal` exceptions with a permissive mode in `parseWithMode`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102645418
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +45,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType)
    +    require(schema(corrFieldIndex).nullable)
    +  }
    +
    +  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    --- End diff --
    
    (For example, [this code path](https://github.com/apache/spark/pull/16928/files/c86febe6b018faafa62e0bf6444f8cd4326fb021#diff-d19881aceddcaa5c60620fdcda99b4c4R213) is dependent on the length of the data schema. It drops/adds tokens after comparing the length between data schema and tokens. If we keep the corrupt column, then the schema length would be different with the tokens.)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73313 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73313/testReport)** for PR 16928 at commit [`80c3775`](https://github.com/apache/spark/commit/80c3775fa2f5d4641dd05769cc85bf928f7806fe).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102734548
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +212,41 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && dataSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive) {
    +        // If a length of parsed tokens is not equal to expected one, it makes the length the same
    +        // with the expected. If the length is shorter, it adds extra tokens in the tail.
    +        // If longer, it drops extra tokens.
    +        val lengthSafeTokens = if (dataSchema.length > tokens.length) {
    +          tokens ++ new Array[String](dataSchema.length - tokens.length)
    +        } else if (dataSchema.length < tokens.length) {
    +          tokens.take(dataSchema.length)
    +        } else {
    +          tokens
    +        }
    +
    +        // If we need to handle corrupt fields, it adds an extra token to skip a field for malformed
    +        // strings when loading parsed tokens into a resulting `row`.
    +        corruptFieldIndex.map { corrFieldIndex =>
    +          val (front, back) = lengthSafeTokens.splitAt(corrFieldIndex)
    +          front ++ new Array[String](1) ++ back
    --- End diff --
    
    Added TODO comments in [here](https://github.com/apache/spark/pull/16928/files#diff-d19881aceddcaa5c60620fdcda99b4c4R75) and [here](https://github.com/apache/spark/pull/16928/files#diff-d19881aceddcaa5c60620fdcda99b4c4R235)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102367846
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -147,8 +165,6 @@ private[csv] class UnivocityParser(
     
         case udt: UserDefinedType[_] => (datum: String) =>
           makeConverter(name, udt.sqlType, nullable, options)
    -
    -    case _ => throw new RuntimeException(s"Unsupported type: ${dataType.typeName}")
    --- End diff --
    
    okay


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102016459
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -173,25 +188,22 @@ private[csv] class UnivocityParser(
        */
       def parse(input: String): Option[InternalRow] = {
         convertWithParseMode(parser.parseLine(input)) { tokens =>
    -      var i: Int = 0
    -      while (i < indexArr.length) {
    -        val pos = indexArr(i)
    +      indexArr.foreach { case (pos, i) =>
    --- End diff --
    
    fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    definitely we should match the behavior of json


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102657839
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +212,41 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && dataSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive) {
    +        // If a length of parsed tokens is not equal to expected one, it makes the length the same
    +        // with the expected. If the length is shorter, it adds extra tokens in the tail.
    +        // If longer, it drops extra tokens.
    +        val lengthSafeTokens = if (dataSchema.length > tokens.length) {
    +          tokens ++ new Array[String](dataSchema.length - tokens.length)
    +        } else if (dataSchema.length < tokens.length) {
    +          tokens.take(dataSchema.length)
    +        } else {
    +          tokens
    +        }
    +
    +        // If we need to handle corrupt fields, it adds an extra token to skip a field for malformed
    +        // strings when loading parsed tokens into a resulting `row`.
    +        corruptFieldIndex.map { corrFieldIndex =>
    +          val (front, back) = lengthSafeTokens.splitAt(corrFieldIndex)
    +          front ++ new Array[String](1) ++ back
    --- End diff --
    
    this will introduce a lot of extra object allocation, I think the previous version is better


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r120024728
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                          set, it uses the default value, ``PERMISSIVE``.
     
    -                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    -                  record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    +                 record, and puts the malformed string into a field configured by \
    +                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
    +                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
    +                 schema. If a schema does not have the field, it drops corrupt records during \
    +                 parsing. When inferring a schema, it implicitly adds a \
    +                 ``columnNameOfCorruptRecord`` field in an output schema.
    --- End diff --
    
    Will submit a PR soon for fixing both issues.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102008366
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -173,25 +188,22 @@ private[csv] class UnivocityParser(
        */
       def parse(input: String): Option[InternalRow] = {
         convertWithParseMode(parser.parseLine(input)) { tokens =>
    -      var i: Int = 0
    -      while (i < indexArr.length) {
    -        val pos = indexArr(i)
    +      indexArr.foreach { case (pos, i) =>
    --- End diff --
    
    (off-topic: oh, I found the style-guide has great Chinese and Korean versions!)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73167/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73251/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102654066
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +221,25 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && inputSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) {
    +        tokens ++ new Array[String](inputSchema.length - tokens.length)
    +      } else if (options.permissive && inputSchema.length < tokens.length) {
    +        tokens.take(inputSchema.length)
           } else {
             tokens
           }
     
           try {
             Some(convert(checkedTokens))
           } catch {
    +        case NonFatal(e) if options.permissive =>
    +          val row = new GenericInternalRow(requiredSchema.length)
    +          corruptFieldIndex.map(row(_) = UTF8String.fromString(input))
    --- End diff --
    
    Aha, fixed. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102734091
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -193,8 +193,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
     
                     *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
                       record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                 ``columnNameOfCorruptRecord``. An user-defined schema can include \
    --- End diff --
    
    Updated and could you check this again? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73169/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102599234
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -367,10 +368,18 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                              If None is set, it uses the default value, session local timezone.
     
                     * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
    -                    When a schema is set by user, it sets ``null`` for extra fields.
    +                    If users set a string-type field named ``columnNameOfCorruptRecord`` in a
    --- End diff --
    
    maybe `string-type` to `string type` just for consistency?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102363460
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
    @@ -96,31 +96,44 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
           filters: Seq[Filter],
           options: Map[String, String],
           hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
    -    val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
    -
    +    CSVUtils.verifySchema(dataSchema)
         val broadcastedHadoopConf =
           sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     
    +    val parsedOptions = new CSVOptions(
    +      options,
    +      sparkSession.sessionState.conf.sessionLocalTimeZone,
    +      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
    +
    +    // Check a field requirement for corrupt records here to throw an exception in a driver side
    +    dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).map { corruptFieldIndex =>
    +      val f = dataSchema(corruptFieldIndex)
    +      if (f.dataType != StringType || !f.nullable) {
    +        throw new AnalysisException(
    +          "A field for corrupt records must be a string type and nullable")
    --- End diff --
    
    nit: `The field for corrupt records must be string type and nullable`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102362562
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -303,8 +303,9 @@ def text(self, paths):
         def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
                 comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
                 ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
    -            negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
    -            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
    +            negativeInf=None, columnNameOfCorruptRecord=None, dateFormat=None,
    --- End diff --
    
    put the new parameter at last, for better backward compatibility


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73109 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73109/testReport)** for PR 16928 at commit [`873a383`](https://github.com/apache/spark/commit/873a383cd5c3c1b1179bf9a3b121acc3755beac0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73249 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73249/testReport)** for PR 16928 at commit [`8e83522`](https://github.com/apache/spark/commit/8e8352259d9eac1eb4ac973811379af7dfd6ed83).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73173/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    @HyukjinKwon @cloud-fan okay, all tests passed. Also, I made a pr to fix the json behaviour #17023.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    @HyukjinKwon The current patch has a bit different behaviour between csv and json cases when `_corrupt_record` has types other than `StringType`; in json cases, it hits `requirement failed` and, in csv cases, it hits `AnaysisException` in a driver side (See: https://github.com/apache/spark/pull/16928/files#diff-a549ac2e19ee7486911e2e6403444d9dR109). If we need to keep all the json behaviours, we need to drop the code to throw the `AnalysisException` in the csv case. WDYT?
    
    A json case:
    ```
    scala> Seq("""{"a": "a", "b" : 1}""").toDF().write.text("/Users/maropu/Desktop/data")
    scala> val dataSchema = StructType(StructField("a", IntegerType, true) :: StructField("b", StringType, true) :: Nil)
    scala> spark.read.schema(dataSchema.add("_corrupt_record", StringType)).option("mode", "PERMISSIVE").json("/Users/maropu/Desktop/data").show()
    +----+----+-------------------+
    |   a|   b|    _corrupt_record|
    +----+----+-------------------+
    |null|null|{"a": "a", "b" : 1}|
    +----+----+-------------------+
    
    scala> spark.read.schema(dataSchema.add("_corrupt_record", IntegerType)).option("mode", "PERMISSIVE").json("/Users/maropu/Desktop/data").show()
    17/02/21 02:18:04 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
    java.lang.IllegalArgumentException: requirement failed
            at scala.Predef$.require(Predef.scala:212)
            at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$1.apply$mcVI$sp(JacksonParser.scala:61)
            at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$1.apply(JacksonParser.scala:61)
            at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$1.apply(JacksonParser.scala:61)
            at scala.Option.foreach(Option.scala:257)
            at org.apache.spark.sql.catalyst.json.JacksonParser.<init>(JacksonParser.scala:61)
            at org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$1.apply(JsonFileFormat.scala:106)
            at org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$1.apply(JsonFileFormat.scala:105)
    ```
    
    A csv case:
    ```
    scala> Seq("0,2013-111-11 12:13:14").toDF().write.text("/Users/maropu/Desktop/data")
    scala> val dataSchema = StructType(StructField("a", IntegerType, true) :: StructField("b", TimestampType, true) :: Nil)
    scala> spark.read.schema(dataSchema.add("_corrupt_record", StringType)).option("mode", "PERMISSIVE").csv("/Users/maropu/Desktop/data").show()
    +----+----+--------------------+
    |   a|   b|     _corrupt_record|
    +----+----+--------------------+
    |null|null|0,2013-111-11 12:...|
    +----+----+--------------------+
    
    scala> spark.read.schema(dataSchema.add("_corrupt_record", IntegerType)).option("mode", "PERMISSIVE").csv("/Users/maropu/Desktop/data").show()
    org.apache.spark.sql.AnalysisException: A field for corrupt records must be a string type and nullable;
      at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply$mcVI$sp(CSVFileFormat.scala:112)
      at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:109)
      at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1.apply(CSVFileFormat.scala:109)
      at scala.Option.map(Option.scala:146)
    ```
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102666977
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,6 +45,14 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType)
    +    require(schema(corrFieldIndex).nullable)
    +  }
    +
    +  private val dataSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    --- End diff --
    
    ok, I'll update


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Fill NULL in a field when detecting a...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    @maropu, I just ran some similar tests with JSON datasource. What do you think about matching it to JSON's behaviour by introducing `columnNameOfCorruptRecord`?
    
    I ran with the data and schema as below:
    
    ```scala
    Seq("""{"a": "a", "b" : 1}""").toDF().write.text("/tmp/path")
    val schema = StructType(StructField("a", IntegerType, true) :: StructField("b", StringType, true) :: StructField("_corrupt_record", StringType, true) :: Nil)
    ```
    
    **`FAILFAST`**
    
    ```scala
    scala> spark.read.schema(schema).option("mode", "FAILFAST").json("/tmp/path").show()
    org.apache.spark.sql.catalyst.json.SparkSQLJsonProcessingException: Malformed line in FAILFAST mode: {"a": "a", "b" : 1}
    ```
    
    **`DROPMALFORMED`**
    
    ```scala
    scala> spark.read.schema(schema).option("mode", "DROPMALFORMED").json("/tmp/path").show()
    +---+---+---------------+
    |  a|  b|_corrupt_record|
    +---+---+---------------+
    +---+---+---------------+
    ```
    
    **`PERMISSIVE`**
    
    ```scala
    scala> spark.read.schema(schema).option("mode", "PERMISSIVE").json("/tmp/path").show()
    +----+----+-------------------+
    |   a|   b|    _corrupt_record|
    +----+----+-------------------+
    |null|null|{"a": "a", "b" : 1}|
    +----+----+-------------------+
    ```
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #72971 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72971/testReport)** for PR 16928 at commit [`f09a899`](https://github.com/apache/spark/commit/f09a8992d631694915bef3cec657713c0da4ea32).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73109/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101894442
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -173,25 +185,41 @@ private[csv] class UnivocityParser(
        */
       def parse(input: String): Option[InternalRow] = {
         convertWithParseMode(parser.parseLine(input)) { tokens =>
    -      var i: Int = 0
    -      while (i < indexArr.length) {
    -        val pos = indexArr(i)
    -        // It anyway needs to try to parse since it decides if this row is malformed
    -        // or not after trying to cast in `DROPMALFORMED` mode even if the casted
    -        // value is not stored in the row.
    -        val value = valueConverters(pos).apply(tokens(pos))
    -        if (i < requiredSchema.length) {
    -          row(i) = value
    +      var foundMalformed: Boolean = false
    +      indexArr.foreach { case (pos, i) =>
    +        try {
    +          // It anyway needs to try to parse since it decides if this row is malformed
    +          // or not after trying to cast in `DROPMALFORMED` mode even if the casted
    +          // value is not stored in the row.
    +          val value = valueConverters(pos).apply(tokens(pos))
    +          if (i < requiredSchema.length) {
    +            row(i) = value
    +          }
    +        } catch {
    +          case _: NumberFormatException | _: IllegalArgumentException
    --- End diff --
    
    Could we maybe decouple this parsing handling logics into `convertWithParseMode`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102657558
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +212,41 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && dataSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive) {
    +        // If a length of parsed tokens is not equal to expected one, it makes the length the same
    +        // with the expected. If the length is shorter, it adds extra tokens in the tail.
    +        // If longer, it drops extra tokens.
    --- End diff --
    
    Should we also put that malformed record (shorter or longer) into a corrupt field?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73166/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102366629
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +46,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType,
    +      "A field for corrupt records must have a string type")
    +    require(schema(corrFieldIndex).nullable, "A field for corrupt must be nullable")
    +  }
    +
    +  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    +  CSVUtils.verifySchema(inputSchema)
    +
       private val valueConverters =
    -    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
    +    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
     
       private val parser = new CsvParser(options.asParserSettings)
     
       private var numMalformedRecords = 0
     
       private val row = new GenericInternalRow(requiredSchema.length)
     
    -  private val indexArr: Array[Int] = {
    +  private val indexArr: Array[(Int, Int)] = {
    --- End diff --
    
    Actually, it seems we don't need both? It seems second one is not used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73353 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73353/testReport)** for PR 16928 at commit [`a58ff1f`](https://github.com/apache/spark/commit/a58ff1f772e8ec86b4a320b826d2fc2959bb6439).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102367607
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -189,9 +205,10 @@ private[csv] class UnivocityParser(
         }
       }
     
    -  private def convertWithParseMode(
    -      tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = {
    -    if (options.dropMalformed && schema.length != tokens.length) {
    +  private def parseWithMode(input: String)(convert: Array[String] => InternalRow)
    +    : Option[InternalRow] = {
    --- End diff --
    
    Oh, one more, it seems it fits into 100 line length limit?
    
    ```scala
    private def convertWithParseMode(
        input: String)(convert: Array[String] => InternalRow): Option[InternalRow] = {
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102458663
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -190,8 +208,9 @@ private[csv] class UnivocityParser(
       }
     
       private def convertWithParseMode(
    -      tokens: Array[String])(convert: Array[String] => InternalRow): Option[InternalRow] = {
    -    if (options.dropMalformed && schema.length != tokens.length) {
    +    input: String)(convert: Array[String] => InternalRow): Option[InternalRow] = {
    --- End diff --
    
    2 spaces more :).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73328/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73335 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73335/testReport)** for PR 16928 at commit [`512fb42`](https://github.com/apache/spark/commit/512fb42404fee1c702bc9e18ad36f15da9e0b273).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73167 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73167/testReport)** for PR 16928 at commit [`4df4bc6`](https://github.com/apache/spark/commit/4df4bc6ed6614b622ea1fefb6427d46aa82d0d83).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73251 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73251/testReport)** for PR 16928 at commit [`619094a`](https://github.com/apache/spark/commit/619094a4dbb0e400daac0d94905b40df07b650b6).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102688956
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +212,41 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && dataSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive) {
    +        // If a length of parsed tokens is not equal to expected one, it makes the length the same
    +        // with the expected. If the length is shorter, it adds extra tokens in the tail.
    +        // If longer, it drops extra tokens.
    +        val lengthSafeTokens = if (dataSchema.length > tokens.length) {
    +          tokens ++ new Array[String](dataSchema.length - tokens.length)
    +        } else if (dataSchema.length < tokens.length) {
    +          tokens.take(dataSchema.length)
    +        } else {
    +          tokens
    +        }
    +
    +        // If we need to handle corrupt fields, it adds an extra token to skip a field for malformed
    +        // strings when loading parsed tokens into a resulting `row`.
    +        corruptFieldIndex.map { corrFieldIndex =>
    +          val (front, back) = lengthSafeTokens.splitAt(corrFieldIndex)
    +          front ++ new Array[String](1) ++ back
    --- End diff --
    
    Probably, it'd be better to leave comments here as TODO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73175 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73175/testReport)** for PR 16928 at commit [`4eed4a4`](https://github.com/apache/spark/commit/4eed4a4bea927c6365dac2e2734c895a0a1a0026).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    thanks, merging to master!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101894489
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +46,35 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val inputSchema = columnNameOfCorruptRecord.map { fn =>
    +    StructType(schema.filter(_.name != fn))
    +  }.getOrElse(schema)
    +
       private val valueConverters =
    -    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
    +    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
     
       private val parser = new CsvParser(options.asParserSettings)
     
       private var numMalformedRecords = 0
     
       private val row = new GenericInternalRow(requiredSchema.length)
     
    -  private val indexArr: Array[Int] = {
    +  private val shouldHandleCorruptRecord = columnNameOfCorruptRecord.isDefined
    +  private val corruptIndex = columnNameOfCorruptRecord.flatMap { fn =>
    +    requiredSchema.getFieldIndex(fn)
    +  }.getOrElse(-1)
    +
    +  private val indexArr: Array[(Int, Int)] = {
         val fields = if (options.dropMalformed) {
           // If `dropMalformed` is enabled, then it needs to parse all the values
           // so that we can decide which row is malformed.
           requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
         } else {
           requiredSchema
         }
    -    fields.map(schema.indexOf(_: StructField)).toArray
    +    fields.zipWithIndex.filter { case (_, i) => i != corruptIndex }.map { case (f, i) =>
    +      (inputSchema.indexOf(f), i)
    +    }.toArray
    --- End diff --
    
    Let me checkout this PR and check if this bit can be cleaner but it might be even nicer if you could double check please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101910362
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---
    @@ -95,6 +104,9 @@ private[csv] class CSVOptions(
       val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
       val permissive = ParseModes.isPermissiveMode(parseMode)
     
    +  val columnNameOfCorruptRecord =
    +    parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
    --- End diff --
    
    Maybe, we should add this in `readwriter.py` too and document this in `readwriter.py`, `DataFrameReader` and `DataStreamReader`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Fill NULL in a field when detecting a...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #72969 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72969/testReport)** for PR 16928 at commit [`5486f5d`](https://github.com/apache/spark/commit/5486f5d863f3ced9f54d042fc5806a396fdf838c).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102016391
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
    @@ -101,6 +101,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
         val broadcastedHadoopConf =
           sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     
    +    val parsedOptions = new CSVOptions(
    +      options,
    +      sparkSession.sessionState.conf.sessionLocalTimeZone,
    +      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
    --- End diff --
    
    Fixed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102648290
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +221,25 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && inputSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) {
    +        tokens ++ new Array[String](inputSchema.length - tokens.length)
    +      } else if (options.permissive && inputSchema.length < tokens.length) {
    +        tokens.take(inputSchema.length)
    --- End diff --
    
    okay, I'll update soon


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/16928


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73175 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73175/testReport)** for PR 16928 at commit [`4eed4a4`](https://github.com/apache/spark/commit/4eed4a4bea927c6365dac2e2734c895a0a1a0026).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73166 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73166/testReport)** for PR 16928 at commit [`bf286c7`](https://github.com/apache/spark/commit/bf286c79f20ada32ea9e6f6808ec363582c78cb0).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73172/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73335/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #72969 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72969/testReport)** for PR 16928 at commit [`5486f5d`](https://github.com/apache/spark/commit/5486f5d863f3ced9f54d042fc5806a396fdf838c).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102646336
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +221,25 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && inputSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) {
    +        tokens ++ new Array[String](inputSchema.length - tokens.length)
    +      } else if (options.permissive && inputSchema.length < tokens.length) {
    +        tokens.take(inputSchema.length)
           } else {
             tokens
           }
     
           try {
             Some(convert(checkedTokens))
           } catch {
    +        case NonFatal(e) if options.permissive =>
    +          val row = new GenericInternalRow(requiredSchema.length)
    +          corruptFieldIndex.map(row(_) = UTF8String.fromString(input))
    --- End diff --
    
    `map` -> `foreach`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101911027
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +46,35 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val inputSchema = columnNameOfCorruptRecord.map { fn =>
    +    StructType(schema.filter(_.name != fn))
    +  }.getOrElse(schema)
    +
       private val valueConverters =
    -    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
    +    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
     
       private val parser = new CsvParser(options.asParserSettings)
     
       private var numMalformedRecords = 0
     
       private val row = new GenericInternalRow(requiredSchema.length)
     
    -  private val indexArr: Array[Int] = {
    +  private val shouldHandleCorruptRecord = columnNameOfCorruptRecord.isDefined
    +  private val corruptIndex = columnNameOfCorruptRecord.flatMap { fn =>
    +    requiredSchema.getFieldIndex(fn)
    +  }.getOrElse(-1)
    +
    +  private val indexArr: Array[(Int, Int)] = {
         val fields = if (options.dropMalformed) {
           // If `dropMalformed` is enabled, then it needs to parse all the values
           // so that we can decide which row is malformed.
           requiredSchema ++ schema.filterNot(requiredSchema.contains(_))
         } else {
           requiredSchema
         }
    -    fields.map(schema.indexOf(_: StructField)).toArray
    +    fields.zipWithIndex.filter { case (_, i) => i != corruptIndex }.map { case (f, i) =>
    +      (inputSchema.indexOf(f), i)
    +    }.toArray
    --- End diff --
    
    I see. I got it. You meant to support arbitrarily located column as corrupt field. I think CSV parsing is dependent on original schema's order when initially loaded. IMHO, I think it is okay to force locating this field at the end.
    I mean, I can't imagine a user adding the corrupt column in the middle of the schema.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101911409
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -173,25 +188,22 @@ private[csv] class UnivocityParser(
        */
       def parse(input: String): Option[InternalRow] = {
         convertWithParseMode(parser.parseLine(input)) { tokens =>
    -      var i: Int = 0
    -      while (i < indexArr.length) {
    -        val pos = indexArr(i)
    +      indexArr.foreach { case (pos, i) =>
    --- End diff --
    
    (Oh, BTW, we should just use `while` in the critical path if we are not pretty sure whether the byte codes are virtually the same or more efficient. This is also written as `Use while loops instead of for loops or functional transformations (e.g. map, foreach)` in [scala-style-guide#traversal-and-zipwithindex](https://github.com/databricks/scala-style-guide#traversal-and-zipwithindex)) 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102643746
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -367,10 +368,18 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                              If None is set, it uses the default value, session local timezone.
     
                     * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record.
    -                    When a schema is set by user, it sets ``null`` for extra fields.
    +                    If users set a string type field named ``columnNameOfCorruptRecord`` in a
    --- End diff --
    
    can we just copy-paste the doc from `json`? or you can make some changes but please make them consistent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73348/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73313/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73109 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73109/testReport)** for PR 16928 at commit [`873a383`](https://github.com/apache/spark/commit/873a383cd5c3c1b1179bf9a3b121acc3755beac0).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102665176
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -193,8 +193,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
     
                     *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
                       record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                 ``columnNameOfCorruptRecord``. An user-defined schema can include \
    +                 a string type field named ``columnNameOfCorruptRecord`` for corrupt records. \
    +                 When a schema is set by user, it sets ``null`` for extra fields.
    --- End diff --
    
    Ah..., a bit different I think. As @HyukjinKwon said above(https://github.com/apache/spark/pull/16928#discussion_r102645047), CSV formats depend on a length of parsed tokens (if the length shorter, fills `null`, and if longer, drops them in permissive mode). One the other hand, in JSON formats, fields in a required schema are mapped by `key`. In case of missing keys in JSON formats, it just sets `null` in these fields with the keys in all the three mode. cc: @HyukjinKwon 
    e.x.)
    ```
    
    import org.apache.spark.sql.types._
    scala> Seq("""{"a": "a", "b" : 1}""", """{"a": "a"}""").toDF().write.text("/Users/maropu/Desktop/data")
    scala> val dataSchema = StructType(StructField("a", StringType, true) :: StructField("b", IntegerType, true) :: Nil)
    scala> spark.read.schema(dataSchema).option("mode", "PERMISSIVE").json("/Users/maropu/Desktop/data").show()
    +---+----+
    |  a|   b|
    +---+----+
    |  a|   1|
    |  a|null|
    +---+----+
    
    scala> spark.read.schema(dataSchema).option("mode", "FAILFAST").json("/Users/maropu/Desktop/data").show()
    +---+----+
    |  a|   b|
    +---+----+
    |  a|   1|
    |  a|null|
    +---+----+
    
    scala> spark.read.schema(dataSchema).option("mode", "DROPMALFORMED").json("/Users/maropu/Desktop/data").show()
    +---+----+
    |  a|   b|
    +---+----+
    |  a|   1|
    |  a|null|
    +---+----+
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73166 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73166/testReport)** for PR 16928 at commit [`bf286c7`](https://github.com/apache/spark/commit/bf286c79f20ada32ea9e6f6808ec363582c78cb0).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73313 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73313/testReport)** for PR 16928 at commit [`80c3775`](https://github.com/apache/spark/commit/80c3775fa2f5d4641dd05769cc85bf928f7806fe).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73173 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73173/testReport)** for PR 16928 at commit [`2a29b4a`](https://github.com/apache/spark/commit/2a29b4ab4b8defe3ced8112c17d47367e167de8e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102646572
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +221,25 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && inputSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) {
    +        tokens ++ new Array[String](inputSchema.length - tokens.length)
    +      } else if (options.permissive && inputSchema.length < tokens.length) {
    +        tokens.take(inputSchema.length)
    --- End diff --
    
    not related to this PR, but can you add some comments for this code block? It's kind if hard to follow the logic here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102660814
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +212,41 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && dataSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      val checkedTokens = if (options.permissive) {
    +        // If a length of parsed tokens is not equal to expected one, it makes the length the same
    +        // with the expected. If the length is shorter, it adds extra tokens in the tail.
    +        // If longer, it drops extra tokens.
    --- End diff --
    
    Yup, I agree in a way but I guess "it is pretty common that CSV is malformed in this way" (said by the analysis team in my company). Could we leave it as is for now here?
    
    Let me try to raise a different JIRA after checking R's `read.csv` or other libraries.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102658465
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -193,8 +193,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
     
                     *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
                       record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                 ``columnNameOfCorruptRecord``. An user-defined schema can include \
    --- End diff --
    
    please rephrase this document a little bit, to make it more clear


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    I'll update in a day, thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101988141
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -202,21 +214,30 @@ private[csv] class UnivocityParser(
           }
           numMalformedRecords += 1
           None
    -    } else if (options.failFast && schema.length != tokens.length) {
    +    } else if (options.failFast && inputSchema.length != tokens.length) {
           throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
             s"${tokens.mkString(options.delimiter.toString)}")
         } else {
    -      val checkedTokens = if (options.permissive && schema.length > tokens.length) {
    -        tokens ++ new Array[String](schema.length - tokens.length)
    -      } else if (options.permissive && schema.length < tokens.length) {
    -        tokens.take(schema.length)
    +      if (options.permissive && shouldHandleCorruptRecord) {
    +        row.setNullAt(corruptIndex)
    +      }
    +      val checkedTokens = if (options.permissive && inputSchema.length > tokens.length) {
    +        tokens ++ new Array[String](inputSchema.length - tokens.length)
    +      } else if (options.permissive && inputSchema.length < tokens.length) {
    +        tokens.take(inputSchema.length)
           } else {
             tokens
           }
     
           try {
             Some(convert(checkedTokens))
           } catch {
    +        // We only catch exceptions about malformed values here and pass over other exceptions
    +        // (e.g., SparkException about unsupported types).
    +        case _: NumberFormatException | _: IllegalArgumentException
    +            if options.permissive && shouldHandleCorruptRecord =>
    +          row(corruptIndex) = UTF8String.fromString(tokens.mkString(options.delimiter.toString))
    --- End diff --
    
    Aha, it seems reasonable to me and I'll re-check again.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73169 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73169/testReport)** for PR 16928 at commit [`9b9d043`](https://github.com/apache/spark/commit/9b9d0436c509bd93f7629ae2459326e5a87059cb).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102369500
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---
    @@ -96,31 +96,44 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
           filters: Seq[Filter],
           options: Map[String, String],
           hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
    -    val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
    -
    +    CSVUtils.verifySchema(dataSchema)
         val broadcastedHadoopConf =
           sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
     
    +    val parsedOptions = new CSVOptions(
    +      options,
    +      sparkSession.sessionState.conf.sessionLocalTimeZone,
    +      sparkSession.sessionState.conf.columnNameOfCorruptRecord)
    +
    +    // Check a field requirement for corrupt records here to throw an exception in a driver side
    +    dataSchema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).map { corruptFieldIndex =>
    +      val f = dataSchema(corruptFieldIndex)
    +      if (f.dataType != StringType || !f.nullable) {
    --- End diff --
    
    I remove the entry in `UnivocityParser`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102701569
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,6 +45,14 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType)
    +    require(schema(corrFieldIndex).nullable)
    +  }
    +
    +  private val dataSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    --- End diff --
    
    I reverted some parts of code and then `dataSchema` is used except for the length https://github.com/apache/spark/pull/16928/files#diff-d19881aceddcaa5c60620fdcda99b4c4R57. So, I kept this variable as it is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72993/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r101895589
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -222,12 +250,6 @@ private[csv] class UnivocityParser(
                 logWarning("Parse exception. " +
                   s"Dropping malformed line: ${tokens.mkString(options.delimiter.toString)}")
               }
    -          if (numMalformedRecords == options.maxMalformedLogPerPartition - 1) {
    --- End diff --
    
    Oh, my bad. I'll revert this part.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102370456
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -304,7 +304,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                 comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
                 ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
                 negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
    -            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None):
    +            maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None,
    +            columnNameOfCorruptRecord=None):
    --- End diff --
    
    Doh, it seems we should add this in `streaming.py` and `DataStreamReader` too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73250 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73250/testReport)** for PR 16928 at commit [`448e6fe`](https://github.com/apache/spark/commit/448e6fe9f20f11c1171bcaeebf27620fd2f93ac3).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73175/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r120025610
  
    --- Diff: python/pyspark/sql/readwriter.py ---
    @@ -191,10 +191,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                          set, it uses the default value, ``PERMISSIVE``.
     
    -                *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    -                  record and puts the malformed string into a new field configured by \
    -                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
    -                 ``null`` for extra fields.
    +                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
    +                 record, and puts the malformed string into a field configured by \
    +                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
    +                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
    +                 schema. If a schema does not have the field, it drops corrupt records during \
    +                 parsing. When inferring a schema, it implicitly adds a \
    +                 ``columnNameOfCorruptRecord`` field in an output schema.
    --- End diff --
    
    Unable to fix this issue... This will break the external schema. : ( 
    
    We should avoid this behavior inconsistency when we introduced CVS at the beginning. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #16928: [SPARK-18699][SQL] Put malformed tokens into a ne...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16928#discussion_r102364205
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala ---
    @@ -45,24 +46,41 @@ private[csv] class UnivocityParser(
       // A `ValueConverter` is responsible for converting the given value to a desired type.
       private type ValueConverter = String => Any
     
    +  private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
    +  corruptFieldIndex.foreach { corrFieldIndex =>
    +    require(schema(corrFieldIndex).dataType == StringType,
    +      "A field for corrupt records must have a string type")
    +    require(schema(corrFieldIndex).nullable, "A field for corrupt must be nullable")
    +  }
    +
    +  private val inputSchema = StructType(schema.filter(_.name != options.columnNameOfCorruptRecord))
    +  CSVUtils.verifySchema(inputSchema)
    +
       private val valueConverters =
    -    schema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
    +    inputSchema.map(f => makeConverter(f.name, f.dataType, f.nullable, options)).toArray
     
       private val parser = new CsvParser(options.asParserSettings)
     
       private var numMalformedRecords = 0
     
       private val row = new GenericInternalRow(requiredSchema.length)
     
    -  private val indexArr: Array[Int] = {
    +  private val indexArr: Array[(Int, Int)] = {
    --- End diff --
    
    add comment to explain these 2 ints.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72991/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    cc @cloud-fan, could I ask if you think matching it seems reasonable? I wrote some details below for you to track this issue.
    
    Currently, JSON produces
    
    ```scala
    Seq("""{"a": "a", "b" : 1}""").toDF().write.text("/tmp/path")
    val schema = StructType(
      StructField("a", IntegerType, true) ::
      StructField("b", StringType, true) :: 
      StructField("_corrupt_record", StringType, true) :: Nil)
    spark.read.schema(schema)
      .option("mode", "PERMISSIVE")
      .json("/tmp/path").show()
    ```
    
    ```scala
    +----+----+-------------------+
    |   a|   b|    _corrupt_record|
    +----+----+-------------------+
    |null|null|{"a": "a", "b" : 1}|
    +----+----+-------------------+
    ```
    
    whereas CSV produces
    
    ```scala
    Seq("""a,1""").toDF().write.text("/tmp/path")
    val schema = StructType(
      StructField("a", IntegerType, true) ::
      StructField("b", StringType, true) :: Nil)
    spark.read.schema(schema)
      .option("mode", "PERMISSIVE")
      .csv("/tmp/path").show()
    ```
    
    ```scala
    java.lang.NumberFormatException: For input string: "a"
    	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    	at java.lang.Integer.parseInt(Integer.java:580)
    ```
    
    To cut it short, the problem is, parsing itself is fine but when a value is unable to convert. In case of JSON, it fills the value in the column specified in `columnNameOfCorruptRecord` whereas CSV throws an exception even if it is `PERMISSIVE` mode.
    
    It seems there are two ways to fix this. 
    
    One is what @maropu initially suggested - permissively fills it as `null`. My worry here is losing the data.
    
    The other way I suggested here - matching it to JSON's one, storing the value in the column specified in `columnNameOfCorruptRecord`. However, note that I guess we will not get this column while inferring in CSV in most cases as I can't imagine the case when CSV itself is malformed.
    
    JSON produces this column when the JSON is malformed as below:
    
    ```scala
    Seq("""{"a": "a", "b" :""").toDF().write.text("/tmp/test123")
    spark.read.json("/tmp/test123").printSchema
    root
     |-- _corrupt_record: string (nullable = true)
    ```
    
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16928: [SPARK-18699][SQL] Put malformed tokens into a new field...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16928
  
    **[Test build #73353 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73353/testReport)** for PR 16928 at commit [`a58ff1f`](https://github.com/apache/spark/commit/a58ff1f772e8ec86b4a320b826d2fc2959bb6439).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org