Posted to issues@spark.apache.org by "Vignesh Mohan (JIRA)" <ji...@apache.org> on 2017/10/11 10:24:00 UTC

[jira] [Created] (SPARK-22248) Spark marks all columns as null when it is unable to parse one column

Vignesh Mohan created SPARK-22248:
-------------------------------------

             Summary: Spark marks all columns as null when it is unable to parse one column
                 Key: SPARK-22248
                 URL: https://issues.apache.org/jira/browse/SPARK-22248
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.2.0, 2.1.2
            Reporter: Vignesh Mohan


When parsing JSON data in `PERMISSIVE` mode with a user-supplied schema, if the value of one column does not match the schema, Spark sets every column of that record to null instead of only the mismatched column.


{code}
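import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val conf = new SparkConf().setMaster("local").setAppName("app")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// User-supplied schema: `count` is declared as a long.
val sparkschema: StructType =
  StructType(StructField("name", StringType) :: StructField("count", LongType) :: Nil)

// The first record's `count` (24.0) is a float and does not match LongType.
val rdd = sc.parallelize(List(
  """
    |{"name": "foo", "count": 24.0}}
    |""".stripMargin,
  """
    |{"name": "bar", "count": 24}}
    |""".stripMargin
))

sqlContext.read.schema(sparkschema).json(rdd).createOrReplaceTempView("events")
sqlContext.sql(
  """
    | select
    | name,count
    | from
    | events
  """.stripMargin).collect.foreach(println)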

{code}

Output:
{code}
17/10/11 03:12:04 WARN JacksonParser: Found at least one malformed records (sample: 
{"name": "foo", "count": 24.0}}
). The JSON reader will replace
all malformed records with placeholder null in current PERMISSIVE parser mode.
To find out which corrupted records have been replaced with null, please use the
default inferred schema instead of providing a custom schema.

Code example to print all malformed records (scala):
===================================================
// The corrupted record exists in column _corrupt_record.
val parsedJson = spark.read.json("/path/to/json/file/test.json")

           
[null,null]
[bar,24]
{code}

Expected output:
{code}
[foo,null]
[bar,24]
{code}


The problem comes from `failedConversion` in `spark-catalyst_2.11-2.1.0-sources.jar!/org/apache/spark/sql/catalyst/json/JacksonParser.scala`:
{code}
private def failedConversion(
      parser: JsonParser,
      dataType: DataType): PartialFunction[JsonToken, Any] = {
    case VALUE_STRING if parser.getTextLength < 1 =>
      // If conversion is failed, this produces `null` rather than throwing exception.
      // This will protect the mismatch of types.
      null

    case token =>
      // We cannot parse this token based on the given data type. So, we throw a
      // SparkSQLJsonProcessingException and this exception will be caught by
      // `parse` method.
      throw new SparkSQLJsonProcessingException(
        s"Failed to parse a value for data type $dataType (current token: $token).")
  }
{code}
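
To make the fallback easier to follow, here is a minimal plain-Scala sketch of the pattern, not Spark's actual code. It assumes the converter for a long column only matches integer tokens and hands every other token to a fallback modelled on `failedConversion`. The `Token` types, `longConverter`, and `convertLong` are hypothetical names invented for this illustration, and `IllegalArgumentException` stands in for `SparkSQLJsonProcessingException`.

```scala
object TokenFallbackSketch {
  // Hypothetical, simplified stand-ins for Jackson's JsonToken values.
  sealed trait Token
  final case class NumberInt(value: Long) extends Token
  final case class NumberFloat(value: Double) extends Token
  final case class Text(value: String) extends Token

  // Converter for a long column: it only knows how to handle integer tokens.
  val longConverter: PartialFunction[Token, Any] = {
    case NumberInt(v) => v
  }

  // Fallback modelled on `failedConversion`: an empty string becomes null,
  // every other unmatched token raises an exception.
  val failedConversion: PartialFunction[Token, Any] = {
    case Text(s) if s.isEmpty => null
    case token =>
      throw new IllegalArgumentException(
        s"Failed to parse a value for data type LongType (current token: $token).")
  }

  // Try the type-specific converter first, then fall back.
  def convertLong(token: Token): Any =
    longConverter.applyOrElse(token, failedConversion)
}
```

Under these assumptions, `convertLong(NumberInt(24))` returns the value, `convertLong(Text(""))` returns null, and `convertLong(NumberFloat(24.0))` throws, which corresponds to the `"count": 24.0` record in the reproduction.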

`failedConversion` throws a `SparkSQLJsonProcessingException` when a column value does not match its declared type, and the `parse` method catches it:
{code}
def parse(input: String): Seq[InternalRow] = {
    if (input.trim.isEmpty) {
      Nil
    } else {
      try {
        Utils.tryWithResource(factory.createParser(input)) { parser =>
          parser.nextToken()
          rootConverter.apply(parser) match {
            case null => failedRecord(input)
            case row: InternalRow => row :: Nil
            case array: ArrayData =>
              // Here, as we support reading top level JSON arrays and take every element
              // in such an array as a row, this case is possible.
              if (array.numElements() == 0) {
                Nil
              } else {
                array.toArray[InternalRow](schema)
              }
            case _ =>
              failedRecord(input)
          }
        }
      } catch {
        case _: JsonProcessingException =>
          failedRecord(input)
        case _: SparkSQLJsonProcessingException =>
          failedRecord(input)
      }
    }
  }
{code}
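The `catch` block in `parse` passes the whole input to `failedRecord`, so a single mismatched column causes every column of the record to be set to null.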
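
The difference between the current and the expected behaviour can be sketched in plain Scala without any Spark dependency. This is only an illustrative model, not a proposed patch: `Column`, `parseRecordLevel`, and `parseFieldLevel` are hypothetical names, records are modelled as maps of raw strings, and `String.toLong` stands in for the typed conversion that fails on `24.0`.

```scala
import scala.util.Try

object NullFallbackSketch {
  // Hypothetical stand-in for a typed column: a name plus a converter that may throw.
  final case class Column(name: String, convert: String => Any)

  val schema: Seq[Column] = Seq(
    Column("name", raw => raw),
    Column("count", raw => raw.toLong) // throws NumberFormatException on "24.0"
  )

  // Record-level handling, as in the `parse` method quoted above: one failure
  // anywhere replaces the entire row with nulls.
  def parseRecordLevel(raw: Map[String, String]): Seq[Any] =
    try schema.map(c => c.convert(raw(c.name)))
    catch { case _: NumberFormatException => schema.map(_ => null) }

  // Field-level handling, matching the expected output in this report: only
  // the column that fails to convert becomes null.
  def parseFieldLevel(raw: Map[String, String]): Seq[Any] =
    schema.map(c => Try(c.convert(raw(c.name))).getOrElse(null))

  def main(args: Array[String]): Unit = {
    val foo = Map("name" -> "foo", "count" -> "24.0")
    val bar = Map("name" -> "bar", "count" -> "24")
    println(parseRecordLevel(foo).mkString("[", ",", "]")) // [null,null]
    println(parseFieldLevel(foo).mkString("[", ",", "]"))  // [foo,null]
    println(parseFieldLevel(bar).mkString("[", ",", "]"))  // [bar,24]
  }
}
```

In this model the only change between the two variants is where the failure is caught: around the whole row, or around each column conversion.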


