You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/03/20 19:40:58 UTC
[avro] branch branch-1.11 updated: AVRO-3461: Resolve flow namespace resolution work (#1609)
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.11 by this push:
new e9faaaa AVRO-3461: Resolve flow namespace resolution work (#1609)
e9faaaa is described below
commit e9faaaab775606d6e8478f89b5c283c403a0c5d6
Author: Jack Klamer <jf...@gmail.com>
AuthorDate: Sun Mar 20 14:40:26 2022 -0500
AVRO-3461: Resolve flow namespace resolution work (#1609)
* remove clones
* [AVRO-3461] added in fixes and tests
(cherry picked from commit c7fdf69d811b57e888f31ffa520ebeb2f055f55b)
---
lang/rust/avro/src/schema.rs | 4 +-
lang/rust/avro/src/types.rs | 434 ++++++++++++++++++++++++++++++++-----------
2 files changed, 324 insertions(+), 114 deletions(-)
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 81b7499..815aea6 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -188,8 +188,8 @@ impl SchemaKind {
}
}
-impl<'a> From<&'a types::Value> for SchemaKind {
- fn from(value: &'a types::Value) -> Self {
+impl From<&types::Value> for SchemaKind {
+ fn from(value: &types::Value) -> Self {
use crate::types::Value;
match value {
Value::Null => Self::Null,
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 322d21b..685904b 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -19,7 +19,9 @@
use crate::{
decimal::Decimal,
duration::Duration,
- schema::{Name, Precision, RecordField, Scale, Schema, SchemaKind, UnionSchema},
+ schema::{
+ NamesRef, Precision, RecordField, ResolvedSchema, Scale, Schema, SchemaKind, UnionSchema,
+ },
AvroResult, Error,
};
use serde_json::{Number, Value as JsonValue};
@@ -410,86 +412,57 @@ impl Value {
/// resolution.
pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
// FIXME transition to using resolved Schema
- let mut schemas_by_name: HashMap<Name, Schema> = HashMap::new();
- self.resolve_internal(schema, &mut schemas_by_name)
- }
-
- fn resolve_internal(
- mut self,
- schema: &Schema,
- schemas_by_name: &mut HashMap<Name, Schema>,
- ) -> AvroResult<Self> {
- pub fn resolve0(
- value: &mut Value,
- schema: &Schema,
- schemas_by_name: &mut HashMap<Name, Schema>,
- ) -> AvroResult<Value> {
- // Check if this schema is a union, and if the reader schema is not.
- if SchemaKind::from(&value.clone()) == SchemaKind::Union
- && SchemaKind::from(schema) != SchemaKind::Union
- {
- // Pull out the Union, and attempt to resolve against it.
- let v = match value {
- Value::Union(_i, b) => &**b,
- _ => unreachable!(),
- };
- *value = v.clone();
- }
- let val: Value = value.clone();
- match *schema {
- Schema::Ref { ref name } => {
- if let Some(resolved) = schemas_by_name.get(name) {
- resolve0(value, resolved, &mut schemas_by_name.clone())
- } else {
- Err(Error::SchemaResolutionError(name.clone()))
- }
- }
- Schema::Null => val.resolve_null(),
- Schema::Boolean => val.resolve_boolean(),
- Schema::Int => val.resolve_int(),
- Schema::Long => val.resolve_long(),
- Schema::Float => val.resolve_float(),
- Schema::Double => val.resolve_double(),
- Schema::Bytes => val.resolve_bytes(),
- Schema::String => val.resolve_string(),
- Schema::Fixed { ref name, size, .. } => {
- schemas_by_name.insert(name.clone(), schema.clone());
- val.resolve_fixed(size)
- }
- Schema::Union(ref inner) => val.resolve_union(inner, schemas_by_name),
- Schema::Enum {
- ref name,
- ref symbols,
- ..
- } => {
- schemas_by_name.insert(name.clone(), schema.clone());
- val.resolve_enum(symbols)
- }
- Schema::Array(ref inner) => val.resolve_array(inner, schemas_by_name),
- Schema::Map(ref inner) => val.resolve_map(inner, schemas_by_name),
- Schema::Record {
- ref name,
- ref fields,
- ..
- } => {
- schemas_by_name.insert(name.clone(), schema.clone());
- val.resolve_record(fields, schemas_by_name)
+ let rs = ResolvedSchema::try_from(schema)?;
+ self.resolve_internal(schema, rs.get_names())
+ }
+
+ fn resolve_internal(mut self, schema: &Schema, names: &NamesRef) -> AvroResult<Self> {
+ // Check if this value is a union, and if the reader schema is not.
+ if SchemaKind::from(&self) == SchemaKind::Union
+ && SchemaKind::from(schema) != SchemaKind::Union
+ {
+ // Pull the inner value out of the union, and resolve that value against the non-union schema.
+ let v = match self {
+ Value::Union(_i, b) => *b,
+ _ => unreachable!(),
+ };
+ self = v;
+ }
+ match *schema {
+ Schema::Ref { ref name } => {
+ if let Some(resolved) = names.get(name) {
+ self.resolve_internal(resolved, names)
+ } else {
+ Err(Error::SchemaResolutionError(name.clone()))
}
- Schema::Decimal {
- scale,
- precision,
- ref inner,
- } => val.resolve_decimal(precision, scale, inner),
- Schema::Date => val.resolve_date(),
- Schema::TimeMillis => val.resolve_time_millis(),
- Schema::TimeMicros => val.resolve_time_micros(),
- Schema::TimestampMillis => val.resolve_timestamp_millis(),
- Schema::TimestampMicros => val.resolve_timestamp_micros(),
- Schema::Duration => val.resolve_duration(),
- Schema::Uuid => val.resolve_uuid(),
}
+ Schema::Null => self.resolve_null(),
+ Schema::Boolean => self.resolve_boolean(),
+ Schema::Int => self.resolve_int(),
+ Schema::Long => self.resolve_long(),
+ Schema::Float => self.resolve_float(),
+ Schema::Double => self.resolve_double(),
+ Schema::Bytes => self.resolve_bytes(),
+ Schema::String => self.resolve_string(),
+ Schema::Fixed { size, .. } => self.resolve_fixed(size),
+ Schema::Union(ref inner) => self.resolve_union(inner, names),
+ Schema::Enum { ref symbols, .. } => self.resolve_enum(symbols),
+ Schema::Array(ref inner) => self.resolve_array(inner, names),
+ Schema::Map(ref inner) => self.resolve_map(inner, names),
+ Schema::Record { ref fields, .. } => self.resolve_record(fields, names),
+ Schema::Decimal {
+ scale,
+ precision,
+ ref inner,
+ } => self.resolve_decimal(precision, scale, inner),
+ Schema::Date => self.resolve_date(),
+ Schema::TimeMillis => self.resolve_time_millis(),
+ Schema::TimeMicros => self.resolve_time_micros(),
+ Schema::TimestampMillis => self.resolve_timestamp_millis(),
+ Schema::TimestampMicros => self.resolve_timestamp_micros(),
+ Schema::Duration => self.resolve_duration(),
+ Schema::Uuid => self.resolve_uuid(),
}
- resolve0(&mut self, schema, schemas_by_name)
}
fn resolve_uuid(self) -> Result<Self, Error> {
@@ -719,11 +692,7 @@ impl Value {
}
}
- fn resolve_union(
- self,
- schema: &UnionSchema,
- schemas_by_name: &mut HashMap<Name, Schema>,
- ) -> Result<Self, Error> {
+ fn resolve_union(self, schema: &UnionSchema, names: &NamesRef) -> Result<Self, Error> {
let v = match self {
// Both are unions case.
Value::Union(_i, v) => *v,
@@ -731,34 +700,21 @@ impl Value {
v => v,
};
- schema.schemas.iter().for_each(|s| match s {
- Schema::Record { name, .. }
- | Schema::Enum { name, .. }
- | Schema::Fixed { name, .. } => {
- schemas_by_name.insert(name.clone(), s.clone());
- }
- _ => (),
- });
-
// Find the first match in the reader schema.
// FIXME: this might be wrong when the union consists of multiple same records that have different names
let (i, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
Ok(Value::Union(
i as u32,
- Box::new(v.resolve_internal(inner, schemas_by_name)?),
+ Box::new(v.resolve_internal(inner, names)?),
))
}
- fn resolve_array(
- self,
- schema: &Schema,
- schemas_by_name: &mut HashMap<Name, Schema>,
- ) -> Result<Self, Error> {
+ fn resolve_array(self, schema: &Schema, names: &NamesRef) -> Result<Self, Error> {
match self {
Value::Array(items) => Ok(Value::Array(
items
.into_iter()
- .map(|item| item.resolve_internal(schema, schemas_by_name))
+ .map(|item| item.resolve_internal(schema, names))
.collect::<Result<_, _>>()?,
)),
other => Err(Error::GetArray {
@@ -768,18 +724,14 @@ impl Value {
}
}
- fn resolve_map(
- self,
- schema: &Schema,
- schemas_by_name: &mut HashMap<Name, Schema>,
- ) -> Result<Self, Error> {
+ fn resolve_map(self, schema: &Schema, names: &NamesRef) -> Result<Self, Error> {
match self {
Value::Map(items) => Ok(Value::Map(
items
.into_iter()
.map(|(key, value)| {
value
- .resolve_internal(schema, schemas_by_name)
+ .resolve_internal(schema, names)
.map(|value| (key, value))
})
.collect::<Result<_, _>>()?,
@@ -791,11 +743,7 @@ impl Value {
}
}
- fn resolve_record(
- self,
- fields: &[RecordField],
- schemas_by_name: &mut HashMap<Name, Schema>,
- ) -> Result<Self, Error> {
+ fn resolve_record(self, fields: &[RecordField], names: &NamesRef) -> Result<Self, Error> {
let mut items = match self {
Value::Map(items) => Ok(items),
Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
@@ -828,7 +776,7 @@ impl Value {
0,
Box::new(
Value::from(value.clone())
- .resolve_internal(first, schemas_by_name)?,
+ .resolve_internal(first, names)?,
),
),
}
@@ -841,7 +789,7 @@ impl Value {
},
};
value
- .resolve_internal(&field.schema, schemas_by_name)
+ .resolve_internal(&field.schema, names)
.map(|value| (field.name.clone(), value))
})
.collect::<Result<Vec<_>, _>>()?;
@@ -1637,4 +1585,266 @@ mod tests {
.resolve(&schema)
.expect("Record definition defined in union must be resolvabled in other field");
}
+
+ #[test]
+ fn test_avro_3461_test_multi_level_resolve_outer_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "middle_record_name",
+ "fields":[
+ {
+ "name":"middle_field_1",
+ "type":[
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "space.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
+ let middle_record_variation_1 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ )]);
+ let middle_record_variation_2 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(1, Box::new(inner_record.clone())),
+ )]);
+ let outer_record_variation_1 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_2 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_1)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_3 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_2)),
+ ),
+ ("outer_field_2".into(), inner_record),
+ ]);
+
+ outer_record_variation_1
+ .resolve(&schema)
+ .expect("Should be able to resolve value to the schema that is it's definition");
+ outer_record_variation_2
+ .resolve(&schema)
+ .expect("Should be able to resolve value to the schema that is it's definition");
+ outer_record_variation_3
+ .resolve(&schema)
+ .expect("Should be able to resolve value to the schema that is it's definition");
+ }
+
+ #[test]
+ fn test_avro_3461_test_multi_level_resolve_middle_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "middle_record_name",
+ "namespace":"middle_namespace",
+ "fields":[
+ {
+ "name":"middle_field_1",
+ "type":[
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "middle_namespace.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+ let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
+ let middle_record_variation_1 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ )]);
+ let middle_record_variation_2 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(1, Box::new(inner_record.clone())),
+ )]);
+ let outer_record_variation_1 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_2 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_1)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_3 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_2)),
+ ),
+ ("outer_field_2".into(), inner_record),
+ ]);
+
+ outer_record_variation_1
+ .resolve(&schema)
+ .expect("Should be able to resolve value to the schema that is it's definition");
+ outer_record_variation_2
+ .resolve(&schema)
+ .expect("Should be able to resolve value to the schema that is it's definition");
+ outer_record_variation_3
+ .resolve(&schema)
+ .expect("Should be able to resolve value to the schema that is it's definition");
+ }
+
+ #[test]
+ fn test_avro_3461_test_multi_level_resolve_inner_namespace() {
+ let schema = r#"
+ {
+ "name": "record_name",
+ "namespace": "space",
+ "type": "record",
+ "fields": [
+ {
+ "name": "outer_field_1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "middle_record_name",
+ "namespace":"middle_namespace",
+ "fields":[
+ {
+ "name":"middle_field_1",
+ "type":[
+ "null",
+ {
+ "type":"record",
+ "name":"inner_record_name",
+ "namespace":"inner_namespace",
+ "fields":[
+ {
+ "name":"inner_field_1",
+ "type":"double"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "name": "outer_field_2",
+ "type" : "inner_namespace.inner_record_name"
+ }
+ ]
+ }
+ "#;
+ let schema = Schema::parse_str(schema).unwrap();
+
+ let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
+ let middle_record_variation_1 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ )]);
+ let middle_record_variation_2 = Value::Record(vec![(
+ "middle_field_1".into(),
+ Value::Union(1, Box::new(inner_record.clone())),
+ )]);
+ let outer_record_variation_1 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(0, Box::new(Value::Null)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_2 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_1)),
+ ),
+ ("outer_field_2".into(), inner_record.clone()),
+ ]);
+ let outer_record_variation_3 = Value::Record(vec![
+ (
+ "outer_field_1".into(),
+ Value::Union(1, Box::new(middle_record_variation_2)),
+ ),
+ ("outer_field_2".into(), inner_record),
+ ]);
+
+ outer_record_variation_1
+ .resolve(&schema)
+ .expect("Should be able to resolve value to the schema that is it's definition");
+ outer_record_variation_2
+ .resolve(&schema)
+ .expect("Should be able to resolve value to the schema that is it's definition");
+ outer_record_variation_3
+ .resolve(&schema)
+ .expect("Should be able to resolve value to the schema that is it's definition");
+ }
}