You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/03/08 06:02:08 UTC
[avro] branch branch-1.11 updated: [AVRO-3433][rust] Adding in Resolve Fixes and Tests (#1582)
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.11 by this push:
new 9e2191a [AVRO-3433][rust] Adding in Resolve Fixes and Tests (#1582)
9e2191a is described below
commit 9e2191a460f6edec201fa87d08c75c407c9df12d
Author: Jack Klamer <jf...@gmail.com>
AuthorDate: Tue Mar 8 00:01:19 2022 -0600
[AVRO-3433][rust] Adding in Resolve Fixes and Tests (#1582)
* AVRO-3433: Rust: The canonical form should preserve schema references
Do not resolve Schema::Ref when printing as JSON.
The Java SDK complains that a type is redefined if it is not emitted as a
Ref the second time it appears.
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3433: Use Name as a key for parsed/resolving/input schemata
This is needed so that lookups use the fullname (namespace + '.' + name).
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3433: adding in resolve fixes and tests
* AVRO-3433: Fix formatting and clippy errors
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3433: Give better names to the new test cases
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3433: Collect all Union schemata in schemas_by_name to be able to resolve Schema::Ref's
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3433: Add schema name validation
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3433: Use the regex to capture the namespace and name
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3433: Register all union's schemata in schemas_by_name, so that Schema::Ref's can be resolved
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3433: Add in encoding tests
Co-authored-by: Martin Tzvetanov Grigorov <mg...@apache.org>
Co-authored-by: Martin Grigorov <ma...@users.noreply.github.com>
(cherry picked from commit aa22276152802a85cc628c418f304a164d82b3af)
---
lang/rust/avro/src/decode.rs | 4 +-
lang/rust/avro/src/encode.rs | 289 +++++++++++++++++++++++++++++++-
lang/rust/avro/src/error.rs | 3 +
lang/rust/avro/src/schema.rs | 99 +++++++----
lang/rust/avro/src/types.rs | 366 +++++++++++++++++++++++++++++++++++++----
lang/rust/avro/src/writer.rs | 4 +-
lang/rust/avro/tests/schema.rs | 2 +-
7 files changed, 695 insertions(+), 72 deletions(-)
diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs
index 83ee68e..87337bb 100644
--- a/lang/rust/avro/src/decode.rs
+++ b/lang/rust/avro/src/decode.rs
@@ -332,7 +332,7 @@ mod tests {
let inner = Box::new(Schema::Fixed {
size: 2,
doc: None,
- name: Name::new("decimal"),
+ name: Name::new("decimal").unwrap(),
});
let schema = Schema::Decimal {
inner,
@@ -356,7 +356,7 @@ mod tests {
use num_bigint::ToBigInt;
let inner = Box::new(Schema::Fixed {
size: 13,
- name: Name::new("decimal"),
+ name: Name::new("decimal").unwrap(),
doc: None,
});
let schema = Schema::Decimal {
diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index 6fc969a..664cd92 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::{
- schema::Schema,
+ schema::{Name, Schema},
types::Value,
util::{zig_i32, zig_i64},
};
@@ -55,17 +55,17 @@ pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
value: &Value,
schema: &Schema,
buffer: &mut Vec<u8>,
- schemas_by_name: &mut HashMap<String, Schema>,
+ schemas_by_name: &mut HashMap<Name, Schema>,
) {
match &schema {
Schema::Ref { ref name } => {
- let resolved = schemas_by_name.get(name.name.as_str()).unwrap();
+ let resolved = schemas_by_name.get(name).unwrap();
return encode_ref0(value, resolved, buffer, &mut schemas_by_name.clone());
}
Schema::Record { ref name, .. }
| Schema::Enum { ref name, .. }
| Schema::Fixed { ref name, .. } => {
- schemas_by_name.insert(name.name.clone(), schema.clone());
+ schemas_by_name.insert(name.clone(), schema.clone());
}
_ => (),
}
@@ -126,6 +126,15 @@ pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
Value::Enum(i, _) => encode_int(*i as i32, buffer),
Value::Union(idx, item) => {
if let Schema::Union(ref inner) = *schema {
+ inner.schemas.iter().for_each(|s| match s {
+ Schema::Record { name, .. }
+ | Schema::Enum { name, .. }
+ | Schema::Fixed { name, .. } => {
+ schemas_by_name.insert(name.clone(), s.clone());
+ }
+ _ => (),
+ });
+
let inner_schema = inner
.schemas
.get(*idx as usize)
@@ -215,4 +224,276 @@ mod tests {
);
assert_eq!(vec![0u8], buf);
}
+
+ #[test]
+ fn test_avro_3433_recursive_definition_encode_record() {
+ let mut buf = Vec::new();
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ },
+ {
+ "name":"b",
+ "type":"Inner"
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer_value =
+ Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
+ encode(&outer_value, &schema, &mut buf);
+ assert!(!buf.is_empty());
+ }
+
+ #[test]
+ fn test_avro_3433_recursive_definition_encode_array() {
+ let mut buf = Vec::new();
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"array",
+ "items": {
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ }
+ },
+ {
+ "name":"b",
+ "type": {
+ "type":"map",
+ "values":"Inner"
+ }
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer_value = Value::Record(vec![
+ ("a".into(), Value::Array(vec![inner_value1])),
+ (
+ "b".into(),
+ Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
+ ),
+ ]);
+ encode(&outer_value, &schema, &mut buf);
+ assert!(!buf.is_empty());
+ }
+
+ #[test]
+ fn test_avro_3433_recursive_definition_encode_map() {
+ let mut buf = Vec::new();
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ },
+ {
+ "name":"b",
+ "type": {
+ "type":"map",
+ "values":"Inner"
+ }
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer_value = Value::Record(vec![
+ ("a".into(), inner_value1),
+ (
+ "b".into(),
+ Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
+ ),
+ ]);
+ encode(&outer_value, &schema, &mut buf);
+ assert!(!buf.is_empty());
+ }
+
+ #[test]
+ fn test_avro_3433_recursive_definition_encode_record_wrapper() {
+ let mut buf = Vec::new();
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ },
+ {
+ "name":"b",
+ "type": {
+ "type":"record",
+ "name": "InnerWrapper",
+ "fields": [ {
+ "name":"j",
+ "type":"Inner"
+ }]
+ }
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![(
+ "j".into(),
+ Value::Record(vec![("z".into(), Value::Int(6))]),
+ )]);
+ let outer_value =
+ Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
+ encode(&outer_value, &schema, &mut buf);
+ assert!(!buf.is_empty());
+ }
+
+ #[test]
+ fn test_avro_3433_recursive_definition_encode_map_and_array() {
+ let mut buf = Vec::new();
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"map",
+ "values": {
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ }
+ },
+ {
+ "name":"b",
+ "type": {
+ "type":"array",
+ "items":"Inner"
+ }
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer_value = Value::Record(vec![
+ (
+ "a".into(),
+ Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
+ ),
+ ("b".into(), Value::Array(vec![inner_value1])),
+ ]);
+ encode(&outer_value, &schema, &mut buf);
+ assert!(!buf.is_empty());
+ }
+
+ #[test]
+ fn test_avro_3433_recursive_definition_encode_union() {
+ let mut buf = Vec::new();
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":["null", {
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }]
+ },
+ {
+ "name":"b",
+ "type":"Inner"
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer_value1 = Value::Record(vec![
+ ("a".into(), Value::Union(1, Box::new(inner_value1))),
+ ("b".into(), inner_value2.clone()),
+ ]);
+ encode(&outer_value1, &schema, &mut buf);
+ assert!(!buf.is_empty());
+
+ buf.drain(..);
+ let outer_value2 = Value::Record(vec![
+ ("a".into(), Value::Union(0, Box::new(Value::Null))),
+ ("b".into(), inner_value2),
+ ]);
+ encode(&outer_value2, &schema, &mut buf);
+ assert!(!buf.is_empty());
+ }
}
diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index 1ed2f38..b8c5939 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -279,6 +279,9 @@ pub enum Error {
#[error("Invalid enum symbol name {0}")]
EnumSymbolName(String),
+ #[error("Invalid schema name {0}. It must match the regex '{1}'")]
+ InvalidSchemaName(String, &'static str),
+
#[error("Duplicate enum symbol {0}")]
EnumSymbolDuplicate(String),
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 4474ffa..1a2f47e 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -36,7 +36,11 @@ use std::{
use strum_macros::{EnumDiscriminants, EnumString};
lazy_static! {
- static ref ENUM_SYMBOL_NAME: Regex = Regex::new(r"[A-Za-z_][A-Za-z0-9_]*").unwrap();
+ static ref ENUM_SYMBOL_NAME_R: Regex = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").unwrap();
+
+ // An optional namespace (with optional dots) followed by a name without any dots in it.
+ static ref SCHEMA_NAME_R: Regex =
+ Regex::new(r"^((?P<namespace>[A-Za-z_][A-Za-z0-9_\.]*)*\.)?(?P<name>[A-Za-z_][A-Za-z0-9_]*)$").unwrap();
}
/// Represents an Avro schema fingerprint
@@ -235,30 +239,30 @@ impl Name {
/// Create a new `Name`.
/// Parses the optional `namespace` from the `name` string.
/// `aliases` will not be defined.
- pub fn new(name: &str) -> Name {
- let (name, namespace) = Name::get_name_and_namespace(name);
- Name {
+ pub fn new(name: &str) -> AvroResult<Name> {
+ let (name, namespace) = Name::get_name_and_namespace(name)?;
+ Ok(Name {
name,
namespace,
aliases: None,
- }
+ })
}
- fn get_name_and_namespace(name: &str) -> (String, Option<String>) {
- if let Some(idx) = name.rfind('.') {
- let namespace_from_name = name[..idx].to_owned();
- let name_from_name = name[idx + 1..].to_owned();
- (name_from_name, Some(namespace_from_name))
- } else {
- (name.to_owned(), None)
- }
+ fn get_name_and_namespace(name: &str) -> AvroResult<(String, Option<String>)> {
+ let caps = SCHEMA_NAME_R
+ .captures(name)
+ .ok_or_else(|| Error::InvalidSchemaName(name.to_string(), SCHEMA_NAME_R.as_str()))?;
+ Ok((
+ caps["name"].to_string(),
+ caps.name("namespace").map(|s| s.as_str().to_string()),
+ ))
}
/// Parse a `serde_json::Value` into a `Name`.
fn parse(complex: &Map<String, Value>) -> AvroResult<Self> {
let (name, namespace_from_name) = complex
.name()
- .map(|name| Name::get_name_and_namespace(name.as_str()))
+ .map(|name| Name::get_name_and_namespace(name.as_str()).unwrap())
.ok_or(Error::GetNameField)?;
// FIXME Reading name from the type is wrong ! The name there is just a metadata (AVRO-3430)
@@ -309,7 +313,7 @@ impl Name {
impl From<&str> for Name {
fn from(name: &str) -> Self {
- Name::new(name)
+ Name::new(name).unwrap()
}
}
@@ -635,7 +639,7 @@ impl Parser {
}
}
- let name = Name::new(name);
+ let name = Name::new(name)?;
if let Some(parsed) = self.parsed_schemas.get(&name) {
return Ok(get_schema_ref(parsed));
@@ -857,7 +861,8 @@ impl Parser {
Some(ref ns) => format!("{}.{}", ns, alias),
None => alias.clone(),
};
- let alias_name = Name::new(alias_fullname.as_str());
+
+ let alias_name = Name::new(alias_fullname.as_str()).unwrap();
self.resolving_schemas
.insert(alias_name, resolving_schema.clone());
});
@@ -876,7 +881,7 @@ impl Parser {
Some(ref ns) => format!("{}.{}", ns, alias),
None => alias.clone(),
};
- let alias_name = Name::new(alias_fullname.as_str());
+ let alias_name = Name::new(alias_fullname.as_str()).unwrap();
self.resolving_schemas.remove(&alias_name);
self.parsed_schemas.insert(alias_name, schema.clone());
});
@@ -887,7 +892,7 @@ impl Parser {
fn get_already_seen_schema(&self, complex: &Map<String, Value>) -> Option<&Schema> {
match complex.get("type") {
Some(Value::String(ref typ)) => {
- let name = Name::new(typ.as_str());
+ let name = Name::new(typ.as_str()).unwrap();
self.resolving_schemas
.get(&name)
.or_else(|| self.parsed_schemas.get(&name))
@@ -967,7 +972,7 @@ impl Parser {
let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
for symbol in symbols.iter() {
// Ensure enum symbol names match [A-Za-z_][A-Za-z0-9_]*
- if !ENUM_SYMBOL_NAME.is_match(symbol) {
+ if !ENUM_SYMBOL_NAME_R.is_match(symbol) {
return Err(Error::EnumSymbolName(symbol.to_string()));
}
@@ -1056,7 +1061,7 @@ impl Parser {
fn get_schema_type_name(name: Name, value: Value) -> Name {
match value.get("type") {
Some(Value::Object(complex_type)) => match complex_type.name() {
- Some(name) => Name::new(name.as_str()),
+ Some(name) => Name::new(name.as_str()).unwrap(),
_ => name,
},
_ => name,
@@ -1198,7 +1203,7 @@ impl Serialize for Schema {
// the Avro doesn't indicate what the name of the underlying fixed type of a
// duration should be or typically is.
let inner = Schema::Fixed {
- name: Name::new("duration"),
+ name: Name::new("duration").unwrap(),
doc: None,
size: 12,
};
@@ -1447,7 +1452,7 @@ mod tests {
.clone();
let schema_c_expected = Schema::Record {
- name: Name::new("C"),
+ name: Name::new("C").unwrap(),
doc: None,
fields: vec![RecordField {
name: "field_one".to_string(),
@@ -1456,10 +1461,10 @@ mod tests {
schema: Schema::Union(
UnionSchema::new(vec![
Schema::Ref {
- name: Name::new("A"),
+ name: Name::new("A").unwrap(),
},
Schema::Ref {
- name: Name::new("B"),
+ name: Name::new("B").unwrap(),
},
])
.unwrap(),
@@ -1502,7 +1507,7 @@ mod tests {
.clone();
let schema_option_a_expected = Schema::Record {
- name: Name::new("OptionA"),
+ name: Name::new("OptionA").unwrap(),
doc: None,
fields: vec![RecordField {
name: "field_one".to_string(),
@@ -1512,7 +1517,7 @@ mod tests {
UnionSchema::new(vec![
Schema::Null,
Schema::Ref {
- name: Name::new("A"),
+ name: Name::new("A").unwrap(),
},
])
.unwrap(),
@@ -1547,7 +1552,7 @@ mod tests {
lookup.insert("b".to_owned(), 1);
let expected = Schema::Record {
- name: Name::new("test"),
+ name: Name::new("test").unwrap(),
doc: None,
fields: vec![
RecordField {
@@ -1605,14 +1610,14 @@ mod tests {
node_lookup.insert("label".to_owned(), 0);
let expected = Schema::Record {
- name: Name::new("test"),
+ name: Name::new("test").unwrap(),
doc: None,
fields: vec![RecordField {
name: "recordField".to_string(),
doc: None,
default: None,
schema: Schema::Record {
- name: Name::new("Node"),
+ name: Name::new("Node").unwrap(),
doc: None,
fields: vec![
RecordField {
@@ -1628,7 +1633,7 @@ mod tests {
doc: None,
default: None,
schema: Schema::Array(Box::new(Schema::Ref {
- name: Name::new("Node"),
+ name: Name::new("Node").unwrap(),
})),
order: RecordFieldOrder::Ascending,
position: 1,
@@ -2034,7 +2039,7 @@ mod tests {
).unwrap();
let expected = Schema::Enum {
- name: Name::new("Suit"),
+ name: Name::new("Suit").unwrap(),
doc: None,
symbols: vec![
"diamonds".to_owned(),
@@ -2070,7 +2075,7 @@ mod tests {
let schema = Schema::parse_str(r#"{"type": "fixed", "name": "test", "size": 16}"#).unwrap();
let expected = Schema::Fixed {
- name: Name::new("test"),
+ name: Name::new("test").unwrap(),
doc: None,
size: 16usize,
};
@@ -2086,7 +2091,7 @@ mod tests {
.unwrap();
let expected = Schema::Fixed {
- name: Name::new("test"),
+ name: Name::new("test").unwrap(),
doc: Some(String::from("FixedSchema documentation")),
size: 16usize,
};
@@ -2334,4 +2339,30 @@ mod tests {
panic!("Expected a record schema!");
}
}
+
+ #[test]
+ /// Zero-length namespace is considered as no-namespace.
+ fn test_namespace_from_name_with_empty_value() {
+ let name = Name::new(".name").unwrap();
+ assert_eq!(name.name, "name");
+ assert_eq!(name.namespace, None);
+ }
+
+ #[test]
+ /// Whitespace is not allowed in the name.
+ fn test_name_with_whitespace_value() {
+ match Name::new(" ") {
+ Err(Error::InvalidSchemaName(_, _)) => {}
+ _ => panic!("Expected an Error::InvalidSchemaName!"),
+ }
+ }
+
+ #[test]
+ /// The name must be non-empty.
+ fn test_name_with_no_name_part() {
+ match Name::new("space.") {
+ Err(Error::InvalidSchemaName(_, _)) => {}
+ _ => panic!("Expected an Error::InvalidSchemaName!"),
+ }
+ }
}
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 85e7341..e2f6962 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -19,7 +19,7 @@
use crate::{
decimal::Decimal,
duration::Duration,
- schema::{Precision, RecordField, Scale, Schema, SchemaKind, UnionSchema},
+ schema::{Name, Precision, RecordField, Scale, Schema, SchemaKind, UnionSchema},
AvroResult, Error,
};
use serde_json::{Number, Value as JsonValue};
@@ -408,11 +408,20 @@ impl Value {
/// See [Schema Resolution](https://avro.apache.org/docs/current/spec.html#Schema+Resolution)
/// in the Avro specification for the full set of rules of schema
/// resolution.
- pub fn resolve(mut self, schema: &Schema) -> AvroResult<Self> {
+ pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
+ let mut schemas_by_name: HashMap<Name, Schema> = HashMap::new();
+ self.resolve_internal(schema, &mut schemas_by_name)
+ }
+
+ fn resolve_internal(
+ mut self,
+ schema: &Schema,
+ schemas_by_name: &mut HashMap<Name, Schema>,
+ ) -> AvroResult<Self> {
pub fn resolve0(
value: &mut Value,
schema: &Schema,
- schemas_by_name: &mut HashMap<String, Schema>,
+ schemas_by_name: &mut HashMap<Name, Schema>,
) -> AvroResult<Value> {
// Check if this schema is a union, and if the reader schema is not.
if SchemaKind::from(&value.clone()) == SchemaKind::Union
@@ -428,10 +437,10 @@ impl Value {
let val: Value = value.clone();
match *schema {
Schema::Ref { ref name } => {
- if let Some(resolved) = schemas_by_name.get(name.name.as_str()) {
+ if let Some(resolved) = schemas_by_name.get(name) {
resolve0(value, resolved, &mut schemas_by_name.clone())
} else {
- Err(Error::SchemaResolutionError(name.name.clone()))
+ Err(Error::SchemaResolutionError(name.fullname(None)))
}
}
Schema::Null => val.resolve_null(),
@@ -443,27 +452,27 @@ impl Value {
Schema::Bytes => val.resolve_bytes(),
Schema::String => val.resolve_string(),
Schema::Fixed { ref name, size, .. } => {
- schemas_by_name.insert(name.name.clone(), schema.clone());
+ schemas_by_name.insert(name.clone(), schema.clone());
val.resolve_fixed(size)
}
- Schema::Union(ref inner) => val.resolve_union(inner),
+ Schema::Union(ref inner) => val.resolve_union(inner, schemas_by_name),
Schema::Enum {
ref name,
ref symbols,
..
} => {
- schemas_by_name.insert(name.name.clone(), schema.clone());
+ schemas_by_name.insert(name.clone(), schema.clone());
val.resolve_enum(symbols)
}
- Schema::Array(ref inner) => val.resolve_array(inner),
- Schema::Map(ref inner) => val.resolve_map(inner),
+ Schema::Array(ref inner) => val.resolve_array(inner, schemas_by_name),
+ Schema::Map(ref inner) => val.resolve_map(inner, schemas_by_name),
Schema::Record {
ref name,
ref fields,
..
} => {
- schemas_by_name.insert(name.name.clone(), schema.clone());
- val.resolve_record(fields)
+ schemas_by_name.insert(name.clone(), schema.clone());
+ val.resolve_record(fields, schemas_by_name)
}
Schema::Decimal {
scale,
@@ -479,9 +488,7 @@ impl Value {
Schema::Uuid => val.resolve_uuid(),
}
}
-
- let mut schemas_by_name: HashMap<String, Schema> = HashMap::new();
- resolve0(&mut self, schema, &mut schemas_by_name)
+ resolve0(&mut self, schema, schemas_by_name)
}
fn resolve_uuid(self) -> Result<Self, Error> {
@@ -711,25 +718,46 @@ impl Value {
}
}
- fn resolve_union(self, schema: &UnionSchema) -> Result<Self, Error> {
+ fn resolve_union(
+ self,
+ schema: &UnionSchema,
+ schemas_by_name: &mut HashMap<Name, Schema>,
+ ) -> Result<Self, Error> {
let v = match self {
// Both are unions case.
Value::Union(_i, v) => *v,
// Reader is a union, but writer is not.
v => v,
};
+
+ schema.schemas.iter().for_each(|s| match s {
+ Schema::Record { name, .. }
+ | Schema::Enum { name, .. }
+ | Schema::Fixed { name, .. } => {
+ schemas_by_name.insert(name.clone(), s.clone());
+ }
+ _ => (),
+ });
+
// Find the first match in the reader schema.
// FIXME: this might be wrong when the union consists of multiple same records that have different names
let (i, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
- Ok(Value::Union(i as u32, Box::new(v.resolve(inner)?)))
+ Ok(Value::Union(
+ i as u32,
+ Box::new(v.resolve_internal(inner, schemas_by_name)?),
+ ))
}
- fn resolve_array(self, schema: &Schema) -> Result<Self, Error> {
+ fn resolve_array(
+ self,
+ schema: &Schema,
+ schemas_by_name: &mut HashMap<Name, Schema>,
+ ) -> Result<Self, Error> {
match self {
Value::Array(items) => Ok(Value::Array(
items
.into_iter()
- .map(|item| item.resolve(schema))
+ .map(|item| item.resolve_internal(schema, schemas_by_name))
.collect::<Result<_, _>>()?,
)),
other => Err(Error::GetArray {
@@ -739,12 +767,20 @@ impl Value {
}
}
- fn resolve_map(self, schema: &Schema) -> Result<Self, Error> {
+ fn resolve_map(
+ self,
+ schema: &Schema,
+ schemas_by_name: &mut HashMap<Name, Schema>,
+ ) -> Result<Self, Error> {
match self {
Value::Map(items) => Ok(Value::Map(
items
.into_iter()
- .map(|(key, value)| value.resolve(schema).map(|value| (key, value)))
+ .map(|(key, value)| {
+ value
+ .resolve_internal(schema, schemas_by_name)
+ .map(|value| (key, value))
+ })
.collect::<Result<_, _>>()?,
)),
other => Err(Error::GetMap {
@@ -754,7 +790,11 @@ impl Value {
}
}
- fn resolve_record(self, fields: &[RecordField]) -> Result<Self, Error> {
+ fn resolve_record(
+ self,
+ fields: &[RecordField],
+ schemas_by_name: &mut HashMap<Name, Schema>,
+ ) -> Result<Self, Error> {
let mut items = match self {
Value::Map(items) => Ok(items),
Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
@@ -785,7 +825,10 @@ impl Value {
Schema::Null => Value::Union(0, Box::new(Value::Null)),
_ => Value::Union(
0,
- Box::new(Value::from(value.clone()).resolve(first)?),
+ Box::new(
+ Value::from(value.clone())
+ .resolve_internal(first, schemas_by_name)?,
+ ),
),
}
}
@@ -797,7 +840,7 @@ impl Value {
},
};
value
- .resolve(&field.schema)
+ .resolve_internal(&field.schema, schemas_by_name)
.map(|value| (field.name.clone(), value))
})
.collect::<Result<Vec<_>, _>>()?;
@@ -890,7 +933,7 @@ mod tests {
fn validate_fixed() {
let schema = Schema::Fixed {
size: 4,
- name: Name::new("some_fixed"),
+ name: Name::new("some_fixed").unwrap(),
doc: None,
};
@@ -903,7 +946,7 @@ mod tests {
#[test]
fn validate_enum() {
let schema = Schema::Enum {
- name: Name::new("some_enum"),
+ name: Name::new("some_enum").unwrap(),
doc: None,
symbols: vec![
"spades".to_string(),
@@ -920,7 +963,7 @@ mod tests {
assert!(!Value::String("lorem".to_string()).validate(&schema));
let other_schema = Schema::Enum {
- name: Name::new("some_other_enum"),
+ name: Name::new("some_other_enum").unwrap(),
doc: None,
symbols: vec![
"hearts".to_string(),
@@ -944,7 +987,7 @@ mod tests {
// ]
// }
let schema = Schema::Record {
- name: Name::new("some_record"),
+ name: Name::new("some_record").unwrap(),
doc: None,
fields: vec![
RecordField {
@@ -1095,7 +1138,7 @@ mod tests {
precision: 10,
scale: 1,
inner: Box::new(Schema::Fixed {
- name: Name::new("decimal"),
+ name: Name::new("decimal").unwrap(),
size: 20,
doc: None
})
@@ -1323,4 +1366,269 @@ mod tests {
JsonValue::String("936da01f-9abd-4d9d-80c7-02af85c822a8".into())
);
}
+
+ #[test]
+ fn test_avro_3433_recursive_resolves_record() {
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ },
+ {
+ "name":"b",
+ "type":"Inner"
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer = Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
+ outer
+ .resolve(&schema)
+ .expect("Record definition defined in one field must be availible in other field");
+ }
+
+ #[test]
+ fn test_avro_3433_recursive_resolves_array() {
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"array",
+ "items": {
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ }
+ },
+ {
+ "name":"b",
+ "type": {
+ "type":"map",
+ "values":"Inner"
+ }
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer_value = Value::Record(vec![
+ ("a".into(), Value::Array(vec![inner_value1])),
+ (
+ "b".into(),
+ Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
+ ),
+ ]);
+ outer_value
+ .resolve(&schema)
+ .expect("Record defined in array definition must be resolveable from map");
+ }
+
+ #[test]
+ fn test_avro_3433_recursive_resolves_map() {
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ },
+ {
+ "name":"b",
+ "type": {
+ "type":"map",
+ "values":"Inner"
+ }
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer_value = Value::Record(vec![
+ ("a".into(), inner_value1),
+ (
+ "b".into(),
+ Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
+ ),
+ ]);
+ outer_value
+ .resolve(&schema)
+ .expect("Record defined in record field must be resolvable from map field");
+ }
+
+ #[test]
+ fn test_avro_3433_recursive_resolves_record_wrapper() {
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ },
+ {
+ "name":"b",
+ "type": {
+ "type":"record",
+ "name": "InnerWrapper",
+ "fields": [ {
+ "name":"j",
+ "type":"Inner"
+ }]
+ }
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![(
+ "j".into(),
+ Value::Record(vec![("z".into(), Value::Int(6))]),
+ )]);
+ let outer_value =
+ Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
+ outer_value.resolve(&schema).expect("Record schema defined in field must be resolvable in Record schema defined in other field");
+ }
+
+ #[test]
+ fn test_avro_3433_recursive_resolves_map_and_array() {
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":{
+ "type":"map",
+ "values": {
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }
+ }
+ },
+ {
+ "name":"b",
+ "type": {
+ "type":"array",
+ "items":"Inner"
+ }
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer_value = Value::Record(vec![
+ (
+ "a".into(),
+ Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
+ ),
+ ("b".into(), Value::Array(vec![inner_value1])),
+ ]);
+ outer_value
+ .resolve(&schema)
+ .expect("Record defined in map definition must be resolveable from array");
+ }
+
+ #[test]
+ fn test_avro_3433_recursive_resolves_union() {
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"TestStruct",
+ "fields": [
+ {
+ "name":"a",
+ "type":["null", {
+ "type":"record",
+ "name": "Inner",
+ "fields": [ {
+ "name":"z",
+ "type":"int"
+ }]
+ }]
+ },
+ {
+ "name":"b",
+ "type":"Inner"
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+ let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+ let outer1 = Value::Record(vec![
+ ("a".into(), inner_value1),
+ ("b".into(), inner_value2.clone()),
+ ]);
+ outer1
+ .resolve(&schema)
+ .expect("Record definition defined in union must be resolvabled in other field");
+ let outer2 = Value::Record(vec![("a".into(), Value::Null), ("b".into(), inner_value2)]);
+ outer2
+ .resolve(&schema)
+ .expect("Record definition defined in union must be resolvabled in other field");
+ }
}
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 9aa8d7d..e9518e3 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -520,7 +520,7 @@ mod tests {
fn decimal_fixed() -> TestResult<()> {
let size = 30;
let inner = Schema::Fixed {
- name: Name::new("decimal"),
+ name: Name::new("decimal").unwrap(),
doc: None,
size,
};
@@ -558,7 +558,7 @@ mod tests {
#[test]
fn duration() -> TestResult<()> {
let inner = Schema::Fixed {
- name: Name::new("duration"),
+ name: Name::new("duration").unwrap(),
doc: None,
size: 12,
};
diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs
index be2d9a8..24ae247 100644
--- a/lang/rust/avro/tests/schema.rs
+++ b/lang/rust/avro/tests/schema.rs
@@ -64,7 +64,7 @@ const FIXED_EXAMPLES: &[(&str, bool)] = &[
}"#,
true,
),
- (r#"{"type": "fixed", "name": "Missing size"}"#, false),
+ (r#"{"type": "fixed", "name": "MissingSize"}"#, false),
(r#"{"type": "fixed", "size": 314}"#, false),
];