You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/02/01 11:51:37 UTC

[avro] 22/30: AVRO-3248: Rust: Support named types in UnionSchema (#1396)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 1a2d0a36b910fab16cc85399d38dd030dcd74f5c
Author: Litao Lu <lu...@gmail.com>
AuthorDate: Wed Jan 19 19:47:08 2022 +0800

    AVRO-3248: Rust: Support named types in UnionSchema (#1396)
    
    * AVRO-3248: Rust: Support named types in UnionSchema
    
    Previously, unions did not support named types, and adding two records
    to one UnionSchema produced an error.
    
    * AVRO-3248: Fix a typo in error message
    
    * AVRO-3248: Give better names for the schemata as string
    
    * AVRO-3248: Better names for variables
    
    * AVRO-3248: Code formatting
    
    * AVRO-3248 Fix formatting & build
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3248: Fix generate_interop_data and formatting
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3248: Fix some regressions after the rebase to master
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3248 Fix the position in the Union for the Double value
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    Co-authored-by: Martin Grigorov <ma...@users.noreply.github.com>
    Co-authored-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 49c6e1067c937d066503be9c7f4032fb03f67474)
---
 lang/rust/examples/generate_interop_data.rs |   2 +-
 lang/rust/src/de.rs                         | 100 +++++++++++++----------
 lang/rust/src/decode.rs                     |   5 +-
 lang/rust/src/encode.rs                     |  11 ++-
 lang/rust/src/reader.rs                     |   2 +-
 lang/rust/src/schema.rs                     | 121 ++++++++++++++++++++++++++--
 lang/rust/src/ser.rs                        |  30 ++++---
 lang/rust/src/types.rs                      |  84 +++++++++++--------
 lang/rust/src/writer.rs                     |   8 +-
 lang/rust/tests/io.rs                       |   5 +-
 10 files changed, 257 insertions(+), 111 deletions(-)

diff --git a/lang/rust/examples/generate_interop_data.rs b/lang/rust/examples/generate_interop_data.rs
index cb8efda..211c9cb 100644
--- a/lang/rust/examples/generate_interop_data.rs
+++ b/lang/rust/examples/generate_interop_data.rs
@@ -51,7 +51,7 @@ fn create_datum(schema: &Schema) -> Record {
         Value::Record(vec![("label".into(), Value::String("cee".into()))]),
     );
     datum.put("mapField", Value::Map(map));
-    datum.put("unionField", Value::Union(Box::new(Value::Double(12.0))));
+    datum.put("unionField", Value::Union(1, Box::new(Value::Double(12.0))));
     datum.put("enumField", Value::Enum(2, "C".to_owned()));
     datum.put("fixedField", Value::Fixed(16, b"1019181716151413".to_vec()));
     datum.put(
diff --git a/lang/rust/src/de.rs b/lang/rust/src/de.rs
index 6d89686..6324cae 100644
--- a/lang/rust/src/de.rs
+++ b/lang/rust/src/de.rs
@@ -162,7 +162,7 @@ impl<'de> de::EnumAccess<'de> for EnumDeserializer<'de> {
         self.input.first().map_or(
             Err(de::Error::custom("A record must have a least one field")),
             |item| match (item.0.as_ref(), &item.1) {
-                ("type", Value::String(x)) => Ok((
+                ("type", Value::String(x)) | ("type", Value::Enum(_, x)) => Ok((
                     seed.deserialize(StringDeserializer {
                         input: x.to_owned(),
                     })?,
@@ -173,7 +173,7 @@ impl<'de> de::EnumAccess<'de> for EnumDeserializer<'de> {
                     field
                 ))),
                 (_, _) => Err(de::Error::custom(
-                    "Expected first field of type String for the type name".to_string(),
+                    "Expected first field of type String or Enum for the type name".to_string(),
                 )),
             },
         )
@@ -250,7 +250,7 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
             | Value::TimestampMicros(i) => visitor.visit_i64(*i),
             &Value::Float(f) => visitor.visit_f32(f),
             &Value::Double(d) => visitor.visit_f64(d),
-            Value::Union(u) => match **u {
+            Value::Union(_i, u) => match **u {
                 Value::Null => visitor.visit_unit(),
                 Value::Boolean(b) => visitor.visit_bool(b),
                 Value::Int(i) => visitor.visit_i32(i),
@@ -316,7 +316,7 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
                     .map_err(|e| de::Error::custom(e.to_string()))
                     .and_then(|s| visitor.visit_string(s))
             }
-            Value::Union(ref x) => match **x {
+            Value::Union(_i, ref x) => match **x {
                 Value::String(ref s) => visitor.visit_string(s.to_owned()),
                 _ => Err(de::Error::custom("not a string|bytes|fixed")),
             },
@@ -354,8 +354,8 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
         V: Visitor<'de>,
     {
         match *self.input {
-            Value::Union(ref inner) if inner.as_ref() == &Value::Null => visitor.visit_none(),
-            Value::Union(ref inner) => visitor.visit_some(&Deserializer::new(inner)),
+            Value::Union(_i, ref inner) if inner.as_ref() == &Value::Null => visitor.visit_none(),
+            Value::Union(_i, ref inner) => visitor.visit_some(&Deserializer::new(inner)),
             _ => Err(de::Error::custom("not a union")),
         }
     }
@@ -398,7 +398,7 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
     {
         match *self.input {
             Value::Array(ref items) => visitor.visit_seq(SeqDeserializer::new(items)),
-            Value::Union(ref inner) => match **inner {
+            Value::Union(_i, ref inner) => match **inner {
                 Value::Array(ref items) => visitor.visit_seq(SeqDeserializer::new(items)),
                 _ => Err(de::Error::custom("not an array")),
             },
@@ -446,7 +446,7 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
     {
         match *self.input {
             Value::Record(ref fields) => visitor.visit_map(StructDeserializer::new(fields)),
-            Value::Union(ref inner) => match **inner {
+            Value::Union(_i, ref inner) => match **inner {
                 Value::Record(ref fields) => visitor.visit_map(StructDeserializer::new(fields)),
                 _ => Err(de::Error::custom("not a record")),
             },
@@ -781,7 +781,7 @@ mod tests {
                 ("type".to_owned(), Value::String("Double".to_owned())),
                 (
                     "value".to_owned(),
-                    Value::Union(Box::new(Value::Double(64.0))),
+                    Value::Union(1, Box::new(Value::Double(64.0))),
                 ),
             ]),
         )]);
@@ -804,10 +804,13 @@ mod tests {
                 ("type".to_owned(), Value::String("Val1".to_owned())),
                 (
                     "value".to_owned(),
-                    Value::Union(Box::new(Value::Record(vec![
-                        ("x".to_owned(), Value::Float(1.0)),
-                        ("y".to_owned(), Value::Float(2.0)),
-                    ]))),
+                    Value::Union(
+                        0,
+                        Box::new(Value::Record(vec![
+                            ("x".to_owned(), Value::Float(1.0)),
+                            ("y".to_owned(), Value::Float(2.0)),
+                        ])),
+                    ),
                 ),
             ]),
         )]);
@@ -830,10 +833,10 @@ mod tests {
                 ("type".to_owned(), Value::String("Val1".to_owned())),
                 (
                     "value".to_owned(),
-                    Value::Union(Box::new(Value::Array(vec![
-                        Value::Float(1.0),
-                        Value::Float(2.0),
-                    ]))),
+                    Value::Union(
+                        0,
+                        Box::new(Value::Array(vec![Value::Float(1.0), Value::Float(2.0)])),
+                    ),
                 ),
             ]),
         )]);
@@ -965,75 +968,84 @@ mod tests {
             ),
             (
                 "a_union_string".to_string(),
-                Value::Union(Box::new(Value::String("a union string".to_string()))),
+                Value::Union(0, Box::new(Value::String("a union string".to_string()))),
             ),
             (
                 "a_union_long".to_string(),
-                Value::Union(Box::new(Value::Long(412))),
+                Value::Union(0, Box::new(Value::Long(412))),
             ),
             (
                 "a_union_long".to_string(),
-                Value::Union(Box::new(Value::Long(412))),
+                Value::Union(0, Box::new(Value::Long(412))),
             ),
             (
                 "a_time_micros".to_string(),
-                Value::Union(Box::new(Value::TimeMicros(123))),
+                Value::Union(0, Box::new(Value::TimeMicros(123))),
             ),
             (
                 "a_non_existing_time_micros".to_string(),
-                Value::Union(Box::new(Value::TimeMicros(-123))),
+                Value::Union(0, Box::new(Value::TimeMicros(-123))),
             ),
             (
                 "a_timestamp_millis".to_string(),
-                Value::Union(Box::new(Value::TimestampMillis(234))),
+                Value::Union(0, Box::new(Value::TimestampMillis(234))),
             ),
             (
                 "a_non_existing_timestamp_millis".to_string(),
-                Value::Union(Box::new(Value::TimestampMillis(-234))),
+                Value::Union(0, Box::new(Value::TimestampMillis(-234))),
             ),
             (
                 "a_timestamp_micros".to_string(),
-                Value::Union(Box::new(Value::TimestampMicros(345))),
+                Value::Union(0, Box::new(Value::TimestampMicros(345))),
             ),
             (
                 "a_non_existing_timestamp_micros".to_string(),
-                Value::Union(Box::new(Value::TimestampMicros(-345))),
+                Value::Union(0, Box::new(Value::TimestampMicros(-345))),
             ),
             (
                 "a_record".to_string(),
-                Value::Union(Box::new(Value::Record(vec![(
-                    "record_in_union".to_string(),
-                    Value::Int(-2),
-                )]))),
+                Value::Union(
+                    0,
+                    Box::new(Value::Record(vec![(
+                        "record_in_union".to_string(),
+                        Value::Int(-2),
+                    )])),
+                ),
             ),
             (
                 "a_non_existing_record".to_string(),
-                Value::Union(Box::new(Value::Record(vec![(
-                    "blah".to_string(),
-                    Value::Int(-22),
-                )]))),
+                Value::Union(
+                    0,
+                    Box::new(Value::Record(vec![("blah".to_string(), Value::Int(-22))])),
+                ),
             ),
             (
                 "an_array".to_string(),
-                Value::Union(Box::new(Value::Array(vec![
-                    Value::Boolean(true),
-                    Value::Boolean(false),
-                ]))),
+                Value::Union(
+                    0,
+                    Box::new(Value::Array(vec![
+                        Value::Boolean(true),
+                        Value::Boolean(false),
+                    ])),
+                ),
             ),
             (
                 "a_non_existing_array".to_string(),
-                Value::Union(Box::new(Value::Array(vec![
-                    Value::Boolean(false),
-                    Value::Boolean(true),
-                ]))),
+                Value::Union(
+                    0,
+                    Box::new(Value::Array(vec![
+                        Value::Boolean(false),
+                        Value::Boolean(true),
+                    ])),
+                ),
             ),
             (
                 "a_union_map".to_string(),
-                Value::Union(Box::new(Value::Map(value_map))),
+                Value::Union(0, Box::new(Value::Map(value_map))),
             ),
             (
                 "a_non_existing_union_map".to_string(),
-                Value::Union(Box::new(Value::Map(HashMap::new()))),
+                Value::Union(0, Box::new(Value::Map(HashMap::new()))),
             ),
         ]);
 
diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs
index 26678bf..eb9c018 100644
--- a/lang/rust/src/decode.rs
+++ b/lang/rust/src/decode.rs
@@ -215,18 +215,17 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
                             num_variants: variants.len(),
                         })?;
                     let value = decode0(variant, reader, schemas_by_name)?;
-                    Ok(Value::Union(Box::new(value)))
+                    Ok(Value::Union(index as i32, Box::new(value)))
                 }
                 Err(Error::ReadVariableIntegerBytes(io_err)) => {
                     if let ErrorKind::UnexpectedEof = io_err.kind() {
-                        Ok(Value::Union(Box::new(Value::Null)))
+                        Ok(Value::Union(0, Box::new(Value::Null)))
                     } else {
                         Err(Error::ReadVariableIntegerBytes(io_err))
                     }
                 }
                 Err(io_err) => Err(io_err),
             },
-
             Schema::Record {
                 ref name,
                 ref fields,
diff --git a/lang/rust/src/encode.rs b/lang/rust/src/encode.rs
index 088def9..9a1fbef 100644
--- a/lang/rust/src/encode.rs
+++ b/lang/rust/src/encode.rs
@@ -124,14 +124,13 @@ pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
             },
             Value::Fixed(_, bytes) => buffer.extend(bytes),
             Value::Enum(i, _) => encode_int(*i, buffer),
-            Value::Union(item) => {
+            Value::Union(idx, item) => {
                 if let Schema::Union(ref inner) = *schema {
-                    // Find the schema that is matched here. Due to validation, this should always
-                    // return a value.
-                    let (idx, inner_schema) = inner
-                        .find_schema(item)
+                    let inner_schema = inner
+                        .schemas
+                        .get(*idx as usize)
                         .expect("Invalid Union validation occurred");
-                    encode_long(idx as i64, buffer);
+                    encode_long(*idx as i64, buffer);
                     encode_ref0(&*item, inner_schema, buffer, schemas_by_name);
                 } else {
                     error!("invalid schema type for Union: {:?}", schema);
diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index 4f6d311..9634a27 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -432,7 +432,7 @@ mod tests {
 
         assert_eq!(
             from_avro_datum(&schema, &mut encoded, None).unwrap(),
-            Value::Union(Box::new(Value::Long(0)))
+            Value::Union(1, Box::new(Value::Long(0)))
         );
     }
 
diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index 2697328..4f85ee9 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -173,6 +173,13 @@ impl SchemaKind {
                 | SchemaKind::String,
         )
     }
+
+    pub fn is_named(self) -> bool {
+        matches!(
+            self,
+            SchemaKind::Record | SchemaKind::Enum | SchemaKind::Fixed
+        )
+    }
 }
 
 impl<'a> From<&'a types::Value> for SchemaKind {
@@ -189,7 +196,7 @@ impl<'a> From<&'a types::Value> for SchemaKind {
             Value::String(_) => Self::String,
             Value::Array(_) => Self::Array,
             Value::Map(_) => Self::Map,
-            Value::Union(_) => Self::Union,
+            Value::Union(_, _) => Self::Union,
             Value::Record(_) => Self::Record,
             Value::Enum(_, _) => Self::Enum,
             Value::Fixed(_, _) => Self::Fixed,
@@ -362,7 +369,7 @@ impl UnionSchema {
                 return Err(Error::GetNestedUnion);
             }
             let kind = SchemaKind::from(schema);
-            if vindex.insert(kind, i).is_some() {
+            if !kind.is_named() && vindex.insert(kind, i).is_some() {
                 return Err(Error::GetUnionDuplicate);
             }
         }
@@ -385,12 +392,12 @@ impl UnionSchema {
     /// Optionally returns a reference to the schema matched by this value, as well as its position
     /// within this union.
     pub fn find_schema(&self, value: &types::Value) -> Option<(usize, &Schema)> {
-        let type_index = &SchemaKind::from(value);
-        if let Some(&i) = self.variant_index.get(type_index) {
+        let schema_kind = SchemaKind::from(value);
+        if let Some(&i) = self.variant_index.get(&schema_kind) {
             // fast path
             Some((i, &self.schemas[i]))
         } else {
-            // slow path (required for matching logical types)
+            // slow path (required for matching logical or named types)
             self.schemas
                 .iter()
                 .enumerate()
@@ -1315,6 +1322,110 @@ mod tests {
         assert_eq!(variants.next(), None);
     }
 
+    // AVRO-3248
+    #[test]
+    fn test_union_of_records() {
+        use std::iter::FromIterator;
+
+        // A and B are the same except the name.
+        let schema_str_a = r#"{
+            "name": "A",
+            "type": "record",
+            "fields": [
+                {"name": "field_one", "type": "float"}
+            ]
+        }"#;
+
+        let schema_str_b = r#"{
+            "name": "B",
+            "type": "record",
+            "fields": [
+                {"name": "field_one", "type": "float"}
+            ]
+        }"#;
+
+        // we get Error::GetNameField if we put ["A", "B"] directly here.
+        let schema_str_c = r#"{
+            "name": "C",
+            "type": "record",
+            "fields": [
+                {"name": "field_one",  "type": ["A", "B"]}
+            ]
+        }"#;
+
+        let schema_a = Schema::parse_str(schema_str_a).unwrap();
+        let schema_b = Schema::parse_str(schema_str_b).unwrap();
+
+        let schema_c = Schema::parse_list(&[schema_str_a, schema_str_b, schema_str_c])
+            .unwrap()
+            .last()
+            .unwrap()
+            .clone();
+
+        let schema_c_expected = Schema::Record {
+            name: Name::new("C"),
+            doc: None,
+            fields: vec![RecordField {
+                name: "field_one".to_string(),
+                doc: None,
+                default: None,
+                schema: Schema::Union(UnionSchema::new(vec![schema_a, schema_b]).unwrap()),
+                order: RecordFieldOrder::Ignore,
+                position: 0,
+            }],
+            lookup: HashMap::from_iter(vec![("field_one".to_string(), 0)]),
+        };
+
+        assert_eq!(schema_c, schema_c_expected);
+    }
+
+    // AVRO-3248
+    #[test]
+    fn test_nullable_record() {
+        use std::iter::FromIterator;
+
+        let schema_str_a = r#"{
+            "name": "A",
+            "type": "record",
+            "fields": [
+                {"name": "field_one", "type": "float"}
+            ]
+        }"#;
+
+        // we get Error::GetNameField if we put ["null", "A"] directly here.
+        let schema_str_option_a = r#"{
+            "name": "OptionA",
+            "type": "record",
+            "fields": [
+                {"name": "field_one",  "type": ["null", "A"], "default": "null"}
+            ]
+        }"#;
+
+        let schema_a = Schema::parse_str(schema_str_a).unwrap();
+
+        let schema_option_a = Schema::parse_list(&[schema_str_a, schema_str_option_a])
+            .unwrap()
+            .last()
+            .unwrap()
+            .clone();
+
+        let schema_option_a_expected = Schema::Record {
+            name: Name::new("OptionA"),
+            doc: None,
+            fields: vec![RecordField {
+                name: "field_one".to_string(),
+                doc: None,
+                default: Some(Value::Null),
+                schema: Schema::Union(UnionSchema::new(vec![Schema::Null, schema_a]).unwrap()),
+                order: RecordFieldOrder::Ignore,
+                position: 0,
+            }],
+            lookup: HashMap::from_iter(vec![("field_one".to_string(), 0)]),
+        };
+
+        assert_eq!(schema_option_a, schema_option_a_expected);
+    }
+
     #[test]
     fn test_record_schema() {
         let parsed = Schema::parse_str(
diff --git a/lang/rust/src/ser.rs b/lang/rust/src/ser.rs
index 480ea0e..444ee20 100644
--- a/lang/rust/src/ser.rs
+++ b/lang/rust/src/ser.rs
@@ -234,7 +234,7 @@ impl<'b> ser::Serializer for &'b mut Serializer {
             ),
             (
                 "value".to_owned(),
-                Value::Union(Box::new(value.serialize(self)?)),
+                Value::Union(index as i32, Box::new(value.serialize(self)?)),
             ),
         ]))
     }
@@ -346,9 +346,10 @@ impl<'a> ser::SerializeSeq for SeqVariantSerializer<'a> {
     where
         T: Serialize,
     {
-        self.items.push(Value::Union(Box::new(
-            value.serialize(&mut Serializer::default())?,
-        )));
+        self.items.push(Value::Union(
+            self.index as i32,
+            Box::new(value.serialize(&mut Serializer::default())?),
+        ));
         Ok(())
     }
 
@@ -469,7 +470,7 @@ impl<'a> ser::SerializeStructVariant for StructVariantSerializer<'a> {
             ),
             (
                 "value".to_owned(),
-                Value::Union(Box::new(Value::Record(self.fields))),
+                Value::Union(self.index as i32, Box::new(Value::Record(self.fields))),
             ),
         ]))
     }
@@ -776,7 +777,7 @@ mod tests {
                 ("type".to_owned(), Value::Enum(0, "Double".to_owned())),
                 (
                     "value".to_owned(),
-                    Value::Union(Box::new(Value::Double(64.0))),
+                    Value::Union(0, Box::new(Value::Double(64.0))),
                 ),
             ]),
         )]);
@@ -836,10 +837,13 @@ mod tests {
                 ("type".to_owned(), Value::Enum(0, "Val1".to_owned())),
                 (
                     "value".to_owned(),
-                    Value::Union(Box::new(Value::Record(vec![
-                        ("x".to_owned(), Value::Float(1.0)),
-                        ("y".to_owned(), Value::Float(2.0)),
-                    ]))),
+                    Value::Union(
+                        0,
+                        Box::new(Value::Record(vec![
+                            ("x".to_owned(), Value::Float(1.0)),
+                            ("y".to_owned(), Value::Float(2.0)),
+                        ])),
+                    ),
                 ),
             ]),
         )]);
@@ -946,9 +950,9 @@ mod tests {
                 (
                     "value".to_owned(),
                     Value::Array(vec![
-                        Value::Union(Box::new(Value::Float(1.0))),
-                        Value::Union(Box::new(Value::Float(2.0))),
-                        Value::Union(Box::new(Value::Float(3.0))),
+                        Value::Union(1, Box::new(Value::Float(1.0))),
+                        Value::Union(1, Box::new(Value::Float(2.0))),
+                        Value::Union(1, Box::new(Value::Float(3.0))),
                     ]),
                 ),
             ]),
diff --git a/lang/rust/src/types.rs b/lang/rust/src/types.rs
index 48a1813..77472b0 100644
--- a/lang/rust/src/types.rs
+++ b/lang/rust/src/types.rs
@@ -66,7 +66,12 @@ pub enum Value {
     /// reading values.
     Enum(i32, String),
     /// An `union` Avro value.
-    Union(Box<Value>),
+    ///
+    /// A Union is represented by the value it holds and its position in the type list
+    /// of its corresponding schema.
+    /// This allows schema-less encoding, as well as schema resolution while
+    /// reading values.
+    Union(i32, Box<Value>),
     /// An `array` Avro value.
     Array(Vec<Value>),
     /// A `map` Avro value.
@@ -168,7 +173,11 @@ where
     T: Into<Self>,
 {
     fn from(value: Option<T>) -> Self {
-        Self::Union(Box::new(value.map_or_else(|| Self::Null, Into::into)))
+        // FIXME: this is incorrect when the first type in the union is not "null"
+        Self::Union(
+            value.is_some() as i32,
+            Box::new(value.map_or_else(|| Self::Null, Into::into)),
+        )
     }
 }
 
@@ -285,7 +294,7 @@ impl std::convert::TryFrom<Value> for JsonValue {
                 Ok(Self::Array(items.into_iter().map(|v| v.into()).collect()))
             }
             Value::Enum(_i, s) => Ok(Self::String(s)),
-            Value::Union(b) => Self::try_from(*b),
+            Value::Union(_i, b) => Self::try_from(*b),
             Value::Array(items) => items
                 .into_iter()
                 .map(Self::try_from)
@@ -358,9 +367,11 @@ impl Value {
                 .map(|ref symbol| symbol == &s)
                 .unwrap_or(false),
             // (&Value::Union(None), &Schema::Union(_)) => true,
-            (&Value::Union(ref value), &Schema::Union(ref inner)) => {
-                inner.find_schema(value).is_some()
-            }
+            (&Value::Union(i, ref value), &Schema::Union(ref inner)) => inner
+                .variants()
+                .get(i as usize)
+                .map(|schema| value.validate(schema))
+                .unwrap_or(false),
             (&Value::Array(ref items), &Schema::Array(ref inner)) => {
                 items.iter().all(|item| item.validate(inner))
             }
@@ -409,7 +420,7 @@ impl Value {
             {
                 // Pull out the Union, and attempt to resolve against it.
                 let v = match value {
-                    Value::Union(b) => &**b,
+                    Value::Union(_i, b) => &**b,
                     _ => unreachable!(),
                 };
                 *value = v.clone();
@@ -703,13 +714,14 @@ impl Value {
     fn resolve_union(self, schema: &UnionSchema) -> Result<Self, Error> {
         let v = match self {
             // Both are unions case.
-            Value::Union(v) => *v,
+            Value::Union(_i, v) => *v,
             // Reader is a union, but writer is not.
             v => v,
         };
         // Find the first match in the reader schema.
-        let (_, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
-        Ok(Value::Union(Box::new(v.resolve(inner)?)))
+        // FIXME: this might be wrong when the union contains several structurally identical records with different names
+        let (i, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
+        Ok(Value::Union(i as i32, Box::new(v.resolve(inner)?)))
     }
 
     fn resolve_array(self, schema: &Schema) -> Result<Self, Error> {
@@ -770,10 +782,11 @@ impl Value {
                                 // NOTE: this match exists only to optimize null defaults for large
                                 // backward-compatible schemas with many nullable fields
                                 match first {
-                                    Schema::Null => Value::Union(Box::new(Value::Null)),
-                                    _ => Value::Union(Box::new(
-                                        Value::from(value.clone()).resolve(first)?,
-                                    )),
+                                    Schema::Null => Value::Union(0, Box::new(Value::Null)),
+                                    _ => Value::Union(
+                                        0,
+                                        Box::new(Value::from(value.clone()).resolve(first)?),
+                                    ),
                                 }
                             }
                             _ => Value::from(value.clone()),
@@ -821,22 +834,22 @@ mod tests {
             (Value::Int(42), Schema::Int, true),
             (Value::Int(42), Schema::Boolean, false),
             (
-                Value::Union(Box::new(Value::Null)),
+                Value::Union(0, Box::new(Value::Null)),
                 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
                 true,
             ),
             (
-                Value::Union(Box::new(Value::Int(42))),
+                Value::Union(1, Box::new(Value::Int(42))),
                 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
                 true,
             ),
             (
-                Value::Union(Box::new(Value::Null)),
+                Value::Union(0, Box::new(Value::Null)),
                 Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int]).unwrap()),
                 false,
             ),
             (
-                Value::Union(Box::new(Value::Int(42))),
+                Value::Union(3, Box::new(Value::Int(42))),
                 Schema::Union(
                     UnionSchema::new(vec![
                         Schema::Null,
@@ -849,7 +862,7 @@ mod tests {
                 true,
             ),
             (
-                Value::Union(Box::new(Value::Long(42i64))),
+                Value::Union(1, Box::new(Value::Long(42i64))),
                 Schema::Union(
                     UnionSchema::new(vec![Schema::Null, Schema::TimestampMillis]).unwrap(),
                 ),
@@ -997,20 +1010,26 @@ mod tests {
 
         let union_schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema]).unwrap());
 
-        assert!(Value::Union(Box::new(Value::Record(vec![
-            ("a".to_string(), Value::Long(42i64)),
-            ("b".to_string(), Value::String("foo".to_string())),
-        ])))
-        .validate(&union_schema));
-
-        assert!(Value::Union(Box::new(Value::Map(
-            vec![
+        assert!(Value::Union(
+            1,
+            Box::new(Value::Record(vec![
                 ("a".to_string(), Value::Long(42i64)),
                 ("b".to_string(), Value::String("foo".to_string())),
-            ]
-            .into_iter()
-            .collect()
-        )))
+            ]))
+        )
+        .validate(&union_schema));
+
+        assert!(Value::Union(
+            1,
+            Box::new(Value::Map(
+                vec![
+                    ("a".to_string(), Value::Long(42i64)),
+                    ("b".to_string(), Value::String("foo".to_string())),
+                ]
+                .into_iter()
+                .collect()
+            ))
+        )
         .validate(&union_schema));
     }
 
@@ -1193,7 +1212,8 @@ mod tests {
             JsonValue::String("test_enum".into())
         );
         assert_eq!(
-            JsonValue::try_from(Value::Union(Box::new(Value::String("test_enum".into())))).unwrap(),
+            JsonValue::try_from(Value::Union(1, Box::new(Value::String("test_enum".into()))))
+                .unwrap(),
             JsonValue::String("test_enum".into())
         );
         assert_eq!(
diff --git a/lang/rust/src/writer.rs b/lang/rust/src/writer.rs
index 41e77c9..a222a0f 100644
--- a/lang/rust/src/writer.rs
+++ b/lang/rust/src/writer.rs
@@ -398,7 +398,7 @@ mod tests {
     #[test]
     fn test_union_not_null() {
         let schema = Schema::parse_str(UNION_SCHEMA).unwrap();
-        let union = Value::Union(Box::new(Value::Long(3)));
+        let union = Value::Union(1, Box::new(Value::Long(3)));
 
         let mut expected = Vec::new();
         zig_i64(1, &mut expected);
@@ -410,7 +410,7 @@ mod tests {
     #[test]
     fn test_union_null() {
         let schema = Schema::parse_str(UNION_SCHEMA).unwrap();
-        let union = Value::Union(Box::new(Value::Null));
+        let union = Value::Union(0, Box::new(Value::Null));
 
         let mut expected = Vec::new();
         zig_i64(0, &mut expected);
@@ -781,11 +781,11 @@ mod tests {
         let mut record1 = Record::new(&schema).unwrap();
         record1.put(
             "a",
-            Value::Union(Box::new(Value::TimestampMicros(1234_i64))),
+            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
         );
 
         let mut record2 = Record::new(&schema).unwrap();
-        record2.put("a", Value::Union(Box::new(Value::Null)));
+        record2.put("a", Value::Union(0, Box::new(Value::Null)));
 
         let n1 = writer.append(record1).unwrap();
         let n2 = writer.append(record2).unwrap();
diff --git a/lang/rust/tests/io.rs b/lang/rust/tests/io.rs
index c1ab1d7..2d4c7ee 100644
--- a/lang/rust/tests/io.rs
+++ b/lang/rust/tests/io.rs
@@ -34,7 +34,7 @@ lazy_static! {
         (r#"{"type": "enum", "name": "Test", "symbols": ["A", "B"]}"#, Value::Enum(1, "B".to_string())),
         (r#"{"type": "array", "items": "long"}"#, Value::Array(vec![Value::Long(1), Value::Long(3), Value::Long(2)])),
         (r#"{"type": "map", "values": "long"}"#, Value::Map([("a".to_string(), Value::Long(1i64)), ("b".to_string(), Value::Long(3i64)), ("c".to_string(), Value::Long(2i64))].iter().cloned().collect())),
-        (r#"["string", "null", "long"]"#, Value::Union(Box::new(Value::Null))),
+        (r#"["string", "null", "long"]"#, Value::Union(1, Box::new(Value::Null))),
         (r#"{"type": "record", "name": "Test", "fields": [{"name": "f", "type": "long"}]}"#, Value::Record(vec![("f".to_string(), Value::Long(1))]))
     ];
 
@@ -65,8 +65,9 @@ lazy_static! {
         (r#"{"type": "enum", "name": "F", "symbols": ["FOO", "BAR"]}"#, r#""FOO""#, Value::Enum(0, "FOO".to_string())),
         (r#"{"type": "array", "items": "int"}"#, "[1, 2, 3]", Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)])),
         (r#"{"type": "map", "values": "int"}"#, r#"{"a": 1, "b": 2}"#, Value::Map([("a".to_string(), Value::Int(1)), ("b".to_string(), Value::Int(2))].iter().cloned().collect())),
-        (r#"["int", "null"]"#, "5", Value::Union(Box::new(Value::Int(5)))),
+        (r#"["int", "null"]"#, "5", Value::Union(0, Box::new(Value::Int(5)))),
         (r#"{"type": "record", "name": "F", "fields": [{"name": "A", "type": "int"}]}"#, r#"{"A": 5}"#,Value::Record(vec![("A".to_string(), Value::Int(5))])),
+        (r#"["null", "int"]"#, "null", Value::Union(0, Box::new(Value::Null))),
     ];
 
     static ref LONG_RECORD_SCHEMA: Schema = Schema::parse_str(r#"