Posted to issues@spark.apache.org by "Vignesh Mohan (JIRA)" <ji...@apache.org> on 2017/10/11 10:24:00 UTC
[jira] [Created] (SPARK-22248) Spark marks all columns as null when it's unable to parse one column
Vignesh Mohan created SPARK-22248:
-------------------------------------
Summary: Spark marks all columns as null when it's unable to parse one column
Key: SPARK-22248
URL: https://issues.apache.org/jira/browse/SPARK-22248
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.2.0, 2.1.2
Reporter: Vignesh Mohan
When parsing JSON data in `PERMISSIVE` mode, if a single column fails to match the schema, Spark nulls out every column in that record rather than only the mismatched one.
{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val conf = new SparkConf().setMaster("local").setAppName("app")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// The schema declares `count` as a long; the first record carries a double (24.0),
// which is the mismatch that triggers the bug.
val sparkschema: StructType =
  StructType(StructField("name", StringType) :: StructField("count", LongType) :: Nil)

val rdd = sc.parallelize(List(
  """
    |{"name": "foo", "count": 24.0}}
    |""".stripMargin,
  """
    |{"name": "bar", "count": 24}}
    |""".stripMargin
))

sqlContext.read.schema(sparkschema).json(rdd).createOrReplaceTempView("events")

sqlContext.sql(
  """
    | select
    |   name, count
    | from
    |   events
  """.stripMargin).collect.foreach(println)
{code}
Output:
{code}
17/10/11 03:12:04 WARN JacksonParser: Found at least one malformed records (sample:
{"name": "foo", "count": 24.0}}
). The JSON reader will replace
all malformed records with placeholder null in current PERMISSIVE parser mode.
To find out which corrupted records have been replaced with null, please use the
default inferred schema instead of providing a custom schema.
Code example to print all malformed records (scala):
===================================================
// The corrupted record exists in column _corrupt_record.
val parsedJson = spark.read.json("/path/to/json/file/test.json")
[null,null]
[bar,24]
{code}
Expected output:
{code}
[foo,null]
[bar,24]
{code}
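As the warning itself suggests, the malformed records can at least be surfaced through the corrupt-record column. A minimal sketch, assuming the default column name `_corrupt_record` (configurable via `spark.sql.columnNameOfCorruptRecord`) and reusing `rdd` from the reproduction above:
{code}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Sketch: include Spark's corrupt-record column in the user-provided schema so
// the raw text of a record that fails conversion is preserved instead of being
// silently dropped to all-null.
val schemaWithCorrupt = StructType(
  StructField("name", StringType) ::
  StructField("count", LongType) ::
  StructField("_corrupt_record", StringType) :: Nil)

sqlContext.read
  .schema(schemaWithCorrupt)
  .json(rdd)
  .filter("_corrupt_record is not null")
  .show(false)
{code}
This only makes the failures visible; the other columns of the malformed record are still null.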
The problem comes from `spark-catalyst_2.11-2.1.0-sources.jar!/org/apache/spark/sql/catalyst/json/JacksonParser.scala`:
{code}
private def failedConversion(
    parser: JsonParser,
    dataType: DataType): PartialFunction[JsonToken, Any] = {
  case VALUE_STRING if parser.getTextLength < 1 =>
    // If conversion is failed, this produces `null` rather than throwing exception.
    // This will protect the mismatch of types.
    null

  case token =>
    // We cannot parse this token based on the given data type. So, we throw a
    // SparkSQLJsonProcessingException and this exception will be caught by
    // `parse` method.
    throw new SparkSQLJsonProcessingException(
      s"Failed to parse a value for data type $dataType (current token: $token).")
}
{code}
For the failing record, `24.0` arrives as a floating-point token that the converter for `LongType` cannot handle, so the second case throws while parsing that single column, and the `parse` method
{code}
def parse(input: String): Seq[InternalRow] = {
  if (input.trim.isEmpty) {
    Nil
  } else {
    try {
      Utils.tryWithResource(factory.createParser(input)) { parser =>
        parser.nextToken()
        rootConverter.apply(parser) match {
          case null => failedRecord(input)
          case row: InternalRow => row :: Nil
          case array: ArrayData =>
            // Here, as we support reading top level JSON arrays and take every element
            // in such an array as a row, this case is possible.
            if (array.numElements() == 0) {
              Nil
            } else {
              array.toArray[InternalRow](schema)
            }
          case _ =>
            failedRecord(input)
        }
      }
    } catch {
      case _: JsonProcessingException =>
        failedRecord(input)
      case _: SparkSQLJsonProcessingException =>
        failedRecord(input)
    }
  }
}
{code}
catches the exception and marks the whole record as a failed record, so every column is nulled instead of only the one that failed to convert.
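For illustration only, not a proposed patch: the expected output above would follow if the type-mismatch case degraded to a field-level null instead of throwing, roughly along these lines (a hypothetical `lenientConversion`, not actual Spark code):
{code}
// Hypothetical sketch: tolerate a type-mismatched token with a field-level null
// instead of throwing, so only the offending column becomes null rather than
// the whole record.
private def lenientConversion(
    parser: JsonParser,
    dataType: DataType): PartialFunction[JsonToken, Any] = {
  case VALUE_STRING if parser.getTextLength < 1 =>
    null

  case _ =>
    // skipChildren() is a no-op for scalar tokens and skips the entire subtree
    // for objects/arrays, leaving the parser positioned for the next field.
    parser.skipChildren()
    null
}
{code}
Whether a silent field-level null is preferable to nulling the whole record is a behavior question for the Spark developers, but it would match the expected output shown above.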