You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/11/18 12:20:01 UTC
[avro] 01/01: AVRO-3674: Pass the correct enclosing namespace to validate and resolve_internal
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch avro-3674
in repository https://gitbox.apache.org/repos/asf/avro.git
commit bf6efc7e5caf56e3f469b531e895e5b43cbb0ee9
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Fri Nov 18 14:10:15 2022 +0200
AVRO-3674: Pass the correct enclosing namespace to validate and resolve_internal
Expose getters for Schema's name and namespace
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
lang/rust/avro/src/schema.rs | 18 ++++++-
lang/rust/avro/src/types.rs | 119 +++++++++++++++++++++++++++----------------
lang/rust/avro/src/writer.rs | 18 ++++---
3 files changed, 103 insertions(+), 52 deletions(-)
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 2e2443ed3..6aa1d7fe0 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -696,7 +696,7 @@ impl UnionSchema {
let rs =
ResolvedSchema::try_from(*schema).expect("Schema didn't successfully parse");
value
- .validate_internal(schema, rs.get_names(), &None)
+ .validate_internal(schema, rs.get_names(), &schema.namespace())
.is_none()
})
}
@@ -823,6 +823,22 @@ impl Schema {
_ => None,
}
}
+
+ /// Returns the name of a named schema (`Ref`, `Record`, `Enum` or `Fixed`), else `None`.
+ pub fn name(&self) -> Option<&Name> {
+ match self {
+ Schema::Ref { ref name, .. }
+ | Schema::Record { ref name, .. }
+ | Schema::Enum { ref name, .. }
+ | Schema::Fixed { ref name, .. } => Some(name),
+ _ => None,
+ }
+ }
+
+ /// Returns a clone of the named schema's namespace; `None` if unnamed or unqualified.
+ pub fn namespace(&self) -> Namespace {
+ self.name().and_then(|n| n.namespace.clone())
+ }
}
impl Parser {
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index fadff4481..c91c0bf47 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -340,14 +340,9 @@ impl Value {
/// for the full set of rules of schema validation.
pub fn validate(&self, schema: &Schema) -> bool {
let rs = ResolvedSchema::try_from(schema).expect("Schema didn't successfully parse");
- let namespace = match schema {
- Schema::Record { name, .. }
- | Schema::Enum { name, .. }
- | Schema::Fixed { name, .. } => &name.namespace,
- _ => &None,
- };
+ let enclosing_namespace = schema.namespace();
- match self.validate_internal(schema, rs.get_names(), namespace) {
+ match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
Some(error_msg) => {
error!(
"Invalid value: {:?} for schema: {:?}. Reason: {}",
@@ -372,17 +367,11 @@ impl Value {
&self,
schema: &Schema,
names: &HashMap<Name, S>,
- namespace: &Namespace,
+ enclosing_namespace: &Namespace,
) -> Option<String> {
match (self, schema) {
(_, &Schema::Ref { ref name }) => {
- let name = match namespace {
- Some(namespace) => Name {
- name: name.name.to_owned(),
- namespace: Some(namespace.to_owned()),
- },
- None => name.to_owned(),
- };
+ let name = name.fully_qualified_name(enclosing_namespace);
names.get(&name).map_or_else(
|| {
@@ -392,7 +381,7 @@ impl Value {
names.keys()
))
},
- |s| self.validate_internal(s.borrow(), names, namespace),
+ |s| self.validate_internal(s.borrow(), names, &name.namespace),
)
}
(&Value::Null, &Schema::Null) => None,
@@ -473,7 +462,7 @@ impl Value {
(&Value::Union(i, ref value), &Schema::Union(ref inner)) => inner
.variants()
.get(i as usize)
- .map(|schema| value.validate_internal(schema, names, namespace))
+ .map(|schema| value.validate_internal(schema, names, enclosing_namespace))
.unwrap_or_else(|| Some(format!("No schema in the union at position '{}'", i))),
(v, &Schema::Union(ref inner)) => match inner.find_schema(v) {
Some(_) => None,
@@ -481,12 +470,18 @@ impl Value {
},
(&Value::Array(ref items), &Schema::Array(ref inner)) => {
items.iter().fold(None, |acc, item| {
- Value::accumulate(acc, item.validate_internal(inner, names, namespace))
+ Value::accumulate(
+ acc,
+ item.validate_internal(inner, names, enclosing_namespace),
+ )
})
}
(&Value::Map(ref items), &Schema::Map(ref inner)) => {
items.iter().fold(None, |acc, (_, value)| {
- Value::accumulate(acc, value.validate_internal(inner, names, namespace))
+ Value::accumulate(
+ acc,
+ value.validate_internal(inner, names, enclosing_namespace),
+ )
})
}
(
@@ -522,7 +517,11 @@ impl Value {
let field = &fields[*idx];
Value::accumulate(
acc,
- record_field.validate_internal(&field.schema, names, namespace),
+ record_field.validate_internal(
+ &field.schema,
+ names,
+ enclosing_namespace,
+ ),
)
}
None => Value::accumulate(
@@ -538,7 +537,7 @@ impl Value {
(&Value::Map(ref items), &Schema::Record { ref fields, .. }) => {
fields.iter().fold(None, |acc, field| {
if let Some(item) = items.get(&field.name) {
- let res = item.validate_internal(&field.schema, names, namespace);
+ let res = item.validate_internal(&field.schema, names, enclosing_namespace);
Value::accumulate(acc, res)
} else if !field.is_nullable() {
Value::accumulate(
@@ -564,12 +563,18 @@ impl Value {
/// in the Avro specification for the full set of rules of schema
/// resolution.
pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
+ let enclosing_namespace = schema.namespace();
// FIXME transition to using resolved Schema
let rs = ResolvedSchema::try_from(schema)?;
- self.resolve_internal(schema, rs.get_names())
+ self.resolve_internal(schema, rs.get_names(), &enclosing_namespace)
}
- fn resolve_internal(mut self, schema: &Schema, names: &NamesRef) -> AvroResult<Self> {
+ fn resolve_internal(
+ mut self,
+ schema: &Schema,
+ names: &NamesRef,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<Self> {
// Check if this schema is a union, and if the reader schema is not.
if SchemaKind::from(&self) == SchemaKind::Union
&& SchemaKind::from(schema) != SchemaKind::Union
@@ -583,9 +588,11 @@ impl Value {
}
match *schema {
Schema::Ref { ref name } => {
- if let Some(resolved) = names.get(name) {
- info!("Resolved {:?}", name);
- self.resolve_internal(resolved, names)
+ let name = name.fully_qualified_name(enclosing_namespace);
+
+ if let Some(resolved) = names.get(&name) {
+ debug!("Resolved {:?}", name);
+ self.resolve_internal(resolved, names, &name.namespace)
} else {
error!("Failed to resolve schema {:?}", name);
Err(Error::SchemaResolutionError(name.clone()))
@@ -600,11 +607,13 @@ impl Value {
Schema::Bytes => self.resolve_bytes(),
Schema::String => self.resolve_string(),
Schema::Fixed { size, .. } => self.resolve_fixed(size),
- Schema::Union(ref inner) => self.resolve_union(inner, names),
+ Schema::Union(ref inner) => self.resolve_union(inner, names, enclosing_namespace),
Schema::Enum { ref symbols, .. } => self.resolve_enum(symbols),
- Schema::Array(ref inner) => self.resolve_array(inner, names),
- Schema::Map(ref inner) => self.resolve_map(inner, names),
- Schema::Record { ref fields, .. } => self.resolve_record(fields, names),
+ Schema::Array(ref inner) => self.resolve_array(inner, names, enclosing_namespace),
+ Schema::Map(ref inner) => self.resolve_map(inner, names, enclosing_namespace),
+ Schema::Record { ref fields, .. } => {
+ self.resolve_record(fields, names, enclosing_namespace)
+ }
Schema::Decimal {
scale,
precision,
@@ -848,7 +857,12 @@ impl Value {
}
}
- fn resolve_union(self, schema: &UnionSchema, names: &NamesRef) -> Result<Self, Error> {
+ fn resolve_union(
+ self,
+ schema: &UnionSchema,
+ names: &NamesRef,
+ enclosing_namespace: &Namespace,
+ ) -> Result<Self, Error> {
let v = match self {
// Both are unions case.
Value::Union(_i, v) => *v,
@@ -861,16 +875,21 @@ impl Value {
let (i, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
Ok(Value::Union(
i as u32,
- Box::new(v.resolve_internal(inner, names)?),
+ Box::new(v.resolve_internal(inner, names, enclosing_namespace)?),
))
}
- fn resolve_array(self, schema: &Schema, names: &NamesRef) -> Result<Self, Error> {
+ fn resolve_array(
+ self,
+ schema: &Schema,
+ names: &NamesRef,
+ enclosing_namespace: &Namespace,
+ ) -> Result<Self, Error> {
match self {
Value::Array(items) => Ok(Value::Array(
items
.into_iter()
- .map(|item| item.resolve_internal(schema, names))
+ .map(|item| item.resolve_internal(schema, names, enclosing_namespace))
.collect::<Result<_, _>>()?,
)),
other => Err(Error::GetArray {
@@ -880,14 +899,19 @@ impl Value {
}
}
- fn resolve_map(self, schema: &Schema, names: &NamesRef) -> Result<Self, Error> {
+ fn resolve_map(
+ self,
+ schema: &Schema,
+ names: &NamesRef,
+ enclosing_namespace: &Namespace,
+ ) -> Result<Self, Error> {
match self {
Value::Map(items) => Ok(Value::Map(
items
.into_iter()
.map(|(key, value)| {
value
- .resolve_internal(schema, names)
+ .resolve_internal(schema, names, enclosing_namespace)
.map(|value| (key, value))
})
.collect::<Result<_, _>>()?,
@@ -899,7 +923,12 @@ impl Value {
}
}
- fn resolve_record(self, fields: &[RecordField], names: &NamesRef) -> Result<Self, Error> {
+ fn resolve_record(
+ self,
+ fields: &[RecordField],
+ names: &NamesRef,
+ enclosing_namespace: &Namespace,
+ ) -> Result<Self, Error> {
let mut items = match self {
Value::Map(items) => Ok(items),
Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
@@ -930,10 +959,11 @@ impl Value {
Schema::Null => Value::Union(0, Box::new(Value::Null)),
_ => Value::Union(
0,
- Box::new(
- Value::from(value.clone())
- .resolve_internal(first, names)?,
- ),
+ Box::new(Value::from(value.clone()).resolve_internal(
+ first,
+ names,
+ enclosing_namespace,
+ )?),
),
}
}
@@ -945,7 +975,7 @@ impl Value {
},
};
value
- .resolve_internal(&field.schema, names)
+ .resolve_internal(&field.schema, names, enclosing_namespace)
.map(|value| (field.name.clone(), value))
})
.collect::<Result<Vec<_>, _>>()?;
@@ -2481,7 +2511,10 @@ Field with name '"b"' is not a member of the map items"#,
let test_value: Value = msg.serialize(&mut ser).unwrap();
assert!(test_value.validate(&schema), "test_value should validate");
// TODO (rikheijdens): I believe this should also resolve?
- //assert!(test_value.resolve(&schema).is_ok(), "test_value should resolve");
+ assert!(
+ test_value.resolve(&schema).is_ok(),
+ "test_value should resolve"
+ );
}
#[test]
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 054cf8075..3bc5c5122 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -488,18 +488,19 @@ fn write_value_ref_resolved(
value: &Value,
buffer: &mut Vec<u8>,
) -> AvroResult<()> {
+ let root_schema = resolved_schema.get_root_schema();
if let Some(err) = value.validate_internal(
- resolved_schema.get_root_schema(),
+ root_schema,
resolved_schema.get_names(),
- &None,
+ &root_schema.namespace(),
) {
return Err(Error::ValidationWithReason(err));
}
encode_internal(
value,
- resolved_schema.get_root_schema(),
+ root_schema,
resolved_schema.get_names(),
- &None,
+ &root_schema.namespace(),
buffer,
)?;
Ok(())
@@ -510,18 +511,19 @@ fn write_value_ref_owned_resolved(
value: &Value,
buffer: &mut Vec<u8>,
) -> AvroResult<()> {
+ let root_schema = resolved_schema.get_root_schema();
if let Some(err) = value.validate_internal(
- resolved_schema.get_root_schema(),
+ root_schema,
resolved_schema.get_names(),
- &None,
+ &root_schema.namespace(),
) {
return Err(Error::ValidationWithReason(err));
}
encode_internal(
value,
- resolved_schema.get_root_schema(),
+ root_schema,
resolved_schema.get_names(),
- &None,
+ &root_schema.namespace(),
buffer,
)?;
Ok(())