You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/02/01 11:51:27 UTC

[avro] 12/30: AVRO-3240: fix deserializer schema backward compatibility (#1379)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 578c81bc0a2ae3ffee1b4b58d6947a81c23054e2
Author: Ultrabug <ul...@users.noreply.github.com>
AuthorDate: Fri Jan 7 14:30:56 2022 +0100

    AVRO-3240: fix deserializer schema backward compatibility (#1379)
    
    * AVRO-3240: fix decoding schema when reaching the end of the reader
    
    * AVRO-3240: add from_avro_datum test for reader EOF
    
    * AVRO-3240: add reader eof handling to Schema::String
    
    (cherry picked from commit 7584320142768612bc7a58242231a476404bbceb)
---
 lang/rust/src/decode.rs | 78 +++++++++++++++++++++++++++++++++----------------
 lang/rust/src/reader.rs | 64 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 117 insertions(+), 25 deletions(-)

diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs
index 9108d37..71ee4c1 100644
--- a/lang/rust/src/decode.rs
+++ b/lang/rust/src/decode.rs
@@ -23,7 +23,7 @@ use crate::{
     util::{safe_len, zag_i32, zag_i64},
     AvroResult, Error,
 };
-use std::{collections::HashMap, convert::TryFrom, io::Read, str::FromStr};
+use std::{collections::HashMap, convert::TryFrom, io::{ErrorKind, Read}, str::FromStr};
 use uuid::Uuid;
 
 #[inline]
@@ -67,15 +67,23 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
         Schema::Null => Ok(Value::Null),
         Schema::Boolean => {
             let mut buf = [0u8; 1];
-            reader
-                .read_exact(&mut buf[..])
-                .map_err(Error::ReadBoolean)?;
-
-            match buf[0] {
-                0u8 => Ok(Value::Boolean(false)),
-                1u8 => Ok(Value::Boolean(true)),
-                _ => Err(Error::BoolValue(buf[0])),
-            }
+            match reader
+                .read_exact(&mut buf[..]) {
+                    Ok(_) => {
+                        match buf[0] {
+                            0u8 => Ok(Value::Boolean(false)),
+                            1u8 => Ok(Value::Boolean(true)),
+                            _ => Err(Error::BoolValue(buf[0])),
+                        }
+                    },
+                    Err(io_err) => {
+                        if let ErrorKind::UnexpectedEof = io_err.kind() {
+                            Ok(Value::Null)
+                        } else {
+                            Err(Error::ReadBoolean(io_err))
+                        }
+                    },
+                }
         }
         Schema::Decimal { ref inner, .. } => match &**inner {
             Schema::Fixed { .. } => match decode(inner, reader)? {
@@ -126,11 +134,20 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
         Schema::String => {
             let len = decode_len(reader)?;
             let mut buf = vec![0u8; len];
-            reader.read_exact(&mut buf).map_err(Error::ReadString)?;
-
-            Ok(Value::String(
-                String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
-            ))
+            match reader.read_exact(&mut buf) {
+                Ok(_) => {
+                    Ok(Value::String(
+                        String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
+                    ))
+                },
+                Err(io_err) => {
+                    if let ErrorKind::UnexpectedEof = io_err.kind() {
+                        Ok(Value::Null)
+                    } else {
+                        Err(Error::ReadString(io_err))
+                    }
+                },
+            }
         }
         Schema::Fixed { size, .. } => {
             let mut buf = vec![0u8; size];
@@ -180,16 +197,27 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
             Ok(Value::Map(items))
         }
         Schema::Union(ref inner) => {
-            let index = zag_i64(reader)?;
-            let variants = inner.variants();
-            let variant = variants
-                .get(usize::try_from(index).map_err(|e| Error::ConvertI64ToUsize(e, index))?)
-                .ok_or_else(|| Error::GetUnionVariant {
-                    index,
-                    num_variants: variants.len(),
-                })?;
-            let value = decode(variant, reader)?;
-            Ok(Value::Union(Box::new(value)))
+            match zag_i64(reader) {
+                Ok(index) => {
+                    let variants = inner.variants();
+                    let variant = variants
+                        .get(usize::try_from(index).map_err(|e| Error::ConvertI64ToUsize(e, index))?)
+                        .ok_or_else(|| Error::GetUnionVariant {
+                            index,
+                            num_variants: variants.len(),
+                        })?;
+                    let value = decode(variant, reader)?;
+                    Ok(Value::Union(Box::new(value)))
+                },
+                Err(Error::ReadVariableIntegerBytes(io_err)) => {
+                    if let ErrorKind::UnexpectedEof = io_err.kind() {
+                        Ok(Value::Union(Box::new(Value::Null)))
+                    } else {
+                        Err(Error::ReadVariableIntegerBytes(io_err))
+                    }
+                },
+                Err(io_err) => Err(io_err),
+            }
         }
         Schema::Record { ref fields, .. } => {
             // Benchmarks indicate ~10% improvement using this method.
diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index 467c658..e036991 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -304,8 +304,10 @@ pub fn from_avro_datum<R: Read>(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::from_value;
     use crate::{types::Record, Reader};
     use std::io::Cursor;
+    use serde::Deserialize;
 
     const SCHEMA: &str = r#"
     {
@@ -341,6 +343,47 @@ mod tests {
         6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
         207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
     ];
+    const TEST_RECORD_SCHEMA_3240: &str = r#"
+    {
+      "type": "record",
+      "name": "test",
+      "fields": [
+        {
+          "name": "a",
+          "type": "long",
+          "default": 42
+        },
+        {
+          "name": "b",
+          "type": "string"
+        },
+        {
+            "name": "a_nullable_array",
+            "type": ["null", {"type": "array", "items": {"type": "string"}}],
+            "default": null
+        },
+        {
+            "name": "a_nullable_boolean",
+            "type": ["null", {"type": "boolean"}],
+            "default": null
+        },
+        {
+            "name": "a_nullable_string",
+            "type": ["null", {"type": "string"}],
+            "default": null
+        }
+      ]
+    }
+    "#;
+    #[derive(Default, Debug, Deserialize, PartialEq)]
+    struct TestRecord {
+        a: i64,
+        b: String,
+        a_nullable_array: Option<Vec<String>>,
+        // 'a_nullable_boolean' is intentionally omitted to simulate a struct missing a schema key
+        // a_nullable_boolean: Option<bool>,
+        a_nullable_string: Option<String>,
+    }
 
     #[test]
     fn test_from_avro_datum() {
@@ -359,6 +402,27 @@ mod tests {
     }
 
     #[test]
+    fn test_from_avro_datum_with_union_to_struct() {
+        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240).unwrap();
+        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
+
+        let expected_record: TestRecord = TestRecord {
+            a: 27i64,
+            b: String::from("foo"),
+            a_nullable_array: None,
+            a_nullable_string: None,
+        };
+
+        let avro_datum = from_avro_datum(&schema, &mut encoded, None).unwrap();
+        let parsed_record: TestRecord = match &avro_datum {
+            Value::Record(_) => from_value::<TestRecord>(&avro_datum).unwrap(),
+            unexpected => panic!("could not map avro data to struct, found unexpected: {:?}", unexpected),
+        };
+
+        assert_eq!(parsed_record, expected_record);
+    }
+
+    #[test]
     fn test_null_union() {
         let schema = Schema::parse_str(UNION_SCHEMA).unwrap();
         let mut encoded: &'static [u8] = &[2, 0];