You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/04/13 09:18:01 UTC

[avro] branch branch-1.11 updated: AVRO-3483: [Rust] Log error messages with a reason when the validation fails (#1636)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.11 by this push:
     new b67c34c13 AVRO-3483: [Rust] Log error messages with a reason when the validation fails (#1636)
b67c34c13 is described below

commit b67c34c13007f5248bdb041ca2ae7228d57f773e
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Wed Apr 13 12:17:25 2022 +0300

    AVRO-3483: [Rust] Log error messages with a reason when the validation fails (#1636)
    
    * AVRO-3483: [Rust] Log error messages with a reason when the validation fails
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3483: Add a failure reason for missing schema in a Union by index
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3483: Assert the error log messages after failed validation
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3483: Value::validate_internal() now returns Option<String>
    
    Some carries the accumulated error message(s).
    None means that the validation passed.
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3483: Minor cleanup
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3483: Add a test case for Value::Enum with index out of bounds
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3483: Add a test case for the validation failure when Value::Map does not have items for the schema fields
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3483: TestLogger is enabled only for error logs
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3483: Do not allocate new Some values in the accumulate function
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit c9a346e8eb6012fdc8594877f02ea21be98c8b26)
---
 lang/rust/avro/Cargo.toml    |   3 +-
 lang/rust/avro/src/error.rs  |   4 +
 lang/rust/avro/src/schema.rs |   8 +-
 lang/rust/avro/src/types.rs  | 452 +++++++++++++++++++++++++++++++++++--------
 lang/rust/avro/src/writer.rs |   4 +-
 5 files changed, 381 insertions(+), 90 deletions(-)

diff --git a/lang/rust/avro/Cargo.toml b/lang/rust/avro/Cargo.toml
index c87f76276..ffb3d69b8 100644
--- a/lang/rust/avro/Cargo.toml
+++ b/lang/rust/avro/Cargo.toml
@@ -71,7 +71,7 @@ typed-builder = "0.10.0"
 uuid = { version = "0.8.2", features = ["serde", "v4"] }
 zerocopy = "0.6.1"
 lazy_static = "1.4.0"
-log = "0.4.14"
+log = "0.4.16"
 zstd = { version = "0.11.0+zstd.1.5.2", optional = true }
 
 [dev-dependencies]
@@ -81,3 +81,4 @@ criterion = "0.3.5"
 anyhow = "1.0.56"
 hex-literal = "0.3.4"
 env_logger = "0.9.0"
+ref_thread_local = "0.1.1"
diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index 43aef5f16..908e040e9 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -51,6 +51,10 @@ pub enum Error {
     #[error("Value does not match schema")]
     Validation,
 
+    /// Describes errors that happened while validating Avro data.
+    #[error("Value does not match schema: Reason: {0}")]
+    ValidationWithReason(String),
+
     #[error("Unable to allocate {desired} bytes (maximum allowed: {maximum})")]
     MemoryAllocation { desired: usize, maximum: usize },
 
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 4404745a6..a4ccb09a4 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -27,7 +27,7 @@ use serde::{
 use serde_json::{Map, Value};
 use std::{
     borrow::Cow,
-    collections::{HashMap, HashSet},
+    collections::{BTreeMap, HashMap, HashSet},
     convert::{TryFrom, TryInto},
     fmt,
     hash::Hash,
@@ -68,7 +68,7 @@ impl fmt::Display for SchemaFingerprint {
 /// More information about Avro schemas can be found in the
 /// [Avro Specification](https://avro.apache.org/docs/current/spec.html#schemas)
 #[derive(Clone, Debug, EnumDiscriminants)]
-#[strum_discriminants(name(SchemaKind), derive(Hash))]
+#[strum_discriminants(name(SchemaKind), derive(Hash, Ord, PartialOrd))]
 pub enum Schema {
     /// A `null` Avro schema.
     Null,
@@ -503,12 +503,12 @@ pub struct UnionSchema {
     // schema index given a value.
     // **NOTE** that this approach does not work for named types, and will have to be modified
     // to support that. A simple solution is to also keep a mapping of the names used.
-    variant_index: HashMap<SchemaKind, usize>,
+    variant_index: BTreeMap<SchemaKind, usize>,
 }
 
 impl UnionSchema {
     pub(crate) fn new(schemas: Vec<Schema>) -> AvroResult<Self> {
-        let mut vindex = HashMap::new();
+        let mut vindex = BTreeMap::new();
         for (i, schema) in schemas.iter().enumerate() {
             if let Schema::Union(_) = schema {
                 return Err(Error::GetNestedUnion);
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index a88bdaeba..d77ded887 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -334,80 +334,168 @@ impl Value {
     /// for the full set of rules of schema validation.
     pub fn validate(&self, schema: &Schema) -> bool {
         let rs = ResolvedSchema::try_from(schema).expect("Schema didn't successfully parse");
-        self.validate_internal(schema, rs.get_names())
+        match self.validate_internal(schema, rs.get_names()) {
+            Some(error_msg) => {
+                error!(
+                    "Invalid value: {:?} for schema: {:?}. Reason: {}",
+                    self, schema, error_msg
+                );
+                false
+            }
+            None => true,
+        }
+    }
+
+    fn accumulate(accumulator: Option<String>, other: Option<String>) -> Option<String> {
+        match (accumulator, other) {
+            (None, None) => None,
+            (None, s @ Some(_)) => s,
+            (s @ Some(_), None) => s,
+            (Some(reason1), Some(reason2)) => Some(format!("{}\n{}", reason1, reason2)),
+        }
     }
 
-    pub(crate) fn validate_internal(&self, schema: &Schema, names: &NamesRef) -> bool {
+    pub(crate) fn validate_internal(&self, schema: &Schema, names: &NamesRef) -> Option<String> {
         match (self, schema) {
-            (_, &Schema::Ref { ref name }) => names
-                .get(name)
-                .map_or(false, |s| self.validate_internal(s, names)),
-            (&Value::Null, &Schema::Null) => true,
-            (&Value::Boolean(_), &Schema::Boolean) => true,
-            (&Value::Int(_), &Schema::Int) => true,
-            (&Value::Int(_), &Schema::Date) => true,
-            (&Value::Int(_), &Schema::TimeMillis) => true,
-            (&Value::Long(_), &Schema::Long) => true,
-            (&Value::Long(_), &Schema::TimeMicros) => true,
-            (&Value::Long(_), &Schema::TimestampMillis) => true,
-            (&Value::Long(_), &Schema::TimestampMicros) => true,
-            (&Value::TimestampMicros(_), &Schema::TimestampMicros) => true,
-            (&Value::TimestampMillis(_), &Schema::TimestampMillis) => true,
-            (&Value::TimeMicros(_), &Schema::TimeMicros) => true,
-            (&Value::TimeMillis(_), &Schema::TimeMillis) => true,
-            (&Value::Date(_), &Schema::Date) => true,
-            (&Value::Decimal(_), &Schema::Decimal { .. }) => true,
-            (&Value::Duration(_), &Schema::Duration) => true,
-            (&Value::Uuid(_), &Schema::Uuid) => true,
-            (&Value::Float(_), &Schema::Float) => true,
-            (&Value::Double(_), &Schema::Double) => true,
-            (&Value::Bytes(_), &Schema::Bytes) => true,
-            (&Value::Bytes(_), &Schema::Decimal { .. }) => true,
-            (&Value::String(_), &Schema::String) => true,
-            (&Value::String(_), &Schema::Uuid) => true,
-            (&Value::Fixed(n, _), &Schema::Fixed { size, .. }) => n == size,
-            (&Value::Bytes(ref b), &Schema::Fixed { size, .. }) => b.len() == size,
-            (&Value::Fixed(n, _), &Schema::Duration) => n == 12,
+            (_, &Schema::Ref { ref name }) => names.get(name).map_or_else(
+                || {
+                    return Some(format!(
+                        "Unresolved schema reference: '{}'. Parsed names: {:?}",
+                        name,
+                        names.keys()
+                    ));
+                },
+                |s| self.validate_internal(s, names),
+            ),
+            (&Value::Null, &Schema::Null) => None,
+            (&Value::Boolean(_), &Schema::Boolean) => None,
+            (&Value::Int(_), &Schema::Int) => None,
+            (&Value::Int(_), &Schema::Date) => None,
+            (&Value::Int(_), &Schema::TimeMillis) => None,
+            (&Value::Long(_), &Schema::Long) => None,
+            (&Value::Long(_), &Schema::TimeMicros) => None,
+            (&Value::Long(_), &Schema::TimestampMillis) => None,
+            (&Value::Long(_), &Schema::TimestampMicros) => None,
+            (&Value::TimestampMicros(_), &Schema::TimestampMicros) => None,
+            (&Value::TimestampMillis(_), &Schema::TimestampMillis) => None,
+            (&Value::TimeMicros(_), &Schema::TimeMicros) => None,
+            (&Value::TimeMillis(_), &Schema::TimeMillis) => None,
+            (&Value::Date(_), &Schema::Date) => None,
+            (&Value::Decimal(_), &Schema::Decimal { .. }) => None,
+            (&Value::Duration(_), &Schema::Duration) => None,
+            (&Value::Uuid(_), &Schema::Uuid) => None,
+            (&Value::Float(_), &Schema::Float) => None,
+            (&Value::Double(_), &Schema::Double) => None,
+            (&Value::Bytes(_), &Schema::Bytes) => None,
+            (&Value::Bytes(_), &Schema::Decimal { .. }) => None,
+            (&Value::String(_), &Schema::String) => None,
+            (&Value::String(_), &Schema::Uuid) => None,
+            (&Value::Fixed(n, _), &Schema::Fixed { size, .. }) => {
+                if n != size {
+                    Some(format!(
+                        "The value's size ({}) is different than the schema's size ({})",
+                        n, size
+                    ))
+                } else {
+                    None
+                }
+            }
+            (&Value::Bytes(ref b), &Schema::Fixed { size, .. }) => {
+                if b.len() != size {
+                    Some(format!(
+                        "The bytes' length ({}) is different than the schema's size ({})",
+                        b.len(),
+                        size
+                    ))
+                } else {
+                    None
+                }
+            }
+            (&Value::Fixed(n, _), &Schema::Duration) => {
+                if n != 12 {
+                    Some(format!(
+                        "The value's size ('{}') must be exactly 12 to be a Duration",
+                        n
+                    ))
+                } else {
+                    None
+                }
+            }
             // TODO: check precision against n
-            (&Value::Fixed(_n, _), &Schema::Decimal { .. }) => true,
-            (&Value::String(ref s), &Schema::Enum { ref symbols, .. }) => symbols.contains(s),
+            (&Value::Fixed(_n, _), &Schema::Decimal { .. }) => None,
+            (&Value::String(ref s), &Schema::Enum { ref symbols, .. }) => {
+                if !symbols.contains(s) {
+                    Some(format!("'{}' is not a member of the possible symbols", s))
+                } else {
+                    None
+                }
+            }
             (&Value::Enum(i, ref s), &Schema::Enum { ref symbols, .. }) => symbols
                 .get(i as usize)
-                .map(|ref symbol| symbol == &s)
-                .unwrap_or(false),
-            // (&Value::Union(None), &Schema::Union(_)) => true,
+                .map(|ref symbol| {
+                    if symbol != &s {
+                        Some(format!("Symbol '{}' is not at position '{}'", s, i))
+                    } else {
+                        None
+                    }
+                })
+                .unwrap_or_else(|| Some(format!("No symbol at position '{}'", i))),
+            // (&Value::Union(None), &Schema::Union(_)) => None,
             (&Value::Union(i, ref value), &Schema::Union(ref inner)) => inner
                 .variants()
                 .get(i as usize)
                 .map(|schema| value.validate_internal(schema, names))
-                .unwrap_or(false),
-            (&Value::Array(ref items), &Schema::Array(ref inner)) => items
-                .iter()
-                .all(|item| item.validate_internal(inner, names)),
-            (&Value::Map(ref items), &Schema::Map(ref inner)) => items
-                .iter()
-                .all(|(_, value)| value.validate_internal(inner, names)),
+                .unwrap_or_else(|| Some(format!("No schema in the union at position '{}'", i))),
+            (&Value::Array(ref items), &Schema::Array(ref inner)) => {
+                items.iter().fold(None, |acc, item| {
+                    Value::accumulate(acc, item.validate_internal(inner, names))
+                })
+            }
+            (&Value::Map(ref items), &Schema::Map(ref inner)) => {
+                items.iter().fold(None, |acc, (_, value)| {
+                    Value::accumulate(acc, value.validate_internal(inner, names))
+                })
+            }
             (&Value::Record(ref record_fields), &Schema::Record { ref fields, .. }) => {
-                fields.len() == record_fields.len()
-                    && fields.iter().zip(record_fields.iter()).all(
-                        |(field, &(ref name, ref value))| {
-                            field.name == *name && value.validate_internal(&field.schema, names)
-                        },
-                    )
+                if fields.len() != record_fields.len() {
+                    return Some(format!(
+                        "The value's records length ({}) is different than the schema's ({})",
+                        record_fields.len(),
+                        fields.len()
+                    ));
+                }
+
+                fields.iter().zip(record_fields.iter()).fold(
+                    None,
+                    |acc, (field, &(ref name, ref value))| {
+                        if field.name != *name {
+                            return Some(format!(
+                                "Value's name '{}' does not match the expected field's name '{}'",
+                                name, field.name
+                            ));
+                        }
+                        let res = value.validate_internal(&field.schema, names);
+                        Value::accumulate(acc, res)
+                    },
+                )
             }
             (&Value::Map(ref items), &Schema::Record { ref fields, .. }) => {
-                fields.iter().all(|field| {
+                fields.iter().fold(None, |acc, field| {
                     if let Some(item) = items.get(&field.name) {
-                        item.validate_internal(&field.schema, names)
+                        let res = item.validate_internal(&field.schema, names);
+                        Value::accumulate(acc, res)
                     } else {
-                        false
+                        Value::accumulate(
+                            acc,
+                            Some(format!(
+                                "Field with name '{:?}' is not a member of the map items",
+                                field.name
+                            )),
+                        )
                     }
                 })
             }
-            (v, s) => {
-                error!("Unsupported value-schema combination:\n{:?}\n{:?}", v, s);
-                false
-            }
+            (_v, _s) => Some("Unsupported value-schema combination".to_string()),
         }
     }
 
@@ -825,27 +913,75 @@ mod tests {
         schema::{Name, RecordField, RecordFieldOrder, Schema, UnionSchema},
         types::Value,
     };
+    use log::{Level, LevelFilter, Metadata};
     use uuid::Uuid;
 
+    use ref_thread_local::{ref_thread_local, RefThreadLocal};
+
+    ref_thread_local! {
+        // The unit tests run in parallel
+        // We need to keep the log messages in a thread-local variable
+        // and clear them after assertion
+        static managed LOG_MESSAGES: Vec<String> = Vec::new();
+    }
+
+    struct TestLogger;
+
+    impl log::Log for TestLogger {
+        fn enabled(&self, metadata: &Metadata) -> bool {
+            metadata.level() <= Level::Error
+        }
+
+        fn log(&self, record: &log::Record) {
+            if self.enabled(record.metadata()) {
+                let mut msgs = LOG_MESSAGES.borrow_mut();
+                msgs.push(format!("{}", record.args()));
+            }
+        }
+
+        fn flush(&self) {}
+    }
+
+    static TEST_LOGGER: TestLogger = TestLogger;
+
+    fn init() {
+        let _ = log::set_logger(&TEST_LOGGER);
+        log::set_max_level(LevelFilter::Info);
+    }
+
+    fn assert_log_message(expected_message: &str) {
+        let mut msgs = LOG_MESSAGES.borrow_mut();
+        assert_eq!(msgs.pop().unwrap(), expected_message);
+        msgs.clear();
+    }
+
     #[test]
     fn validate() {
         let value_schema_valid = vec![
-            (Value::Int(42), Schema::Int, true),
-            (Value::Int(42), Schema::Boolean, false),
+            (Value::Int(42), Schema::Int, true, ""),
+            (
+                Value::Int(42),
+                Schema::Boolean,
+                false,
+                "Invalid value: Int(42) for schema: Boolean. Reason: Unsupported value-schema combination",
+            ),
             (
                 Value::Union(0, Box::new(Value::Null)),
                 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
                 true,
+                "",
             ),
             (
                 Value::Union(1, Box::new(Value::Int(42))),
                 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
                 true,
+                "",
             ),
             (
                 Value::Union(0, Box::new(Value::Null)),
                 Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int]).unwrap()),
                 false,
+                "Invalid value: Union(0, Null) for schema: Union(UnionSchema { schemas: [Double, Int], variant_index: {Int: 1, Double: 0} }). Reason: Unsupported value-schema combination",
             ),
             (
                 Value::Union(3, Box::new(Value::Int(42))),
@@ -859,6 +995,7 @@ mod tests {
                     .unwrap(),
                 ),
                 true,
+                "",
             ),
             (
                 Value::Union(1, Box::new(Value::Long(42i64))),
@@ -866,27 +1003,100 @@ mod tests {
                     UnionSchema::new(vec![Schema::Null, Schema::TimestampMillis]).unwrap(),
                 ),
                 true,
+                "",
+            ),
+            (
+                Value::Union(2, Box::new(Value::Long(1_i64))),
+                Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
+                false,
+                "Invalid value: Union(2, Long(1)) for schema: Union(UnionSchema { schemas: [Null, Int], variant_index: {Null: 0, Int: 1} }). Reason: No schema in the union at position '2'",
             ),
             (
                 Value::Array(vec![Value::Long(42i64)]),
                 Schema::Array(Box::new(Schema::Long)),
                 true,
+                "",
             ),
             (
                 Value::Array(vec![Value::Boolean(true)]),
                 Schema::Array(Box::new(Schema::Long)),
                 false,
+                "Invalid value: Array([Boolean(true)]) for schema: Array(Long). Reason: Unsupported value-schema combination",
+            ),
+            (Value::Record(vec![]), Schema::Null, false, "Invalid value: Record([]) for schema: Null. Reason: Unsupported value-schema combination"),
+            (
+                Value::Fixed(12, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]),
+                Schema::Duration,
+                true,
+                "",
+            ),
+            (
+                Value::Fixed(11, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
+                Schema::Duration,
+                false,
+                "Invalid value: Fixed(11, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) for schema: Duration. Reason: The value's size ('11') must be exactly 12 to be a Duration",
+            ),
+            (
+                Value::Record(vec![("unknown_field_name".to_string(), Value::Null)]),
+                Schema::Record {
+                    name: Name::new("record_name").unwrap(),
+                    aliases: None,
+                    doc: None,
+                    fields: vec![RecordField {
+                        name: "field_name".to_string(),
+                        doc: None,
+                        default: None,
+                        schema: Schema::Int,
+                        order: RecordFieldOrder::Ignore,
+                        position: 0,
+                    }],
+                    lookup: Default::default(),
+                },
+                false,
+                "Invalid value: Record([(\"unknown_field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Int, order: Ignore, position: 0 }], lookup: {} }. Reason: Value's name 'unknown_field_name' does not match the expected field's name 'field_name'",
+            ),
+            (
+                Value::Record(vec![("field_name".to_string(), Value::Null)]),
+                Schema::Record {
+                    name: Name::new("record_name").unwrap(),
+                    aliases: None,
+                    doc: None,
+                    fields: vec![RecordField {
+                        name: "field_name".to_string(),
+                        doc: None,
+                        default: None,
+                        schema: Schema::Ref {
+                            name: Name::new("missing").unwrap(),
+                        },
+                        order: RecordFieldOrder::Ignore,
+                        position: 0,
+                    }],
+                    lookup: Default::default(),
+                },
+                false,
+                "Invalid value: Record([(\"field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Ref { name: Name { name: \"missing\", namespace: None } }, order: Ignore, position: 0 }], lookup: {} }. Reason: Unresolved schema reference: 'missing'. Parsed names: []",
             ),
-            (Value::Record(vec![]), Schema::Null, false),
         ];
 
-        for (value, schema, valid) in value_schema_valid.into_iter() {
-            assert_eq!(valid, value.validate(&schema));
+        for (value, schema, valid, expected_err_message) in value_schema_valid.into_iter() {
+            let err_message = value.validate_internal(&schema, &HashMap::default());
+            assert_eq!(valid, err_message.is_none());
+            if !valid {
+                let full_err_message = format!(
+                    "Invalid value: {:?} for schema: {:?}. Reason: {}",
+                    value,
+                    schema,
+                    err_message.unwrap()
+                );
+                assert_eq!(expected_err_message, full_err_message);
+            }
         }
     }
 
     #[test]
     fn validate_fixed() {
+        init();
+
         let schema = Schema::Fixed {
             size: 4,
             name: Name::new("some_fixed").unwrap(),
@@ -895,13 +1105,32 @@ mod tests {
         };
 
         assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema));
-        assert!(!Value::Fixed(5, vec![0, 0, 0, 0, 0]).validate(&schema));
+        let value = Value::Fixed(5, vec![0, 0, 0, 0, 0]);
+        assert!(!value.validate(&schema));
+        assert_log_message(
+            format!(
+                "Invalid value: {:?} for schema: {:?}. Reason: {}",
+                value, schema, "The value's size (5) is different than the schema's size (4)"
+            )
+            .as_str(),
+        );
+
         assert!(Value::Bytes(vec![0, 0, 0, 0]).validate(&schema));
-        assert!(!Value::Bytes(vec![0, 0, 0, 0, 0]).validate(&schema));
+        let value = Value::Bytes(vec![0, 0, 0, 0, 0]);
+        assert!(!value.validate(&schema));
+        assert_log_message(
+            format!(
+                "Invalid value: {:?} for schema: {:?}. Reason: {}",
+                value, schema, "The bytes' length (5) is different than the schema's size (4)"
+            )
+            .as_str(),
+        );
     }
 
     #[test]
     fn validate_enum() {
+        init();
+
         let schema = Schema::Enum {
             name: Name::new("some_enum").unwrap(),
             aliases: None,
@@ -917,8 +1146,35 @@ mod tests {
         assert!(Value::Enum(0, "spades".to_string()).validate(&schema));
         assert!(Value::String("spades".to_string()).validate(&schema));
 
-        assert!(!Value::Enum(1, "spades".to_string()).validate(&schema));
-        assert!(!Value::String("lorem".to_string()).validate(&schema));
+        let value = Value::Enum(1, "spades".to_string());
+        assert!(!value.validate(&schema));
+        assert_log_message(
+            format!(
+                "Invalid value: {:?} for schema: {:?}. Reason: {}",
+                value, schema, "Symbol 'spades' is not at position '1'"
+            )
+            .as_str(),
+        );
+
+        let value = Value::Enum(1000, "spades".to_string());
+        assert!(!value.validate(&schema));
+        assert_log_message(
+            format!(
+                "Invalid value: {:?} for schema: {:?}. Reason: {}",
+                value, schema, "No symbol at position '1000'"
+            )
+            .as_str(),
+        );
+
+        let value = Value::String("lorem".to_string());
+        assert!(!value.validate(&schema));
+        assert_log_message(
+            format!(
+                "Invalid value: {:?} for schema: {:?}. Reason: {}",
+                value, schema, "'lorem' is not a member of the possible symbols"
+            )
+            .as_str(),
+        );
 
         let other_schema = Schema::Enum {
             name: Name::new("some_other_enum").unwrap(),
@@ -932,11 +1188,21 @@ mod tests {
             ],
         };
 
-        assert!(!Value::Enum(0, "spades".to_string()).validate(&other_schema));
+        let value = Value::Enum(0, "spades".to_string());
+        assert!(!value.validate(&other_schema));
+        assert_log_message(
+            format!(
+                "Invalid value: {:?} for schema: {:?}. Reason: {}",
+                value, other_schema, "Symbol 'spades' is not at position '0'"
+            )
+            .as_str(),
+        );
     }
 
     #[test]
     fn validate_record() {
+        init();
+
         use std::collections::HashMap;
         // {
         //    "type": "record",
@@ -976,30 +1242,42 @@ mod tests {
         ])
         .validate(&schema));
 
-        assert!(!Value::Record(vec![
+        let value = Value::Record(vec![
             ("b".to_string(), Value::String("foo".to_string())),
             ("a".to_string(), Value::Long(42i64)),
-        ])
-        .validate(&schema));
+        ]);
+        assert!(!value.validate(&schema));
+        assert_log_message(
+            format!(
+                "Invalid value: {:?} for schema: {:?}. Reason: {}",
+                value, schema, "Value's name 'a' does not match the expected field's name 'b'"
+            )
+            .as_str(),
+        );
 
-        assert!(!Value::Record(vec![
+        let value = Value::Record(vec![
             ("a".to_string(), Value::Boolean(false)),
             ("b".to_string(), Value::String("foo".to_string())),
-        ])
-        .validate(&schema));
+        ]);
+        assert!(!value.validate(&schema));
+        assert_log_message("Invalid value: Record([(\"a\", Boolean(false)), (\"b\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {} }. Reason: Unsupported value-schema combination");
 
-        assert!(!Value::Record(vec![
+        let value = Value::Record(vec![
             ("a".to_string(), Value::Long(42i64)),
             ("c".to_string(), Value::String("foo".to_string())),
-        ])
-        .validate(&schema));
+        ]);
+        assert!(!value.validate(&schema));
+        assert_log_message(
+            "Invalid value: Record([(\"a\", Long(42)), (\"c\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {} }. Reason: Value's name 'c' does not match the expected field's name 'b'"
+        );
 
-        assert!(!Value::Record(vec![
+        let value = Value::Record(vec![
             ("a".to_string(), Value::Long(42i64)),
             ("b".to_string(), Value::String("foo".to_string())),
             ("c".to_string(), Value::Null),
-        ])
-        .validate(&schema));
+        ]);
+        assert!(!value.validate(&schema));
+        assert_log_message("Invalid value: Record([(\"a\", Long(42)), (\"b\", String(\"foo\")), (\"c\", Null)]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {} }. Reason: The value's records length (3) is different  [...]
 
         assert!(Value::Map(
             vec![
@@ -1011,6 +1289,14 @@ mod tests {
         )
         .validate(&schema));
 
+        assert!(!Value::Map(
+            vec![("c".to_string(), Value::Long(123_i64)),]
+                .into_iter()
+                .collect()
+        )
+        .validate(&schema));
+        assert_log_message("Invalid value: Map({\"c\": Long(123)}) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {} }. Reason: Field with name '\"a\"' is not a member of the map items\nField with name '\"b\"' is not  [...]
+
         let union_schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema]).unwrap());
 
         assert!(Value::Union(
@@ -1361,7 +1647,7 @@ mod tests {
         let outer = Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
         outer
             .resolve(&schema)
-            .expect("Record definition defined in one field must be availible in other field");
+            .expect("Record definition defined in one field must be available in other field");
     }
 
     #[test]
@@ -1409,7 +1695,7 @@ mod tests {
         ]);
         outer_value
             .resolve(&schema)
-            .expect("Record defined in array definition must be resolveable from map");
+            .expect("Record defined in array definition must be resolvable from map");
     }
 
     #[test]
@@ -1547,7 +1833,7 @@ mod tests {
         ]);
         outer_value
             .resolve(&schema)
-            .expect("Record defined in map definition must be resolveable from array");
+            .expect("Record defined in map definition must be resolvable from array");
     }
 
     #[test]
@@ -1586,11 +1872,11 @@ mod tests {
         ]);
         outer1
             .resolve(&schema)
-            .expect("Record definition defined in union must be resolvabled in other field");
+            .expect("Record definition defined in union must be resolved in other field");
         let outer2 = Value::Record(vec![("a".into(), Value::Null), ("b".into(), inner_value2)]);
         outer2
             .resolve(&schema)
-            .expect("Record definition defined in union must be resolvabled in other field");
+            .expect("Record definition defined in union must be resolved in other field");
     }
 
     #[test]
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 6bb37f734..e815f517f 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -357,11 +357,11 @@ fn write_value_ref_resolved(
     value: &Value,
     buffer: &mut Vec<u8>,
 ) -> AvroResult<()> {
-    if !value.validate_internal(
+    if let Some(err) = value.validate_internal(
         resolved_schema.get_root_schema(),
         resolved_schema.get_names(),
     ) {
-        return Err(Error::Validation);
+        return Err(Error::ValidationWithReason(err));
     }
     encode_internal(
         value,