You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/02/01 11:51:27 UTC
[avro] 12/30: AVRO-3240: fix deserializer schema backward compatibility (#1379)
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
commit 578c81bc0a2ae3ffee1b4b58d6947a81c23054e2
Author: Ultrabug <ul...@users.noreply.github.com>
AuthorDate: Fri Jan 7 14:30:56 2022 +0100
AVRO-3240: fix deserializer schema backward compatibility (#1379)
* AVRO-3240: fix decoding schema when reaching the end of the reader
* AVRO-3240: add from_avro_datum test for reader EOF
* AVRO-3240: add reader eof handling to Schema::String
(cherry picked from commit 7584320142768612bc7a58242231a476404bbceb)
---
lang/rust/src/decode.rs | 78 +++++++++++++++++++++++++++++++++----------------
lang/rust/src/reader.rs | 64 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 117 insertions(+), 25 deletions(-)
diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs
index 9108d37..71ee4c1 100644
--- a/lang/rust/src/decode.rs
+++ b/lang/rust/src/decode.rs
@@ -23,7 +23,7 @@ use crate::{
util::{safe_len, zag_i32, zag_i64},
AvroResult, Error,
};
-use std::{collections::HashMap, convert::TryFrom, io::Read, str::FromStr};
+use std::{collections::HashMap, convert::TryFrom, io::{ErrorKind, Read}, str::FromStr};
use uuid::Uuid;
#[inline]
@@ -67,15 +67,23 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
Schema::Null => Ok(Value::Null),
Schema::Boolean => {
let mut buf = [0u8; 1];
- reader
- .read_exact(&mut buf[..])
- .map_err(Error::ReadBoolean)?;
-
- match buf[0] {
- 0u8 => Ok(Value::Boolean(false)),
- 1u8 => Ok(Value::Boolean(true)),
- _ => Err(Error::BoolValue(buf[0])),
- }
+ match reader
+ .read_exact(&mut buf[..]) {
+ Ok(_) => {
+ match buf[0] {
+ 0u8 => Ok(Value::Boolean(false)),
+ 1u8 => Ok(Value::Boolean(true)),
+ _ => Err(Error::BoolValue(buf[0])),
+ }
+ },
+ Err(io_err) => {
+ if let ErrorKind::UnexpectedEof = io_err.kind() {
+ Ok(Value::Null)
+ } else {
+ Err(Error::ReadBoolean(io_err))
+ }
+ },
+ }
}
Schema::Decimal { ref inner, .. } => match &**inner {
Schema::Fixed { .. } => match decode(inner, reader)? {
@@ -126,11 +134,20 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
Schema::String => {
let len = decode_len(reader)?;
let mut buf = vec![0u8; len];
- reader.read_exact(&mut buf).map_err(Error::ReadString)?;
-
- Ok(Value::String(
- String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
- ))
+ match reader.read_exact(&mut buf) {
+ Ok(_) => {
+ Ok(Value::String(
+ String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
+ ))
+ },
+ Err(io_err) => {
+ if let ErrorKind::UnexpectedEof = io_err.kind() {
+ Ok(Value::Null)
+ } else {
+ Err(Error::ReadString(io_err))
+ }
+ },
+ }
}
Schema::Fixed { size, .. } => {
let mut buf = vec![0u8; size];
@@ -180,16 +197,27 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
Ok(Value::Map(items))
}
Schema::Union(ref inner) => {
- let index = zag_i64(reader)?;
- let variants = inner.variants();
- let variant = variants
- .get(usize::try_from(index).map_err(|e| Error::ConvertI64ToUsize(e, index))?)
- .ok_or_else(|| Error::GetUnionVariant {
- index,
- num_variants: variants.len(),
- })?;
- let value = decode(variant, reader)?;
- Ok(Value::Union(Box::new(value)))
+ match zag_i64(reader) {
+ Ok(index) => {
+ let variants = inner.variants();
+ let variant = variants
+ .get(usize::try_from(index).map_err(|e| Error::ConvertI64ToUsize(e, index))?)
+ .ok_or_else(|| Error::GetUnionVariant {
+ index,
+ num_variants: variants.len(),
+ })?;
+ let value = decode(variant, reader)?;
+ Ok(Value::Union(Box::new(value)))
+ },
+ Err(Error::ReadVariableIntegerBytes(io_err)) => {
+ if let ErrorKind::UnexpectedEof = io_err.kind() {
+ Ok(Value::Union(Box::new(Value::Null)))
+ } else {
+ Err(Error::ReadVariableIntegerBytes(io_err))
+ }
+ },
+ Err(io_err) => Err(io_err),
+ }
}
Schema::Record { ref fields, .. } => {
// Benchmarks indicate ~10% improvement using this method.
diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index 467c658..e036991 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -304,8 +304,10 @@ pub fn from_avro_datum<R: Read>(
#[cfg(test)]
mod tests {
use super::*;
+ use crate::from_value;
use crate::{types::Record, Reader};
use std::io::Cursor;
+ use serde::Deserialize;
const SCHEMA: &str = r#"
{
@@ -341,6 +343,47 @@ mod tests {
6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
];
+ const TEST_RECORD_SCHEMA_3240: &str = r#"
+ {
+ "type": "record",
+ "name": "test",
+ "fields": [
+ {
+ "name": "a",
+ "type": "long",
+ "default": 42
+ },
+ {
+ "name": "b",
+ "type": "string"
+ },
+ {
+ "name": "a_nullable_array",
+ "type": ["null", {"type": "array", "items": {"type": "string"}}],
+ "default": null
+ },
+ {
+ "name": "a_nullable_boolean",
+ "type": ["null", {"type": "boolean"}],
+ "default": null
+ },
+ {
+ "name": "a_nullable_string",
+ "type": ["null", {"type": "string"}],
+ "default": null
+ }
+ ]
+ }
+ "#;
+ #[derive(Default, Debug, Deserialize, PartialEq)]
+ struct TestRecord {
+ a: i64,
+ b: String,
+ a_nullable_array: Option<Vec<String>>,
+ // we are missing the 'a_nullable_boolean' field to simulate missing keys
+ // a_nullable_boolean: Option<bool>,
+ a_nullable_string: Option<String>,
+ }
#[test]
fn test_from_avro_datum() {
@@ -359,6 +402,27 @@ mod tests {
}
#[test]
+ fn test_from_avro_datum_with_union_to_struct() {
+ let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240).unwrap();
+ let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
+
+ let expected_record: TestRecord = TestRecord {
+ a: 27i64,
+ b: String::from("foo"),
+ a_nullable_array: None,
+ a_nullable_string: None,
+ };
+
+ let avro_datum = from_avro_datum(&schema, &mut encoded, None).unwrap();
+ let parsed_record: TestRecord = match &avro_datum {
+ Value::Record(_) => from_value::<TestRecord>(&avro_datum).unwrap(),
+ unexpected => panic!("could not map avro data to struct, found unexpected: {:?}", unexpected),
+ };
+
+ assert_eq!(parsed_record, expected_record);
+ }
+
+ #[test]
fn test_null_union() {
let schema = Schema::parse_str(UNION_SCHEMA).unwrap();
let mut encoded: &'static [u8] = &[2, 0];