Posted to issues@spark.apache.org by "Gaurav Shah (JIRA)" <ji...@apache.org> on 2017/10/12 04:50:00 UTC

[jira] [Comment Edited] (SPARK-22248) spark marks all columns as null when its unable to parse single column

    [ https://issues.apache.org/jira/browse/SPARK-22248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16201433#comment-16201433 ] 

Gaurav Shah edited comment on SPARK-22248 at 10/12/17 4:49 AM:
---------------------------------------------------------------

[~maropu] I am not sure about CSV, but for JSON we tokenize the input in the Spark layer and then parse each token via Jackson, so if a single token fails we can recover from it. I'll give it a shot and push a patch.


was (Author: gaurav24):
[~maropu] I am not sure on CSV, but on JSON we tokenize the input in spark layer then parse each token via Jackson. So if a token fails we can recover from it. Let try and push a patch.

> spark marks all columns as null when its unable to parse single column
> ----------------------------------------------------------------------
>
>                 Key: SPARK-22248
>                 URL: https://issues.apache.org/jira/browse/SPARK-22248
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.2, 2.2.0
>            Reporter: Vignesh Mohan
>
> When parsing JSON data in `PERMISSIVE` mode, if a single column does not match the schema, Spark sets every column of that record to null.
> {code}
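> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
>
> val conf = new SparkConf().setMaster("local").setAppName("app")
> val sc = new SparkContext(conf)
> val sqlContext = new SQLContext(sc)
> // Explicit schema: "count" is declared as a long.
> val sparkschema: StructType =
>   StructType(StructField("name", StringType) :: StructField("count", LongType) :: Nil)
> // The first record carries a fractional "count" (24.0), which does not match LongType.
> val rdd = sc.parallelize(List(
>   """
>     |{"name": "foo", "count": 24.0}}
>     |""".stripMargin,
>   """
>     |{"name": "bar", "count": 24}}
>     |""".stripMargin
> ))
> sqlContext.read.schema(sparkschema).json(rdd).createOrReplaceTempView("events")
> sqlContext.sql(
>   """
>     | select
>     | name,count
>     | from
>     | events
>   """.stripMargin).collect.foreach(println)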
> {code}
> Output:
> {code}
> 17/10/11 03:12:04 WARN JacksonParser: Found at least one malformed records (sample: 
> {"name": "foo", "count": 24.0}}
> ). The JSON reader will replace
> all malformed records with placeholder null in current PERMISSIVE parser mode.
> To find out which corrupted records have been replaced with null, please use the
> default inferred schema instead of providing a custom schema.
> Code example to print all malformed records (scala):
> ===================================================
> // The corrupted record exists in column _corrupt_record.
> val parsedJson = spark.read.json("/path/to/json/file/test.json")
>            
> [null,null]
> [bar,24]
> {code}
> Expected output:
> {code}
> [foo,null]
> [bar,24]
> {code}
> The problem comes from `spark-catalyst_2.11-2.1.0-sources.jar!/org/apache/spark/sql/catalyst/json/JacksonParser.scala`
> {code}
> private def failedConversion(
>       parser: JsonParser,
>       dataType: DataType): PartialFunction[JsonToken, Any] = {
>     case VALUE_STRING if parser.getTextLength < 1 =>
>       // If conversion is failed, this produces `null` rather than throwing exception.
>       // This will protect the mismatch of types.
>       null
>     case token =>
>       // We cannot parse this token based on the given data type. So, we throw a
>       // SparkSQLJsonProcessingException and this exception will be caught by
>       // `parse` method.
>       throw new SparkSQLJsonProcessingException(
>         s"Failed to parse a value for data type $dataType (current token: $token).")
>   }
> {code}
> This throws an exception when a column value cannot be converted, and the `parse` method
> {code}
> def parse(input: String): Seq[InternalRow] = {
>     if (input.trim.isEmpty) {
>       Nil
>     } else {
>       try {
>         Utils.tryWithResource(factory.createParser(input)) { parser =>
>           parser.nextToken()
>           rootConverter.apply(parser) match {
>             case null => failedRecord(input)
>             case row: InternalRow => row :: Nil
>             case array: ArrayData =>
>               // Here, as we support reading top level JSON arrays and take every element
>               // in such an array as a row, this case is possible.
>               if (array.numElements() == 0) {
>                 Nil
>               } else {
>                 array.toArray[InternalRow](schema)
>               }
>             case _ =>
>               failedRecord(input)
>           }
>         }
>       } catch {
>         case _: JsonProcessingException =>
>           failedRecord(input)
>         case _: SparkSQLJsonProcessingException =>
>           failedRecord(input)
>       }
>     }
>   }
> {code}
> catches it and marks the whole record as failed via `failedRecord`, so every column in that row becomes null.
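
The difference between the reported behaviour and the expected one can be modelled without Spark or Jackson. The sketch below is only an illustration, not Spark's implementation: `FieldType`, `PermissiveModel`, `convert`, `parseWholeRecord` and `parsePerField` are hypothetical names, and a record is simplified to a map from field name to raw token text. It contrasts catching a conversion failure once per record with catching it once per field.

```scala
import scala.util.Try

// Hypothetical stand-ins for a schema; none of these names come from Spark.
sealed trait FieldType
case object StringField extends FieldType
case object LongField extends FieldType

object PermissiveModel {
  type Schema = Seq[(String, FieldType)]
  type Record = Map[String, String] // field name -> raw token text

  // Converts one raw token and throws on a type mismatch,
  // loosely mirroring what failedConversion does for a bad token.
  def convert(raw: String, tpe: FieldType): Any = tpe match {
    case StringField => raw
    case LongField   => raw.toLong // "24.0".toLong throws NumberFormatException
  }

  // Whole-record recovery: a single bad field turns every column into null.
  def parseWholeRecord(schema: Schema, record: Record): Seq[Any] =
    Try(schema.map { case (name, tpe) => convert(record(name), tpe) })
      .getOrElse(schema.map(_ => null))

  // Per-field recovery: only the field that fails to convert becomes null.
  def parsePerField(schema: Schema, record: Record): Seq[Any] =
    schema.map { case (name, tpe) =>
      Try(convert(record(name), tpe)).getOrElse(null)
    }
}
```

With a schema of `name` (string) and `count` (long), the record `name = "foo", count = "24.0"` comes back as two nulls from `parseWholeRecord` and as `foo` plus one null from `parsePerField`, which corresponds to the "Output" and "Expected output" rows in the report. Whether Spark's parser can be restructured this way is an assumption the sketch does not settle.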


