You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2022/06/27 02:05:19 UTC
[spark] branch master updated: [SPARK-39575][AVRO] add ByteBuffer#rewind after ByteBuffer#get in Avr…
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 558b3958806 [SPARK-39575][AVRO] add ByteBuffer#rewind after ByteBuffer#get in Avr…
558b3958806 is described below
commit 558b395880673ec45bf9514c98983e50e21d9398
Author: wangzixuan.wzxuan <wa...@bytedance.com>
AuthorDate: Sun Jun 26 21:05:08 2022 -0500
[SPARK-39575][AVRO] add ByteBuffer#rewind after ByteBuffer#get in Avr…
…oDeserializer
### What changes were proposed in this pull request?
Add ByteBuffer#rewind after ByteBuffer#get in AvroDeserializer.
### Why are the changes needed?
- HeapByteBuffer#get(bytes) copies the data from the buffer's current position up to its limit into bytes, and advances the position to the limit. The next call will therefore return empty bytes.
- The second call of AvroDeserializer will return an InternalRow with an empty binary column when the Avro record has a binary column.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add a unit test in AvroCatalystDataConversionSuite.
Closes #36973 from wzx140/avro-fix.
Authored-by: wangzixuan.wzxuan <wa...@bytedance.com>
Signed-off-by: Sean Owen <sr...@gmail.com>
---
.../apache/spark/sql/avro/AvroDeserializer.scala | 2 ++
.../sql/avro/AvroCatalystDataConversionSuite.scala | 21 +++++++++++++++++++++
2 files changed, 23 insertions(+)
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 5bb51a92977..1192856ae77 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -195,6 +195,8 @@ private[sql] class AvroDeserializer(
case b: ByteBuffer =>
val bytes = new Array[Byte](b.remaining)
b.get(bytes)
+ // Do not forget to reset the position
+ b.rewind()
bytes
case b: Array[Byte] => b
case other =>
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
index a43d171fb52..5c0d64b4d55 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -360,4 +360,25 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
None,
new OrderedFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema))
}
+
+ test("AvroDeserializer with binary type") {
+ val jsonFormatSchema =
+ """
+ |{
+ | "type": "record",
+ | "name": "record",
+ | "fields" : [
+ | {"name": "a", "type": "bytes"}
+ | ]
+ |}
+ """.stripMargin
+ val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
+ val avroRecord = new GenericData.Record(avroSchema)
+ val bb = java.nio.ByteBuffer.wrap(Array[Byte](97, 48, 53))
+ avroRecord.put("a", bb)
+
+ val expected = InternalRow(Array[Byte](97, 48, 53))
+ checkDeserialization(avroSchema, avroRecord, Some(expected))
+ checkDeserialization(avroSchema, avroRecord, Some(expected))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org