You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/08/26 08:48:17 UTC
[avro] branch branch-1.11 updated: AVRO: [Rust] Improved resolution of nullable record fields. (#1837)
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/branch-1.11 by this push:
new a3d120f37 AVRO: [Rust] Improved resolution of nullable record fields. (#1837)
a3d120f37 is described below
commit a3d120f3750677d1e6ab1b29384de5e8eaf38ac5
Author: Shaeq Ahmed <sh...@gmail.com>
AuthorDate: Fri Aug 26 12:47:43 2022 +0400
AVRO: [Rust] Improved resolution of nullable record fields. (#1837)
* AVRO: [Rust] Improved resolution of nullable record fields.
* AVRO: [Rust] Update to improvements for nullable record fields PR.
* AVRO-3621: Improve test method name
* AVRO-3621: Revert non-related change
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
Co-authored-by: Martin Tzvetanov Grigorov <mg...@apache.org>
(cherry picked from commit ddf10bd17f591638f12128c4fff2b4681b245ac0)
---
lang/rust/avro/src/schema.rs | 47 ++++++++++++++++
lang/rust/avro/src/types.rs | 129 +++++++++++++++++++++++++++++++++++++------
2 files changed, 160 insertions(+), 16 deletions(-)
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 3320e9e59..ec26a8bf9 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -634,6 +634,14 @@ impl RecordField {
}
custom_attributes
}
+
+ /// Returns true if this `RecordField` is nullable, meaning the schema is a `UnionSchema` where the first variant is `Null`.
+ pub fn is_nullable(&self) -> bool {
+ match self.schema {
+ Schema::Union(ref inner) => inner.is_nullable(),
+ _ => false,
+ }
+ }
}
#[derive(Debug, Clone)]
@@ -2016,6 +2024,45 @@ mod tests {
assert_eq!(variants.next(), None);
}
+ // AVRO-3621
+ #[test]
+ fn test_avro_3621_nullable_record_field() {
+ let nullable_record_field = RecordField {
+ name: "next".to_string(),
+ doc: None,
+ default: None,
+ schema: Schema::Union(
+ UnionSchema::new(vec![
+ Schema::Null,
+ Schema::Ref {
+ name: Name {
+ name: "LongList".to_owned(),
+ namespace: None,
+ },
+ },
+ ])
+ .unwrap(),
+ ),
+ order: RecordFieldOrder::Ascending,
+ position: 1,
+ custom_attributes: Default::default(),
+ };
+
+ assert!(nullable_record_field.is_nullable());
+
+ let non_nullable_record_field = RecordField {
+ name: "next".to_string(),
+ doc: None,
+ default: Some(json!(2)),
+ schema: Schema::Long,
+ order: RecordFieldOrder::Ascending,
+ position: 1,
+ custom_attributes: Default::default(),
+ };
+
+ assert!(!non_nullable_record_field.is_nullable());
+ }
+
// AVRO-3248
#[test]
fn test_union_of_records() {
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index e833a26b7..7e700b4ea 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -457,6 +457,10 @@ impl Value {
.get(i as usize)
.map(|schema| value.validate_internal(schema, names))
.unwrap_or_else(|| Some(format!("No schema in the union at position '{}'", i))),
+ (v, &Schema::Union(ref inner)) => match inner.find_schema(v) {
+ Some(_) => None,
+ None => Some("Could not find matching type in union".to_string()),
+ },
(&Value::Array(ref items), &Schema::Array(ref inner)) => {
items.iter().fold(None, |acc, item| {
Value::accumulate(acc, item.validate_internal(inner, names))
@@ -475,11 +479,20 @@ impl Value {
..
},
) => {
- if fields.len() != record_fields.len() {
+ let non_nullable_fields_count =
+ fields.iter().filter(|&rf| !rf.is_nullable()).count();
+
+ if record_fields.len() < non_nullable_fields_count {
+ return Some(format!(
+ "The value's records length ({}) doesn't match the schema ({} non-nullable fields)",
+ record_fields.len(),
+ non_nullable_fields_count
+ ));
+ } else if record_fields.len() > fields.len() {
return Some(format!(
- "The value's records length ({}) is different than the schema's ({})",
+ "The value's records length ({}) is greater than the schema's ({} fields)",
record_fields.len(),
- fields.len()
+ fields.len(),
));
}
@@ -509,7 +522,7 @@ impl Value {
if let Some(item) = items.get(&field.name) {
let res = item.validate_internal(&field.schema, names);
Value::accumulate(acc, res)
- } else {
+ } else if !field.is_nullable() {
Value::accumulate(
acc,
Some(format!(
@@ -517,6 +530,8 @@ impl Value {
field.name
)),
)
+ } else {
+ acc
}
})
}
@@ -1043,7 +1058,7 @@ mod tests {
attributes: Default::default(),
},
false,
- "Invalid value: Record([(\"unknown_field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Int, order: Ignore, position: 0, custom_attributes: {} }], lookup: {}, attributes: {} }. Reason: There is no schema field for field 'unknown_field_name'",
+ r#"Invalid value: Record([("unknown_field_name", Null)]) for schema: Record { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, default: None, schema: Int, order: Ignore, position: 0, custom_attributes: {} }], lookup: {}, attributes: {} }. Reason: There is no schema field for field 'unknown_field_name'"#,
),
(
Value::Record(vec![("field_name".to_string(), Value::Null)]),
@@ -1066,7 +1081,7 @@ mod tests {
attributes: Default::default(),
},
false,
- "Invalid value: Record([(\"field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Ref { name: Name { name: \"missing\", namespace: None } }, order: Ignore, position: 0, custom_attributes: {} }], lookup: {\"field_name\": 0}, attributes: {} }. Reason: Unresolved schema reference: 'missing'. Parsed names: []",
+ r#"Invalid value: Record([("field_name", Null)]) for schema: Record { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, default: None, schema: Ref { name: Name { name: "missing", namespace: None } }, order: Ignore, position: 0, custom_attributes: {} }], lookup: {"field_name": 0}, attributes: {} }. Reason: Unresolved schema reference: 'missing'. Parsed names: []"#,
),
];
@@ -1196,7 +1211,12 @@ mod tests {
// "type": "record",
// "fields": [
// {"type": "long", "name": "a"},
- // {"type": "string", "name": "b"}
+ // {"type": "string", "name": "b"},
+ // {
+ // "type": ["null", "int"],
+ // "name": "c",
+ // "default": null
+ // }
// ]
// }
let schema = Schema::Record {
@@ -1222,11 +1242,26 @@ mod tests {
position: 1,
custom_attributes: Default::default(),
},
+ RecordField {
+ name: "c".to_string(),
+ doc: None,
+ default: Some(JsonValue::Null),
+ schema: Schema::Union(
+ UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap(),
+ ),
+ order: RecordFieldOrder::Ascending,
+ position: 2,
+ custom_attributes: Default::default(),
+ },
],
- lookup: [("a".to_string(), 0), ("b".to_string(), 1)]
- .iter()
- .cloned()
- .collect(),
+ lookup: [
+ ("a".to_string(), 0),
+ ("b".to_string(), 1),
+ ("c".to_string(), 2),
+ ]
+ .iter()
+ .cloned()
+ .collect(),
attributes: Default::default(),
};
@@ -1247,7 +1282,9 @@ mod tests {
("b".to_string(), Value::String("foo".to_string())),
]);
assert!(!value.validate(&schema));
- assert_logged("Invalid value: Record([(\"a\", Boolean(false)), (\"b\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }], lookup: {\"a\": 0, \"b\": 1}, attr [...]
+ assert_logged(
+ r#"Invalid value: Record([("a", Boolean(false)), ("b", String("foo"))]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, default: Some(Null) [...]
+ );
let value = Value::Record(vec![
("a".to_string(), Value::Long(42i64)),
@@ -1255,17 +1292,27 @@ mod tests {
]);
assert!(!value.validate(&schema));
assert_logged(
- "Invalid value: Record([(\"a\", Long(42)), (\"c\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }], lookup: {\"a\": 0, \"b\": 1}, attributes: {} }. Re [...]
+ r#"Invalid value: Record([("a", Long(42)), ("c", String("foo"))]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, default: Some(Null), sche [...]
+ );
+
+ let value = Value::Record(vec![
+ ("a".to_string(), Value::Long(42i64)),
+ ("d".to_string(), Value::String("foo".to_string())),
+ ]);
+ assert!(!value.validate(&schema));
+ assert_logged(
+ r#"Invalid value: Record([("a", Long(42)), ("d", String("foo"))]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, default: Some(Null), sche [...]
);
let value = Value::Record(vec![
("a".to_string(), Value::Long(42i64)),
("b".to_string(), Value::String("foo".to_string())),
("c".to_string(), Value::Null),
+ ("d".to_string(), Value::Null),
]);
assert!(!value.validate(&schema));
assert_logged(
- r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null)]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }], lookup: {"a": 0, "b": 1}, attributes: {} }. Rea [...]
+ r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null), ("d", Null)]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, [...]
);
assert!(Value::Map(
@@ -1279,13 +1326,13 @@ mod tests {
.validate(&schema));
assert!(!Value::Map(
- vec![("c".to_string(), Value::Long(123_i64)),]
+ vec![("d".to_string(), Value::Long(123_i64)),]
.into_iter()
.collect()
)
.validate(&schema));
assert_logged(
- r#"Invalid value: Map({"c": Long(123)}) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }], lookup: {"a": 0, "b": 1}, attributes: {} }. Reason: Field with name '"a"' is not a mem [...]
+ r#"Invalid value: Map({"d": Long(123)}) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, default: Some(Null), schema: Union(UnionSchema { sc [...]
Field with name '"b"' is not a member of the map items"#,
);
@@ -1465,6 +1512,56 @@ Field with name '"b"' is not a member of the map items"#,
assert!(value.resolve(&Schema::TimestampMicros).is_err());
}
+ #[test]
+ fn test_avro_3621_resolve_to_nullable_union() {
+ let schema = Schema::parse_str(
+ r#"{
+ "type": "record",
+ "name": "root",
+ "fields": [
+ {
+ "name": "event",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "event",
+ "fields": [
+ {
+ "name": "amount",
+ "type": "int"
+ },
+ {
+ "name": "size",
+ "type": [
+ "null",
+ "int"
+ ],
+ "default": null
+ }
+ ]
+ }
+ ],
+ "default": null
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let value = Value::Record(vec![(
+ "event".to_string(),
+ Value::Record(vec![("amount".to_string(), Value::Int(200))]),
+ )]);
+ assert!(value.resolve(&schema).is_ok());
+
+ let value = Value::Record(vec![(
+ "event".to_string(),
+ Value::Record(vec![("size".to_string(), Value::Int(1))]),
+ )]);
+ assert!(value.resolve(&schema).is_err());
+ }
+
#[test]
fn json_from_avro() {
assert_eq!(JsonValue::try_from(Value::Null).unwrap(), JsonValue::Null);