You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2022/06/27 02:05:38 UTC

[spark] branch branch-3.2 updated: [SPARK-39575][AVRO] add ByteBuffer#rewind after ByteBuffer#get in Avr…

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 61dc08da34a [SPARK-39575][AVRO] add ByteBuffer#rewind after ByteBuffer#get in Avr…
61dc08da34a is described below

commit 61dc08da34a405a61a77b0e173bf22bb9f11bcfd
Author: wangzixuan.wzxuan <wa...@bytedance.com>
AuthorDate: Sun Jun 26 21:05:08 2022 -0500

    [SPARK-39575][AVRO] add ByteBuffer#rewind after ByteBuffer#get in Avr…
    
    …oDeserializer
    
    ### What changes were proposed in this pull request?
    Add ByteBuffer#rewind after ByteBuffer#get in AvroDeserializer.
    
    ### Why are the changes needed?
    - HeapBuffer.get(bytes) copies the data from the current position up to the limit into bytes, and advances the position to the limit. The next call will therefore return empty bytes.
    - The second call of AvroDeserializer will return an InternalRow with empty binary column when avro record has binary column.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Add a unit test in AvroCatalystDataConversionSuite.
    
    Closes #36973 from wzx140/avro-fix.
    
    Authored-by: wangzixuan.wzxuan <wa...@bytedance.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
    (cherry picked from commit 558b395880673ec45bf9514c98983e50e21d9398)
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../apache/spark/sql/avro/AvroDeserializer.scala    |  2 ++
 .../sql/avro/AvroCatalystDataConversionSuite.scala  | 21 +++++++++++++++++++++
 2 files changed, 23 insertions(+)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 792e1fddf14..620c04ae8f1 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -181,6 +181,8 @@ private[sql] class AvroDeserializer(
           case b: ByteBuffer =>
             val bytes = new Array[Byte](b.remaining)
             b.get(bytes)
+            // Do not forget to reset the position
+            b.rewind()
             bytes
           case b: Array[Byte] => b
           case other =>
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
index a43d171fb52..5c0d64b4d55 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala
@@ -360,4 +360,25 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       None,
       new OrderedFilters(Seq(Not(EqualTo("Age", 39))), sqlSchema))
   }
+
+  test("AvroDeserializer with binary type") {
+    val jsonFormatSchema =
+      """
+        |{
+        |  "type": "record",
+        |  "name": "record",
+        |  "fields" : [
+        |    {"name": "a", "type": "bytes"}
+        |  ]
+        |}
+      """.stripMargin
+    val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
+    val avroRecord = new GenericData.Record(avroSchema)
+    val bb = java.nio.ByteBuffer.wrap(Array[Byte](97, 48, 53))
+    avroRecord.put("a", bb)
+
+    val expected = InternalRow(Array[Byte](97, 48, 53))
+    checkDeserialization(avroSchema, avroRecord, Some(expected))
+    checkDeserialization(avroSchema, avroRecord, Some(expected))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org