You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/02/01 11:51:37 UTC
[avro] 22/30: AVRO-3248: Rust: Support named types in UnionSchema (#1396)
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git
commit 1a2d0a36b910fab16cc85399d38dd030dcd74f5c
Author: Litao Lu <lu...@gmail.com>
AuthorDate: Wed Jan 19 19:47:08 2022 +0800
AVRO-3248: Rust: Support named types in UnionSchema (#1396)
* AVRO-3248: Rust: Support named types in UnionSchema
Previously, unions did not support named types, so adding two records
to a single UnionSchema produced an error.
* AVRO-3248: Fix a typo in error message
* AVRO-3248: Give better names for the schemata as string
* AVRO-3248: Better names for variables
* AVRO-3248: Code formatting
* AVRO-3248 Fix formatting & build
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3248: Fix generate_interop_data and formatting
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3248: Fix some regressions after the rebase to master
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3248 Fix the position in the Union for the Double value
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
Co-authored-by: Martin Grigorov <ma...@users.noreply.github.com>
Co-authored-by: Martin Tzvetanov Grigorov <mg...@apache.org>
(cherry picked from commit 49c6e1067c937d066503be9c7f4032fb03f67474)
---
lang/rust/examples/generate_interop_data.rs | 2 +-
lang/rust/src/de.rs | 100 +++++++++++++----------
lang/rust/src/decode.rs | 5 +-
lang/rust/src/encode.rs | 11 ++-
lang/rust/src/reader.rs | 2 +-
lang/rust/src/schema.rs | 121 ++++++++++++++++++++++++++--
lang/rust/src/ser.rs | 30 ++++---
lang/rust/src/types.rs | 84 +++++++++++--------
lang/rust/src/writer.rs | 8 +-
lang/rust/tests/io.rs | 5 +-
10 files changed, 257 insertions(+), 111 deletions(-)
diff --git a/lang/rust/examples/generate_interop_data.rs b/lang/rust/examples/generate_interop_data.rs
index cb8efda..211c9cb 100644
--- a/lang/rust/examples/generate_interop_data.rs
+++ b/lang/rust/examples/generate_interop_data.rs
@@ -51,7 +51,7 @@ fn create_datum(schema: &Schema) -> Record {
Value::Record(vec![("label".into(), Value::String("cee".into()))]),
);
datum.put("mapField", Value::Map(map));
- datum.put("unionField", Value::Union(Box::new(Value::Double(12.0))));
+ datum.put("unionField", Value::Union(1, Box::new(Value::Double(12.0))));
datum.put("enumField", Value::Enum(2, "C".to_owned()));
datum.put("fixedField", Value::Fixed(16, b"1019181716151413".to_vec()));
datum.put(
diff --git a/lang/rust/src/de.rs b/lang/rust/src/de.rs
index 6d89686..6324cae 100644
--- a/lang/rust/src/de.rs
+++ b/lang/rust/src/de.rs
@@ -162,7 +162,7 @@ impl<'de> de::EnumAccess<'de> for EnumDeserializer<'de> {
self.input.first().map_or(
Err(de::Error::custom("A record must have a least one field")),
|item| match (item.0.as_ref(), &item.1) {
- ("type", Value::String(x)) => Ok((
+ ("type", Value::String(x)) | ("type", Value::Enum(_, x)) => Ok((
seed.deserialize(StringDeserializer {
input: x.to_owned(),
})?,
@@ -173,7 +173,7 @@ impl<'de> de::EnumAccess<'de> for EnumDeserializer<'de> {
field
))),
(_, _) => Err(de::Error::custom(
- "Expected first field of type String for the type name".to_string(),
+ "Expected first field of type String or Enum for the type name".to_string(),
)),
},
)
@@ -250,7 +250,7 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
| Value::TimestampMicros(i) => visitor.visit_i64(*i),
&Value::Float(f) => visitor.visit_f32(f),
&Value::Double(d) => visitor.visit_f64(d),
- Value::Union(u) => match **u {
+ Value::Union(_i, u) => match **u {
Value::Null => visitor.visit_unit(),
Value::Boolean(b) => visitor.visit_bool(b),
Value::Int(i) => visitor.visit_i32(i),
@@ -316,7 +316,7 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
.map_err(|e| de::Error::custom(e.to_string()))
.and_then(|s| visitor.visit_string(s))
}
- Value::Union(ref x) => match **x {
+ Value::Union(_i, ref x) => match **x {
Value::String(ref s) => visitor.visit_string(s.to_owned()),
_ => Err(de::Error::custom("not a string|bytes|fixed")),
},
@@ -354,8 +354,8 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
V: Visitor<'de>,
{
match *self.input {
- Value::Union(ref inner) if inner.as_ref() == &Value::Null => visitor.visit_none(),
- Value::Union(ref inner) => visitor.visit_some(&Deserializer::new(inner)),
+ Value::Union(_i, ref inner) if inner.as_ref() == &Value::Null => visitor.visit_none(),
+ Value::Union(_i, ref inner) => visitor.visit_some(&Deserializer::new(inner)),
_ => Err(de::Error::custom("not a union")),
}
}
@@ -398,7 +398,7 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
{
match *self.input {
Value::Array(ref items) => visitor.visit_seq(SeqDeserializer::new(items)),
- Value::Union(ref inner) => match **inner {
+ Value::Union(_i, ref inner) => match **inner {
Value::Array(ref items) => visitor.visit_seq(SeqDeserializer::new(items)),
_ => Err(de::Error::custom("not an array")),
},
@@ -446,7 +446,7 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
{
match *self.input {
Value::Record(ref fields) => visitor.visit_map(StructDeserializer::new(fields)),
- Value::Union(ref inner) => match **inner {
+ Value::Union(_i, ref inner) => match **inner {
Value::Record(ref fields) => visitor.visit_map(StructDeserializer::new(fields)),
_ => Err(de::Error::custom("not a record")),
},
@@ -781,7 +781,7 @@ mod tests {
("type".to_owned(), Value::String("Double".to_owned())),
(
"value".to_owned(),
- Value::Union(Box::new(Value::Double(64.0))),
+ Value::Union(1, Box::new(Value::Double(64.0))),
),
]),
)]);
@@ -804,10 +804,13 @@ mod tests {
("type".to_owned(), Value::String("Val1".to_owned())),
(
"value".to_owned(),
- Value::Union(Box::new(Value::Record(vec![
- ("x".to_owned(), Value::Float(1.0)),
- ("y".to_owned(), Value::Float(2.0)),
- ]))),
+ Value::Union(
+ 0,
+ Box::new(Value::Record(vec![
+ ("x".to_owned(), Value::Float(1.0)),
+ ("y".to_owned(), Value::Float(2.0)),
+ ])),
+ ),
),
]),
)]);
@@ -830,10 +833,10 @@ mod tests {
("type".to_owned(), Value::String("Val1".to_owned())),
(
"value".to_owned(),
- Value::Union(Box::new(Value::Array(vec![
- Value::Float(1.0),
- Value::Float(2.0),
- ]))),
+ Value::Union(
+ 0,
+ Box::new(Value::Array(vec![Value::Float(1.0), Value::Float(2.0)])),
+ ),
),
]),
)]);
@@ -965,75 +968,84 @@ mod tests {
),
(
"a_union_string".to_string(),
- Value::Union(Box::new(Value::String("a union string".to_string()))),
+ Value::Union(0, Box::new(Value::String("a union string".to_string()))),
),
(
"a_union_long".to_string(),
- Value::Union(Box::new(Value::Long(412))),
+ Value::Union(0, Box::new(Value::Long(412))),
),
(
"a_union_long".to_string(),
- Value::Union(Box::new(Value::Long(412))),
+ Value::Union(0, Box::new(Value::Long(412))),
),
(
"a_time_micros".to_string(),
- Value::Union(Box::new(Value::TimeMicros(123))),
+ Value::Union(0, Box::new(Value::TimeMicros(123))),
),
(
"a_non_existing_time_micros".to_string(),
- Value::Union(Box::new(Value::TimeMicros(-123))),
+ Value::Union(0, Box::new(Value::TimeMicros(-123))),
),
(
"a_timestamp_millis".to_string(),
- Value::Union(Box::new(Value::TimestampMillis(234))),
+ Value::Union(0, Box::new(Value::TimestampMillis(234))),
),
(
"a_non_existing_timestamp_millis".to_string(),
- Value::Union(Box::new(Value::TimestampMillis(-234))),
+ Value::Union(0, Box::new(Value::TimestampMillis(-234))),
),
(
"a_timestamp_micros".to_string(),
- Value::Union(Box::new(Value::TimestampMicros(345))),
+ Value::Union(0, Box::new(Value::TimestampMicros(345))),
),
(
"a_non_existing_timestamp_micros".to_string(),
- Value::Union(Box::new(Value::TimestampMicros(-345))),
+ Value::Union(0, Box::new(Value::TimestampMicros(-345))),
),
(
"a_record".to_string(),
- Value::Union(Box::new(Value::Record(vec![(
- "record_in_union".to_string(),
- Value::Int(-2),
- )]))),
+ Value::Union(
+ 0,
+ Box::new(Value::Record(vec![(
+ "record_in_union".to_string(),
+ Value::Int(-2),
+ )])),
+ ),
),
(
"a_non_existing_record".to_string(),
- Value::Union(Box::new(Value::Record(vec![(
- "blah".to_string(),
- Value::Int(-22),
- )]))),
+ Value::Union(
+ 0,
+ Box::new(Value::Record(vec![("blah".to_string(), Value::Int(-22))])),
+ ),
),
(
"an_array".to_string(),
- Value::Union(Box::new(Value::Array(vec![
- Value::Boolean(true),
- Value::Boolean(false),
- ]))),
+ Value::Union(
+ 0,
+ Box::new(Value::Array(vec![
+ Value::Boolean(true),
+ Value::Boolean(false),
+ ])),
+ ),
),
(
"a_non_existing_array".to_string(),
- Value::Union(Box::new(Value::Array(vec![
- Value::Boolean(false),
- Value::Boolean(true),
- ]))),
+ Value::Union(
+ 0,
+ Box::new(Value::Array(vec![
+ Value::Boolean(false),
+ Value::Boolean(true),
+ ])),
+ ),
),
(
"a_union_map".to_string(),
- Value::Union(Box::new(Value::Map(value_map))),
+ Value::Union(0, Box::new(Value::Map(value_map))),
),
(
"a_non_existing_union_map".to_string(),
- Value::Union(Box::new(Value::Map(HashMap::new()))),
+ Value::Union(0, Box::new(Value::Map(HashMap::new()))),
),
]);
diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs
index 26678bf..eb9c018 100644
--- a/lang/rust/src/decode.rs
+++ b/lang/rust/src/decode.rs
@@ -215,18 +215,17 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
num_variants: variants.len(),
})?;
let value = decode0(variant, reader, schemas_by_name)?;
- Ok(Value::Union(Box::new(value)))
+ Ok(Value::Union(index as i32, Box::new(value)))
}
Err(Error::ReadVariableIntegerBytes(io_err)) => {
if let ErrorKind::UnexpectedEof = io_err.kind() {
- Ok(Value::Union(Box::new(Value::Null)))
+ Ok(Value::Union(0, Box::new(Value::Null)))
} else {
Err(Error::ReadVariableIntegerBytes(io_err))
}
}
Err(io_err) => Err(io_err),
},
-
Schema::Record {
ref name,
ref fields,
diff --git a/lang/rust/src/encode.rs b/lang/rust/src/encode.rs
index 088def9..9a1fbef 100644
--- a/lang/rust/src/encode.rs
+++ b/lang/rust/src/encode.rs
@@ -124,14 +124,13 @@ pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
},
Value::Fixed(_, bytes) => buffer.extend(bytes),
Value::Enum(i, _) => encode_int(*i, buffer),
- Value::Union(item) => {
+ Value::Union(idx, item) => {
if let Schema::Union(ref inner) = *schema {
- // Find the schema that is matched here. Due to validation, this should always
- // return a value.
- let (idx, inner_schema) = inner
- .find_schema(item)
+ let inner_schema = inner
+ .schemas
+ .get(*idx as usize)
.expect("Invalid Union validation occurred");
- encode_long(idx as i64, buffer);
+ encode_long(*idx as i64, buffer);
encode_ref0(&*item, inner_schema, buffer, schemas_by_name);
} else {
error!("invalid schema type for Union: {:?}", schema);
diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index 4f6d311..9634a27 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -432,7 +432,7 @@ mod tests {
assert_eq!(
from_avro_datum(&schema, &mut encoded, None).unwrap(),
- Value::Union(Box::new(Value::Long(0)))
+ Value::Union(1, Box::new(Value::Long(0)))
);
}
diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index 2697328..4f85ee9 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -173,6 +173,13 @@ impl SchemaKind {
| SchemaKind::String,
)
}
+
+ pub fn is_named(self) -> bool {
+ matches!(
+ self,
+ SchemaKind::Record | SchemaKind::Enum | SchemaKind::Fixed
+ )
+ }
}
impl<'a> From<&'a types::Value> for SchemaKind {
@@ -189,7 +196,7 @@ impl<'a> From<&'a types::Value> for SchemaKind {
Value::String(_) => Self::String,
Value::Array(_) => Self::Array,
Value::Map(_) => Self::Map,
- Value::Union(_) => Self::Union,
+ Value::Union(_, _) => Self::Union,
Value::Record(_) => Self::Record,
Value::Enum(_, _) => Self::Enum,
Value::Fixed(_, _) => Self::Fixed,
@@ -362,7 +369,7 @@ impl UnionSchema {
return Err(Error::GetNestedUnion);
}
let kind = SchemaKind::from(schema);
- if vindex.insert(kind, i).is_some() {
+ if !kind.is_named() && vindex.insert(kind, i).is_some() {
return Err(Error::GetUnionDuplicate);
}
}
@@ -385,12 +392,12 @@ impl UnionSchema {
/// Optionally returns a reference to the schema matched by this value, as well as its position
/// within this union.
pub fn find_schema(&self, value: &types::Value) -> Option<(usize, &Schema)> {
- let type_index = &SchemaKind::from(value);
- if let Some(&i) = self.variant_index.get(type_index) {
+ let schema_kind = SchemaKind::from(value);
+ if let Some(&i) = self.variant_index.get(&schema_kind) {
// fast path
Some((i, &self.schemas[i]))
} else {
- // slow path (required for matching logical types)
+ // slow path (required for matching logical or named types)
self.schemas
.iter()
.enumerate()
@@ -1315,6 +1322,110 @@ mod tests {
assert_eq!(variants.next(), None);
}
+ // AVRO-3248
+ #[test]
+ fn test_union_of_records() {
+ use std::iter::FromIterator;
+
+ // A and B are the same except the name.
+ let schema_str_a = r#"{
+ "name": "A",
+ "type": "record",
+ "fields": [
+ {"name": "field_one", "type": "float"}
+ ]
+ }"#;
+
+ let schema_str_b = r#"{
+ "name": "B",
+ "type": "record",
+ "fields": [
+ {"name": "field_one", "type": "float"}
+ ]
+ }"#;
+
+ // we get Error::GetNameField if we put ["A", "B"] directly here.
+ let schema_str_c = r#"{
+ "name": "C",
+ "type": "record",
+ "fields": [
+ {"name": "field_one", "type": ["A", "B"]}
+ ]
+ }"#;
+
+ let schema_a = Schema::parse_str(schema_str_a).unwrap();
+ let schema_b = Schema::parse_str(schema_str_b).unwrap();
+
+ let schema_c = Schema::parse_list(&[schema_str_a, schema_str_b, schema_str_c])
+ .unwrap()
+ .last()
+ .unwrap()
+ .clone();
+
+ let schema_c_expected = Schema::Record {
+ name: Name::new("C"),
+ doc: None,
+ fields: vec![RecordField {
+ name: "field_one".to_string(),
+ doc: None,
+ default: None,
+ schema: Schema::Union(UnionSchema::new(vec![schema_a, schema_b]).unwrap()),
+ order: RecordFieldOrder::Ignore,
+ position: 0,
+ }],
+ lookup: HashMap::from_iter(vec![("field_one".to_string(), 0)]),
+ };
+
+ assert_eq!(schema_c, schema_c_expected);
+ }
+
+ // AVRO-3248
+ #[test]
+ fn test_nullable_record() {
+ use std::iter::FromIterator;
+
+ let schema_str_a = r#"{
+ "name": "A",
+ "type": "record",
+ "fields": [
+ {"name": "field_one", "type": "float"}
+ ]
+ }"#;
+
+ // we get Error::GetNameField if we put ["null", "A"] directly here.
+ let schema_str_option_a = r#"{
+ "name": "OptionA",
+ "type": "record",
+ "fields": [
+ {"name": "field_one", "type": ["null", "A"], "default": "null"}
+ ]
+ }"#;
+
+ let schema_a = Schema::parse_str(schema_str_a).unwrap();
+
+ let schema_option_a = Schema::parse_list(&[schema_str_a, schema_str_option_a])
+ .unwrap()
+ .last()
+ .unwrap()
+ .clone();
+
+ let schema_option_a_expected = Schema::Record {
+ name: Name::new("OptionA"),
+ doc: None,
+ fields: vec![RecordField {
+ name: "field_one".to_string(),
+ doc: None,
+ default: Some(Value::Null),
+ schema: Schema::Union(UnionSchema::new(vec![Schema::Null, schema_a]).unwrap()),
+ order: RecordFieldOrder::Ignore,
+ position: 0,
+ }],
+ lookup: HashMap::from_iter(vec![("field_one".to_string(), 0)]),
+ };
+
+ assert_eq!(schema_option_a, schema_option_a_expected);
+ }
+
#[test]
fn test_record_schema() {
let parsed = Schema::parse_str(
diff --git a/lang/rust/src/ser.rs b/lang/rust/src/ser.rs
index 480ea0e..444ee20 100644
--- a/lang/rust/src/ser.rs
+++ b/lang/rust/src/ser.rs
@@ -234,7 +234,7 @@ impl<'b> ser::Serializer for &'b mut Serializer {
),
(
"value".to_owned(),
- Value::Union(Box::new(value.serialize(self)?)),
+ Value::Union(index as i32, Box::new(value.serialize(self)?)),
),
]))
}
@@ -346,9 +346,10 @@ impl<'a> ser::SerializeSeq for SeqVariantSerializer<'a> {
where
T: Serialize,
{
- self.items.push(Value::Union(Box::new(
- value.serialize(&mut Serializer::default())?,
- )));
+ self.items.push(Value::Union(
+ self.index as i32,
+ Box::new(value.serialize(&mut Serializer::default())?),
+ ));
Ok(())
}
@@ -469,7 +470,7 @@ impl<'a> ser::SerializeStructVariant for StructVariantSerializer<'a> {
),
(
"value".to_owned(),
- Value::Union(Box::new(Value::Record(self.fields))),
+ Value::Union(self.index as i32, Box::new(Value::Record(self.fields))),
),
]))
}
@@ -776,7 +777,7 @@ mod tests {
("type".to_owned(), Value::Enum(0, "Double".to_owned())),
(
"value".to_owned(),
- Value::Union(Box::new(Value::Double(64.0))),
+ Value::Union(0, Box::new(Value::Double(64.0))),
),
]),
)]);
@@ -836,10 +837,13 @@ mod tests {
("type".to_owned(), Value::Enum(0, "Val1".to_owned())),
(
"value".to_owned(),
- Value::Union(Box::new(Value::Record(vec![
- ("x".to_owned(), Value::Float(1.0)),
- ("y".to_owned(), Value::Float(2.0)),
- ]))),
+ Value::Union(
+ 0,
+ Box::new(Value::Record(vec![
+ ("x".to_owned(), Value::Float(1.0)),
+ ("y".to_owned(), Value::Float(2.0)),
+ ])),
+ ),
),
]),
)]);
@@ -946,9 +950,9 @@ mod tests {
(
"value".to_owned(),
Value::Array(vec![
- Value::Union(Box::new(Value::Float(1.0))),
- Value::Union(Box::new(Value::Float(2.0))),
- Value::Union(Box::new(Value::Float(3.0))),
+ Value::Union(1, Box::new(Value::Float(1.0))),
+ Value::Union(1, Box::new(Value::Float(2.0))),
+ Value::Union(1, Box::new(Value::Float(3.0))),
]),
),
]),
diff --git a/lang/rust/src/types.rs b/lang/rust/src/types.rs
index 48a1813..77472b0 100644
--- a/lang/rust/src/types.rs
+++ b/lang/rust/src/types.rs
@@ -66,7 +66,12 @@ pub enum Value {
/// reading values.
Enum(i32, String),
/// An `union` Avro value.
- Union(Box<Value>),
+ ///
+ /// A Union is represented by the value it holds and its position in the type list
+ /// of its corresponding schema.
+ /// This allows schema-less encoding, as well as schema resolution while
+ /// reading values.
+ Union(i32, Box<Value>),
/// An `array` Avro value.
Array(Vec<Value>),
/// A `map` Avro value.
@@ -168,7 +173,11 @@ where
T: Into<Self>,
{
fn from(value: Option<T>) -> Self {
- Self::Union(Box::new(value.map_or_else(|| Self::Null, Into::into)))
+ // FIXME: this is incorrect in case first type in union is not "null"
+ Self::Union(
+ value.is_some() as i32,
+ Box::new(value.map_or_else(|| Self::Null, Into::into)),
+ )
}
}
@@ -285,7 +294,7 @@ impl std::convert::TryFrom<Value> for JsonValue {
Ok(Self::Array(items.into_iter().map(|v| v.into()).collect()))
}
Value::Enum(_i, s) => Ok(Self::String(s)),
- Value::Union(b) => Self::try_from(*b),
+ Value::Union(_i, b) => Self::try_from(*b),
Value::Array(items) => items
.into_iter()
.map(Self::try_from)
@@ -358,9 +367,11 @@ impl Value {
.map(|ref symbol| symbol == &s)
.unwrap_or(false),
// (&Value::Union(None), &Schema::Union(_)) => true,
- (&Value::Union(ref value), &Schema::Union(ref inner)) => {
- inner.find_schema(value).is_some()
- }
+ (&Value::Union(i, ref value), &Schema::Union(ref inner)) => inner
+ .variants()
+ .get(i as usize)
+ .map(|schema| value.validate(schema))
+ .unwrap_or(false),
(&Value::Array(ref items), &Schema::Array(ref inner)) => {
items.iter().all(|item| item.validate(inner))
}
@@ -409,7 +420,7 @@ impl Value {
{
// Pull out the Union, and attempt to resolve against it.
let v = match value {
- Value::Union(b) => &**b,
+ Value::Union(_i, b) => &**b,
_ => unreachable!(),
};
*value = v.clone();
@@ -703,13 +714,14 @@ impl Value {
fn resolve_union(self, schema: &UnionSchema) -> Result<Self, Error> {
let v = match self {
// Both are unions case.
- Value::Union(v) => *v,
+ Value::Union(_i, v) => *v,
// Reader is a union, but writer is not.
v => v,
};
// Find the first match in the reader schema.
- let (_, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
- Ok(Value::Union(Box::new(v.resolve(inner)?)))
+ // FIXME: this might be wrong when the union contains multiple structurally identical records that differ only by name
+ let (i, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
+ Ok(Value::Union(i as i32, Box::new(v.resolve(inner)?)))
}
fn resolve_array(self, schema: &Schema) -> Result<Self, Error> {
@@ -770,10 +782,11 @@ impl Value {
// NOTE: this match exists only to optimize null defaults for large
// backward-compatible schemas with many nullable fields
match first {
- Schema::Null => Value::Union(Box::new(Value::Null)),
- _ => Value::Union(Box::new(
- Value::from(value.clone()).resolve(first)?,
- )),
+ Schema::Null => Value::Union(0, Box::new(Value::Null)),
+ _ => Value::Union(
+ 0,
+ Box::new(Value::from(value.clone()).resolve(first)?),
+ ),
}
}
_ => Value::from(value.clone()),
@@ -821,22 +834,22 @@ mod tests {
(Value::Int(42), Schema::Int, true),
(Value::Int(42), Schema::Boolean, false),
(
- Value::Union(Box::new(Value::Null)),
+ Value::Union(0, Box::new(Value::Null)),
Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
true,
),
(
- Value::Union(Box::new(Value::Int(42))),
+ Value::Union(1, Box::new(Value::Int(42))),
Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
true,
),
(
- Value::Union(Box::new(Value::Null)),
+ Value::Union(0, Box::new(Value::Null)),
Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int]).unwrap()),
false,
),
(
- Value::Union(Box::new(Value::Int(42))),
+ Value::Union(3, Box::new(Value::Int(42))),
Schema::Union(
UnionSchema::new(vec![
Schema::Null,
@@ -849,7 +862,7 @@ mod tests {
true,
),
(
- Value::Union(Box::new(Value::Long(42i64))),
+ Value::Union(1, Box::new(Value::Long(42i64))),
Schema::Union(
UnionSchema::new(vec![Schema::Null, Schema::TimestampMillis]).unwrap(),
),
@@ -997,20 +1010,26 @@ mod tests {
let union_schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema]).unwrap());
- assert!(Value::Union(Box::new(Value::Record(vec![
- ("a".to_string(), Value::Long(42i64)),
- ("b".to_string(), Value::String("foo".to_string())),
- ])))
- .validate(&union_schema));
-
- assert!(Value::Union(Box::new(Value::Map(
- vec![
+ assert!(Value::Union(
+ 1,
+ Box::new(Value::Record(vec![
("a".to_string(), Value::Long(42i64)),
("b".to_string(), Value::String("foo".to_string())),
- ]
- .into_iter()
- .collect()
- )))
+ ]))
+ )
+ .validate(&union_schema));
+
+ assert!(Value::Union(
+ 1,
+ Box::new(Value::Map(
+ vec![
+ ("a".to_string(), Value::Long(42i64)),
+ ("b".to_string(), Value::String("foo".to_string())),
+ ]
+ .into_iter()
+ .collect()
+ ))
+ )
.validate(&union_schema));
}
@@ -1193,7 +1212,8 @@ mod tests {
JsonValue::String("test_enum".into())
);
assert_eq!(
- JsonValue::try_from(Value::Union(Box::new(Value::String("test_enum".into())))).unwrap(),
+ JsonValue::try_from(Value::Union(1, Box::new(Value::String("test_enum".into()))))
+ .unwrap(),
JsonValue::String("test_enum".into())
);
assert_eq!(
diff --git a/lang/rust/src/writer.rs b/lang/rust/src/writer.rs
index 41e77c9..a222a0f 100644
--- a/lang/rust/src/writer.rs
+++ b/lang/rust/src/writer.rs
@@ -398,7 +398,7 @@ mod tests {
#[test]
fn test_union_not_null() {
let schema = Schema::parse_str(UNION_SCHEMA).unwrap();
- let union = Value::Union(Box::new(Value::Long(3)));
+ let union = Value::Union(1, Box::new(Value::Long(3)));
let mut expected = Vec::new();
zig_i64(1, &mut expected);
@@ -410,7 +410,7 @@ mod tests {
#[test]
fn test_union_null() {
let schema = Schema::parse_str(UNION_SCHEMA).unwrap();
- let union = Value::Union(Box::new(Value::Null));
+ let union = Value::Union(0, Box::new(Value::Null));
let mut expected = Vec::new();
zig_i64(0, &mut expected);
@@ -781,11 +781,11 @@ mod tests {
let mut record1 = Record::new(&schema).unwrap();
record1.put(
"a",
- Value::Union(Box::new(Value::TimestampMicros(1234_i64))),
+ Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
);
let mut record2 = Record::new(&schema).unwrap();
- record2.put("a", Value::Union(Box::new(Value::Null)));
+ record2.put("a", Value::Union(0, Box::new(Value::Null)));
let n1 = writer.append(record1).unwrap();
let n2 = writer.append(record2).unwrap();
diff --git a/lang/rust/tests/io.rs b/lang/rust/tests/io.rs
index c1ab1d7..2d4c7ee 100644
--- a/lang/rust/tests/io.rs
+++ b/lang/rust/tests/io.rs
@@ -34,7 +34,7 @@ lazy_static! {
(r#"{"type": "enum", "name": "Test", "symbols": ["A", "B"]}"#, Value::Enum(1, "B".to_string())),
(r#"{"type": "array", "items": "long"}"#, Value::Array(vec![Value::Long(1), Value::Long(3), Value::Long(2)])),
(r#"{"type": "map", "values": "long"}"#, Value::Map([("a".to_string(), Value::Long(1i64)), ("b".to_string(), Value::Long(3i64)), ("c".to_string(), Value::Long(2i64))].iter().cloned().collect())),
- (r#"["string", "null", "long"]"#, Value::Union(Box::new(Value::Null))),
+ (r#"["string", "null", "long"]"#, Value::Union(1, Box::new(Value::Null))),
(r#"{"type": "record", "name": "Test", "fields": [{"name": "f", "type": "long"}]}"#, Value::Record(vec![("f".to_string(), Value::Long(1))]))
];
@@ -65,8 +65,9 @@ lazy_static! {
(r#"{"type": "enum", "name": "F", "symbols": ["FOO", "BAR"]}"#, r#""FOO""#, Value::Enum(0, "FOO".to_string())),
(r#"{"type": "array", "items": "int"}"#, "[1, 2, 3]", Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)])),
(r#"{"type": "map", "values": "int"}"#, r#"{"a": 1, "b": 2}"#, Value::Map([("a".to_string(), Value::Int(1)), ("b".to_string(), Value::Int(2))].iter().cloned().collect())),
- (r#"["int", "null"]"#, "5", Value::Union(Box::new(Value::Int(5)))),
+ (r#"["int", "null"]"#, "5", Value::Union(0, Box::new(Value::Int(5)))),
(r#"{"type": "record", "name": "F", "fields": [{"name": "A", "type": "int"}]}"#, r#"{"A": 5}"#,Value::Record(vec![("A".to_string(), Value::Int(5))])),
+ (r#"["null", "int"]"#, "null", Value::Union(0, Box::new(Value::Null))),
];
static ref LONG_RECORD_SCHEMA: Schema = Schema::parse_str(r#"