Posted to issues@spark.apache.org by "Yosuke Mori (JIRA)" <ji...@apache.org> on 2019/05/21 22:26:00 UTC
[jira] [Created] (SPARK-27798) from_avro can modify variables in other rows
Yosuke Mori created SPARK-27798:
-----------------------------------
Summary: from_avro can modify variables in other rows
Key: SPARK-27798
URL: https://issues.apache.org/jira/browse/SPARK-27798
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.4.3
Reporter: Yosuke Mori
Steps to reproduce:
Create a local Dataset (at least two distinct rows) with a binary Avro field. Use the {{from_avro}} function to deserialize the binary into another column. Verify that the rows incorrectly have the same value.
Here's a concrete example (using Spark 2.4.3). All it does is convert a list of TestPayload objects into binary using the defined Avro schema, then try to deserialize them using {{from_avro}} with that same schema:
{noformat}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col
import java.io.ByteArrayOutputStream
object TestApp extends App {
  // Payload container
  case class TestEvent(payload: Array[Byte])

  // Deserialized Payload
  case class TestPayload(message: String)

  // Schema for Payload
  val simpleSchema =
    """
      |{
      |"type": "record",
      |"name" : "Payload",
      |"fields" : [ {"name" : "message", "type" : [ "string", "null" ] } ]
      |}
    """.stripMargin

  // Convert TestPayload into avro binary
  def generateSimpleSchemaBinary(record: TestPayload, avsc: String): Array[Byte] = {
    val schema = new Schema.Parser().parse(avsc)
    val out = new ByteArrayOutputStream()
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    val rootRecord = new GenericRecordBuilder(schema).set("message", record.message).build()
    writer.write(rootRecord, encoder)
    encoder.flush()
    out.toByteArray
  }

  val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  List(
    TestPayload("one"),
    TestPayload("two"),
    TestPayload("three"),
    TestPayload("four")
  ).map(payload => TestEvent(generateSimpleSchemaBinary(payload, simpleSchema)))
    .toDS()
    .withColumn("deserializedPayload", from_avro(col("payload"), simpleSchema))
    .show(truncate = false)
}
{noformat}
And here is what this program outputs:
{noformat}
+----------------------+-------------------+
|payload |deserializedPayload|
+----------------------+-------------------+
|[00 06 6F 6E 65] |[four] |
|[00 06 74 77 6F] |[four] |
|[00 0A 74 68 72 65 65]|[four] |
|[00 08 66 6F 75 72] |[four] |
+----------------------+-------------------+
{noformat}
Here, we can see that the avro binary is correctly generated, but the deserialized version is a copy of the last row.
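The payload column can be checked by hand against Avro's binary encoding, where a union is written as a zigzag-encoded branch index followed by the value, and a string as a zigzag-encoded length followed by its UTF-8 bytes. The sketch below is not part of the reproduction: {{AvroBytesSketch}} and its methods are hypothetical names, and it assumes both varints fit in a single byte, which holds for these short strings.

```scala
import java.nio.charset.StandardCharsets

object AvroBytesSketch {
  // Decode a zigzag value stored in a single varint byte.
  // Assumption: the encoded value is small enough to need only one byte.
  def zigZag(b: Int): Int = (b >>> 1) ^ -(b & 1)

  // Read the layout used by the payloads above:
  // union branch index, string length, then the UTF-8 bytes of the string.
  def decode(bytes: Array[Byte]): (Int, String) = {
    val branch = zigZag(bytes(0) & 0xFF)
    val length = zigZag(bytes(1) & 0xFF)
    val text = new String(bytes, 2, length, StandardCharsets.UTF_8)
    (branch, text)
  }
}
```

For the first row, {{[00 06 6F 6E 65]}} decodes to branch 0 (the "string" branch of the union), length 3, and the text "one". The other three rows decode to "two", "three" and "four" in the same way, which supports the claim that the input bytes differ per row and only the deserialized column is wrong.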
I dug into the code a bit more, and it seems that the reuse of {{result}} in {{AvroDataToCatalyst}} is overwriting the decoded values of previous rows. I set a breakpoint in {{LocalRelation}}, and the elements of the {{data}} sequence all seem to point to the same address in memory, so a mutation through one reference changes all of them.
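The suspected aliasing can be illustrated without Spark. The following is a simplified model only, not the actual {{AvroDataToCatalyst}} code: {{MutableRow}}, {{ReusingDecoder}} and both collect methods are hypothetical names, and the decoder simply stores its input in one shared object, as a stand-in for a reused {{result}} field.

```scala
object ReuseSketch {
  // Hypothetical stand-in for a mutable decoded row; not a Spark class.
  final class MutableRow(var message: String)

  // Decoder that hands back the same mutable object on every call.
  final class ReusingDecoder {
    private val result = new MutableRow(null)
    def decode(input: String): MutableRow = {
      result.message = input // overwrites whatever the previous call stored
      result
    }
  }

  // Keeps the returned references as they are: every element is the same instance.
  def collectWithoutCopy(inputs: List[String]): List[String] = {
    val decoder = new ReusingDecoder
    val rows = inputs.map(decoder.decode)
    rows.map(_.message)
  }

  // Copies each decoded value into a fresh object before keeping it.
  def collectWithCopy(inputs: List[String]): List[String] = {
    val decoder = new ReusingDecoder
    val rows = inputs.map(in => new MutableRow(decoder.decode(in).message))
    rows.map(_.message)
  }
}
```

With the inputs "one", "two", "three", "four", {{collectWithoutCopy}} yields "four" four times, matching the shape of the output above, while {{collectWithCopy}} preserves the four distinct values. Whether a defensive copy is the right fix inside Spark itself is not something this sketch can establish.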