Posted to issues@spark.apache.org by "Takeshi Yamamuro (JIRA)" <ji...@apache.org> on 2017/10/12 01:33:00 UTC
[jira] [Commented] (SPARK-22248) spark marks all columns as null when its unable to parse single column
[ https://issues.apache.org/jira/browse/SPARK-22248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16201309#comment-16201309 ]
Takeshi Yamamuro commented on SPARK-22248:
------------------------------------------
Once parsing fails, I feel it is difficult to recover even in single-line mode (and more difficult in multi-line mode). CSV has the same behaviour. cc: [~hyukjin.kwon]
> spark marks all columns as null when its unable to parse single column
> ----------------------------------------------------------------------
>
> Key: SPARK-22248
> URL: https://issues.apache.org/jira/browse/SPARK-22248
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.2, 2.2.0
> Reporter: Vignesh Mohan
>
> When parsing JSON data in `PERMISSIVE` mode, if a single column mismatches the schema, Spark sets every column value in that record to null.
> {code}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
>
> val conf = new SparkConf().setMaster("local").setAppName("app")
> val sc = new SparkContext(conf)
> val sqlContext = new SQLContext(sc)
> val sparkschema: StructType =
>   StructType(StructField("name", StringType) :: StructField("count", LongType) :: Nil)
> val rdd = sc.parallelize(List(
>   """
>     |{"name": "foo", "count": 24.0}}
>     |""".stripMargin,
>   """
>     |{"name": "bar", "count": 24}}
>     |""".stripMargin
> ))
> sqlContext.read.schema(sparkschema).json(rdd).createOrReplaceTempView("events")
> sqlContext.sql(
>   """
>     | select
>     |   name, count
>     | from
>     |   events
>   """.stripMargin).collect.foreach(println)
> {code}
> Output:
> {code}
> 17/10/11 03:12:04 WARN JacksonParser: Found at least one malformed records (sample:
> {"name": "foo", "count": 24.0}}
> ). The JSON reader will replace
> all malformed records with placeholder null in current PERMISSIVE parser mode.
> To find out which corrupted records have been replaced with null, please use the
> default inferred schema instead of providing a custom schema.
> Code example to print all malformed records (scala):
> ===================================================
> // The corrupted record exists in column _corrupt_record.
> val parsedJson = spark.read.json("/path/to/json/file/test.json")
>
> [null,null]
> [bar,24]
> {code}
> Expected output:
> {code}
> [foo,null]
> [bar,24]
> {code}
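> As the warning in the output above suggests, the malformed rows can at least be surfaced by keeping the raw text around. A minimal sketch of that workaround, continuing from the snippet above (the extra `_corrupt_record` field is an illustrative addition, not part of the original report, and it does not change the all-null behaviour of the typed columns):
> {code}
> import org.apache.spark.sql.functions.col
>
> // Keep a string column named after spark.sql.columnNameOfCorruptRecord (default: _corrupt_record)
> // in the schema so PERMISSIVE mode stores the raw malformed line there instead of dropping it.
> val schemaWithCorrupt = sparkschema.add(StructField("_corrupt_record", StringType))
> val parsed = sqlContext.read.schema(schemaWithCorrupt).json(rdd)
>
> parsed.filter(col("_corrupt_record").isNotNull).show(false)                  // the malformed "foo" line
> parsed.filter(col("_corrupt_record").isNull).select("name", "count").show()  // [bar,24]
> {code}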
> The problem comes from `spark-catalyst_2.11-2.1.0-sources.jar!/org/apache/spark/sql/catalyst/json/JacksonParser.scala`
> {code}
> private def failedConversion(
>     parser: JsonParser,
>     dataType: DataType): PartialFunction[JsonToken, Any] = {
>   case VALUE_STRING if parser.getTextLength < 1 =>
>     // If conversion is failed, this produces `null` rather than throwing exception.
>     // This will protect the mismatch of types.
>     null
>
>   case token =>
>     // We cannot parse this token based on the given data type. So, we throw a
>     // SparkSQLJsonProcessingException and this exception will be caught by
>     // `parse` method.
>     throw new SparkSQLJsonProcessingException(
>       s"Failed to parse a value for data type $dataType (current token: $token).")
> }
> {code}
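> For the "foo" record this is exactly the path taken: `24.0` is read as a `VALUE_NUMBER_FLOAT` token, while the `LongType` converter in the same file only handles `VALUE_NUMBER_INT`, so control falls through to the catch-all `case token` above. A minimal sketch with plain Jackson (outside Spark, added here only to show the tokens involved):
> {code}
> import com.fasterxml.jackson.core.{JsonFactory, JsonToken}
>
> val parser = new JsonFactory().createParser("""{"name": "foo", "count": 24.0}""")
> parser.nextToken()                                   // START_OBJECT
> while (parser.nextToken() != JsonToken.END_OBJECT) {
>   val field = parser.getCurrentName                  // currently on a FIELD_NAME token
>   parser.nextToken()                                 // advance to the value token
>   println(s"$field -> ${parser.getCurrentToken}")
> }
> // name -> VALUE_STRING
> // count -> VALUE_NUMBER_FLOAT   <- not VALUE_NUMBER_INT, so the LongType conversion fails
> {code}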
> So a single bad column raises a `SparkSQLJsonProcessingException`, and the `parse` method
> {code}
> def parse(input: String): Seq[InternalRow] = {
>   if (input.trim.isEmpty) {
>     Nil
>   } else {
>     try {
>       Utils.tryWithResource(factory.createParser(input)) { parser =>
>         parser.nextToken()
>         rootConverter.apply(parser) match {
>           case null => failedRecord(input)
>           case row: InternalRow => row :: Nil
>           case array: ArrayData =>
>             // Here, as we support reading top level JSON arrays and take every element
>             // in such an array as a row, this case is possible.
>             if (array.numElements() == 0) {
>               Nil
>             } else {
>               array.toArray[InternalRow](schema)
>             }
>           case _ =>
>             failedRecord(input)
>         }
>       }
>     } catch {
>       case _: JsonProcessingException =>
>         failedRecord(input)
>       case _: SparkSQLJsonProcessingException =>
>         failedRecord(input)
>     }
>   }
> }
> {code}
> catches that exception and marks the whole record as a failed record, which is why every column in the row comes back as null rather than just the mismatched one.
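> For completeness, a small sketch (added here for illustration, not from the ticket) of the other documented JSON parser modes, which make the failure visible instead of silently nulling the row:
> {code}
> // DROPMALFORMED drops the malformed "foo" row entirely instead of nulling its columns.
> sqlContext.read.schema(sparkschema).option("mode", "DROPMALFORMED").json(rdd).show()
>
> // FAILFAST throws as soon as the malformed record is encountered.
> sqlContext.read.schema(sparkschema).option("mode", "FAILFAST").json(rdd).show()
> {code}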
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)