You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ko...@apache.org on 2021/03/15 18:53:28 UTC

[avro] branch master updated: AVRO-1751: Add python3 compatibility (#979)

This is an automated email from the ASF dual-hosted git repository.

kojiromike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new f67e53a  AVRO-1751: Add python3 compatibility (#979)
f67e53a is described below

commit f67e53ac7e8592b609ba46ea9cd4d73c350baa85
Author: Tincu Gabriel <ga...@protonmail.com>
AuthorDate: Mon Mar 15 19:53:19 2021 +0100

    AVRO-1751: Add python3 compatibility (#979)
    
    * AVRO-1751: add python3 compatibility
    
    Keeping in line with the same logic present in the avro java library
    that handles type promotions, this aims to be a faithful adaptation of
    that code in python.
    
    Most tests were also ported, with the exception of the ones dealing with
    enum default, since the python api does not support that yet
    
    See https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/SchemaCompatibility.java
    
    * typing: fix mypy issues
    
    Merge is now a function instead of a SchemaCompatibilityResult method to
    circumvent the fact that a method cannot receive type hints of it's own
    type without __future__ imports until 3.7 an without until 3.10
    
    More at https://www.python.org/dev/peps/pep-0563/
    
    * style: make compat code python3.5 compatible
    
    - remove f-string
    - remove class member annotations
    
    Co-authored-by: Gabriel Tincu <ga...@aiven.io>
---
 lang/py/avro/compatibility.py           | 357 ++++++++++++++++
 lang/py/avro/errors.py                  |   4 +
 lang/py/avro/test/test_compatibility.py | 728 ++++++++++++++++++++++++++++++++
 3 files changed, 1089 insertions(+)

diff --git a/lang/py/avro/compatibility.py b/lang/py/avro/compatibility.py
new file mode 100644
index 0000000..98d60a0
--- /dev/null
+++ b/lang/py/avro/compatibility.py
@@ -0,0 +1,357 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from copy import copy
+from enum import Enum
+from typing import Dict, List, Optional, Set, cast
+
+from avro.errors import AvroRuntimeException
+from avro.schema import ArraySchema, EnumSchema, Field, FixedSchema, MapSchema, NamedSchema, RecordSchema, Schema, UnionSchema
+
+
+class SchemaType(str, Enum):
+    ARRAY = "array"
+    BOOLEAN = "boolean"
+    BYTES = "bytes"
+    DOUBLE = "double"
+    ENUM = "enum"
+    FIXED = "fixed"
+    FLOAT = "float"
+    INT = "int"
+    LONG = "long"
+    MAP = "map"
+    NULL = "null"
+    RECORD = "record"
+    STRING = "string"
+    UNION = "union"
+
+
+class SchemaCompatibilityType(Enum):
+    compatible = "compatible"
+    incompatible = "incompatible"
+    recursion_in_progress = "recursion_in_progress"
+
+
+class SchemaIncompatibilityType(Enum):
+    name_mismatch = "name_mismatch"
+    fixed_size_mismatch = "fixed_size_mismatch"
+    missing_enum_symbols = "missing_enum_symbols"
+    reader_field_missing_default_value = "reader_field_missing_default_value"
+    type_mismatch = "type_mismatch"
+    missing_union_branch = "missing_union_branch"
+
+
+PRIMITIVE_TYPES = {
+    SchemaType.NULL, SchemaType.BOOLEAN, SchemaType.INT,
+    SchemaType.LONG, SchemaType.FLOAT, SchemaType.DOUBLE,
+    SchemaType.BYTES, SchemaType.STRING
+}
+
+
+class SchemaCompatibilityResult:
+    def __init__(
+        self,
+        compatibility: SchemaCompatibilityType = SchemaCompatibilityType.recursion_in_progress,
+        incompatibilities: List[SchemaIncompatibilityType] = None,
+        messages: Optional[Set[str]] = None,
+        locations: Optional[Set[str]] = None,
+    ):
+        self.locations = locations or {"/"}
+        self.messages = messages or set()
+        self.compatibility = compatibility
+        self.incompatibilities = incompatibilities or []
+
+
+def merge(this: SchemaCompatibilityResult, that: SchemaCompatibilityResult) -> SchemaCompatibilityResult:
+    """
+    Merges two {@code SchemaCompatibilityResult} into a new instance, combining the list of Incompatibilities
+    and regressing to the SchemaCompatibilityType.incompatible state if any incompatibilities are encountered.
+    :param this: SchemaCompatibilityResult
+    :param that: SchemaCompatibilityResult
+    :return: SchemaCompatibilityResult
+    """
+    that = cast(SchemaCompatibilityResult, that)
+    merged = [*copy(this.incompatibilities), *copy(that.incompatibilities)]
+    if this.compatibility is SchemaCompatibilityType.compatible:
+        compat = that.compatibility
+        messages = that.messages
+        locations = that.locations
+    else:
+        compat = this.compatibility
+        messages = this.messages.union(that.messages)
+        locations = this.locations.union(that.locations)
+    return SchemaCompatibilityResult(
+        compatibility=compat, incompatibilities=merged, messages=messages, locations=locations
+    )
+
+
+CompatibleResult = SchemaCompatibilityResult(SchemaCompatibilityType.compatible)
+
+
+class ReaderWriter:
+    def __init__(self, reader: Schema, writer: Schema) -> None:
+        self.reader, self.writer = reader, writer
+
+    def __hash__(self) -> int:
+        return id(self.reader) ^ id(self.writer)
+
+    def __eq__(self, other) -> bool:
+        if not isinstance(other, ReaderWriter):
+            return False
+        return self.reader is other.reader and self.writer is other.writer
+
+
+class ReaderWriterCompatibilityChecker:
+    ROOT_REFERENCE_TOKEN = "/"
+
+    def __init__(self):
+        self.memoize_map = {}
+
+    def get_compatibility(
+        self,
+        reader: Schema,
+        writer: Schema,
+        reference_token: str = ROOT_REFERENCE_TOKEN,
+        location: Optional[List[str]] = None
+    ) -> SchemaCompatibilityResult:
+        if location is None:
+            location = []
+        pair = ReaderWriter(reader, writer)
+        if pair in self.memoize_map:
+            result = cast(SchemaCompatibilityResult, self.memoize_map[pair])
+            if result.compatibility is SchemaCompatibilityType.recursion_in_progress:
+                result = CompatibleResult
+        else:
+            self.memoize_map[pair] = SchemaCompatibilityResult()
+            result = self.calculate_compatibility(reader, writer, location + [reference_token])
+            self.memoize_map[pair] = result
+        return result
+
+    # pylSchemaType.INT: disable=too-many-return-statements
+    def calculate_compatibility(
+        self,
+        reader: Schema,
+        writer: Schema,
+        location: List[str],
+    ) -> SchemaCompatibilityResult:
+        """
+        Calculates the compatibility of a reader/writer schema pair. Will be positive if the reader is capable of reading
+        whatever the writer may write
+        :param reader: avro.schema.Schema
+        :param writer: avro.schema.Schema
+        :param location: List[str]
+        :return: SchemaCompatibilityResult
+        """
+        assert reader is not None
+        assert writer is not None
+        result = CompatibleResult
+        if reader.type == writer.type:
+            if reader.type in PRIMITIVE_TYPES:
+                return result
+            if reader.type == SchemaType.ARRAY:
+                reader, writer = cast(ArraySchema, reader), cast(ArraySchema, writer)
+                return merge(result, self.get_compatibility(reader.items, writer.items, "items", location))
+            if reader.type == SchemaType.MAP:
+                reader, writer = cast(MapSchema, reader), cast(MapSchema, writer)
+                return merge(result, self.get_compatibility(reader.values, writer.values, "values", location))
+            if reader.type == SchemaType.FIXED:
+                reader, writer = cast(FixedSchema, reader), cast(FixedSchema, writer)
+                result = merge(result, check_schema_names(reader, writer, location))
+                return merge(result, check_fixed_size(reader, writer, location))
+            if reader.type == SchemaType.ENUM:
+                reader, writer = cast(EnumSchema, reader), cast(EnumSchema, writer)
+                result = merge(result, check_schema_names(reader, writer, location))
+                return merge(result, check_reader_enum_contains_writer_enum(reader, writer, location))
+            if reader.type == SchemaType.RECORD:
+                reader, writer = cast(RecordSchema, reader), cast(RecordSchema, writer)
+                result = merge(result, check_schema_names(reader, writer, location))
+                return merge(result, self.check_reader_writer_record_fields(reader, writer, location))
+            if reader.type == SchemaType.UNION:
+                reader, writer = cast(UnionSchema, reader), cast(UnionSchema, writer)
+                for i, writer_branch in enumerate(writer.schemas):
+                    compat = self.get_compatibility(reader, writer_branch)
+                    if compat.compatibility is SchemaCompatibilityType.incompatible:
+                        result = merge(
+                            result,
+                            incompatible(
+                                SchemaIncompatibilityType.missing_union_branch,
+                                "reader union lacking writer type: {}".format(writer_branch.type.upper()), location + [str(i)]
+                            )
+                        )
+                return result
+            raise AvroRuntimeException("Unknown schema type: {}".format(reader.type))
+        if writer.type == SchemaType.UNION:
+            writer = cast(UnionSchema, writer)
+            for s in writer.schemas:
+                result = merge(result, self.get_compatibility(reader, s))
+            return result
+        if reader.type in {SchemaType.NULL, SchemaType.BOOLEAN, SchemaType.INT}:
+            return merge(result, type_mismatch(reader, writer, location))
+        if reader.type == SchemaType.LONG:
+            if writer.type == SchemaType.INT:
+                return result
+            return merge(result, type_mismatch(reader, writer, location))
+        if reader.type == SchemaType.FLOAT:
+            if writer.type in {SchemaType.INT, SchemaType.LONG}:
+                return result
+            return merge(result, type_mismatch(reader, writer, location))
+        if reader.type == SchemaType.DOUBLE:
+            if writer.type in {SchemaType.INT, SchemaType.LONG, SchemaType.FLOAT}:
+                return result
+            return merge(result, type_mismatch(reader, writer, location))
+        if reader.type == SchemaType.BYTES:
+            if writer.type == SchemaType.STRING:
+                return result
+            return merge(result, type_mismatch(reader, writer, location))
+        if reader.type == SchemaType.STRING:
+            if writer.type == SchemaType.BYTES:
+                return result
+            return merge(result, type_mismatch(reader, writer, location))
+        if reader.type in {SchemaType.ARRAY, SchemaType.MAP, SchemaType.FIXED, SchemaType.ENUM, SchemaType.RECORD}:
+            return merge(result, type_mismatch(reader, writer, location))
+        if reader.type == SchemaType.UNION:
+            reader = cast(UnionSchema, reader)
+            for reader_branch in reader.schemas:
+                compat = self.get_compatibility(reader_branch, writer)
+                if compat.compatibility is SchemaCompatibilityType.compatible:
+                    return result
+            # No branch in reader compatible with writer
+            message = "reader union lacking writer type {}".format(writer.type)
+            return merge(
+                result,
+                incompatible(
+                    SchemaIncompatibilityType.missing_union_branch, message, location
+                )
+            )
+        raise AvroRuntimeException("Unknown schema type: {}".format(reader.type))
+
+    # pylSchemaType.INT: enable=too-many-return-statements
+
+    def check_reader_writer_record_fields(
+        self, reader: RecordSchema, writer: RecordSchema, location: List[str]
+    ) -> SchemaCompatibilityResult:
+        result = CompatibleResult
+        for i, reader_field in enumerate(reader.fields):
+            reader_field = cast(Field, reader_field)
+            writer_field = lookup_writer_field(writer_schema=writer, reader_field=reader_field)
+            if writer_field is None:
+                if not reader_field.has_default:
+                    if reader_field.type.type == SchemaType.ENUM and reader_field.type.props.get("default"):
+                        result = merge(
+                            result,
+                            self.get_compatibility(
+                                reader_field.type, writer, "type", location + ["fields", str(i)]
+                            ))
+                    else:
+                        result = merge(
+                            result,
+                            incompatible(
+                                SchemaIncompatibilityType.reader_field_missing_default_value,
+                                reader_field.name, location + ["fields", str(i)]
+                            )
+                        )
+            else:
+                result = merge(
+                    result,
+                    self.get_compatibility(
+                        reader_field.type, writer_field.type, "type", location + ["fields", str(i)]
+                    ))
+        return result
+
+
+def type_mismatch(reader: Schema, writer: Schema, location: List[str]) -> SchemaCompatibilityResult:
+    message = "reader type: {} not compatible with writer type: {}".format(reader.type.upper(), writer.type.upper())
+    return incompatible(SchemaIncompatibilityType.type_mismatch, message, location)
+
+
+def check_schema_names(
+    reader: NamedSchema, writer: NamedSchema,
+    location: List[str]
+) -> SchemaCompatibilityResult:
+    result = CompatibleResult
+    if not schema_name_equals(reader, writer):
+        message = "expected: {}".format(writer.fullname)
+        result = incompatible(SchemaIncompatibilityType.name_mismatch, message, location + ["name"])
+    return result
+
+
+def check_fixed_size(
+    reader: FixedSchema, writer:
+    FixedSchema, location: List[str]
+) -> SchemaCompatibilityResult:
+    result = CompatibleResult
+    actual = reader.size
+    expected = writer.size
+    if actual != expected:
+        message = "expected: {}, found: {}".format(expected, actual)
+        result = incompatible(
+            SchemaIncompatibilityType.fixed_size_mismatch,
+            message,
+            location + ["size"],
+        )
+    return result
+
+
+def check_reader_enum_contains_writer_enum(
+    reader: EnumSchema, writer: EnumSchema, location: List[str]
+) -> SchemaCompatibilityResult:
+    result = CompatibleResult
+    writer_symbols, reader_symbols = set(writer.symbols), set(reader.symbols)
+    extra_symbols = writer_symbols.difference(reader_symbols)
+    if extra_symbols:
+        default = reader.props.get("default")
+        if default and default in reader_symbols:
+            result = CompatibleResult
+        else:
+            result = incompatible(
+                SchemaIncompatibilityType.missing_enum_symbols,
+                str(extra_symbols), location + ["symbols"]
+            )
+    return result
+
+
+def incompatible(incompat_type: SchemaIncompatibilityType, message: str, location: List[str]) -> SchemaCompatibilityResult:
+    locations = "/".join(location)
+    if len(location) > 1:
+        locations = locations[1:]
+    ret = SchemaCompatibilityResult(
+        compatibility=SchemaCompatibilityType.incompatible,
+        incompatibilities=[incompat_type],
+        locations={locations},
+        messages={message},
+    )
+    return ret
+
+
+def schema_name_equals(reader: NamedSchema, writer: NamedSchema) -> bool:
+    if reader.name == writer.name:
+        return True
+    return writer.fullname in reader.props.get("aliases", [])
+
+
+def lookup_writer_field(writer_schema: RecordSchema, reader_field: Field) -> Optional[Field]:
+    direct = writer_schema.fields_dict.get(reader_field.name)
+    if direct:
+        return cast(Field, direct)
+    for alias in reader_field.props.get("aliases", []):
+        writer_field = writer_schema.fields_dict.get(alias)
+        if writer_field is not None:
+            return cast(Field, writer_field)
+    return None
diff --git a/lang/py/avro/errors.py b/lang/py/avro/errors.py
index 4d5b7d9..0490f90 100644
--- a/lang/py/avro/errors.py
+++ b/lang/py/avro/errors.py
@@ -83,3 +83,7 @@ class UnsupportedCodec(NotImplementedError, AvroException):
 
 class UsageError(RuntimeError, AvroException):
     """An exception raised when incorrect arguments were passed."""
+
+
+class AvroRuntimeException(RuntimeError, AvroException):
+    """Raised when compatibility parsing encounters an unknown type"""
diff --git a/lang/py/avro/test/test_compatibility.py b/lang/py/avro/test/test_compatibility.py
new file mode 100644
index 0000000..729bcb7
--- /dev/null
+++ b/lang/py/avro/test/test_compatibility.py
@@ -0,0 +1,728 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from avro.compatibility import ReaderWriterCompatibilityChecker, SchemaCompatibilityType, SchemaType
+from avro.schema import ArraySchema, MapSchema, Names, PrimitiveSchema, Schema, UnionSchema, parse
+
+BOOLEAN_SCHEMA = PrimitiveSchema(SchemaType.BOOLEAN)
+NULL_SCHEMA = PrimitiveSchema(SchemaType.NULL)
+INT_SCHEMA = PrimitiveSchema(SchemaType.INT)
+LONG_SCHEMA = PrimitiveSchema(SchemaType.LONG)
+STRING_SCHEMA = PrimitiveSchema(SchemaType.STRING)
+BYTES_SCHEMA = PrimitiveSchema(SchemaType.BYTES)
+FLOAT_SCHEMA = PrimitiveSchema(SchemaType.FLOAT)
+DOUBLE_SCHEMA = PrimitiveSchema(SchemaType.DOUBLE)
+INT_ARRAY_SCHEMA = ArraySchema(SchemaType.INT, names=Names())
+LONG_ARRAY_SCHEMA = ArraySchema(SchemaType.LONG, names=Names())
+STRING_ARRAY_SCHEMA = ArraySchema(SchemaType.STRING, names=Names())
+INT_MAP_SCHEMA = MapSchema(SchemaType.INT, names=Names())
+LONG_MAP_SCHEMA = MapSchema(SchemaType.LONG, names=Names())
+STRING_MAP_SCHEMA = MapSchema(SchemaType.STRING, names=Names())
+ENUM1_AB_SCHEMA = parse(json.dumps({"type": SchemaType.ENUM, "name": "Enum1", "symbols": ["A", "B"]}))
+ENUM1_ABC_SCHEMA = parse(json.dumps({"type": SchemaType.ENUM, "name": "Enum1", "symbols": ["A", "B", "C"]}))
+ENUM1_BC_SCHEMA = parse(json.dumps({"type": SchemaType.ENUM, "name": "Enum1", "symbols": ["B", "C"]}))
+ENUM2_AB_SCHEMA = parse(json.dumps({"type": SchemaType.ENUM, "name": "Enum2", "symbols": ["A", "B"]}))
+ENUM_ABC_ENUM_DEFAULT_A_SCHEMA = parse(
+    json.dumps({
+        "type": "enum",
+        "name": "Enum",
+        "symbols": ["A", "B", "C"],
+        "default": "A"
+    })
+)
+ENUM_AB_ENUM_DEFAULT_A_SCHEMA = parse(json.dumps({"type": SchemaType.ENUM, "name": "Enum", "symbols": ["A", "B"], "default": "A"}))
+ENUM_ABC_ENUM_DEFAULT_A_RECORD = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record",
+        "fields": [{
+            "name": "Field",
+            "type": {
+                "type": SchemaType.ENUM,
+                "name": "Enum",
+                "symbols": ["A", "B", "C"],
+                "default": "A"
+            }
+        }]
+    })
+)
+ENUM_AB_ENUM_DEFAULT_A_RECORD = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record",
+        "fields": [{
+            "name": "Field",
+            "type": {
+                "type": SchemaType.ENUM,
+                "name": "Enum",
+                "symbols": ["A", "B"],
+                "default": "A"
+            }
+        }]
+    })
+)
+ENUM_ABC_FIELD_DEFAULT_B_ENUM_DEFAULT_A_RECORD = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record",
+        "fields": [{
+            "name": "Field",
+            "type": {
+                "type": SchemaType.ENUM,
+                "name": "Enum",
+                "symbols": ["A", "B", "C"],
+                "default": "A"
+            },
+            "default": "B"
+        }]
+    })
+)
+ENUM_AB_FIELD_DEFAULT_A_ENUM_DEFAULT_B_RECORD = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record",
+        "fields": [{
+            "name": "Field",
+            "type": {
+                "type": SchemaType.ENUM,
+                "name": "Enum",
+                "symbols": ["A", "B"],
+                "default": "B"
+            },
+            "default": "A"
+        }]
+    })
+)
+EMPTY_UNION_SCHEMA = UnionSchema([], names=Names())
+NULL_UNION_SCHEMA = UnionSchema([SchemaType.NULL], names=Names())
+INT_UNION_SCHEMA = UnionSchema([SchemaType.INT], names=Names())
+LONG_UNION_SCHEMA = UnionSchema([SchemaType.LONG], names=Names())
+FLOAT_UNION_SCHEMA = UnionSchema([SchemaType.FLOAT], names=Names())
+DOUBLE_UNION_SCHEMA = UnionSchema([SchemaType.DOUBLE], names=Names())
+STRING_UNION_SCHEMA = UnionSchema([SchemaType.STRING], names=Names())
+BYTES_UNION_SCHEMA = UnionSchema([SchemaType.BYTES], names=Names())
+INT_STRING_UNION_SCHEMA = UnionSchema([SchemaType.INT, SchemaType.STRING], names=Names())
+STRING_INT_UNION_SCHEMA = UnionSchema([SchemaType.STRING, SchemaType.INT], names=Names())
+INT_FLOAT_UNION_SCHEMA = UnionSchema([SchemaType.INT, SchemaType.FLOAT], names=Names())
+INT_LONG_UNION_SCHEMA = UnionSchema([SchemaType.INT, SchemaType.LONG], names=Names())
+INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA = UnionSchema([SchemaType.INT, SchemaType.LONG, SchemaType.FLOAT, SchemaType.DOUBLE], names=Names())
+NULL_INT_ARRAY_UNION_SCHEMA = UnionSchema([{"type": SchemaType.NULL}, {"type": SchemaType.ARRAY, "items": SchemaType.INT}], names=Names())
+NULL_INT_MAP_UNION_SCHEMA = UnionSchema([{"type": SchemaType.NULL}, {"type": SchemaType.MAP, "values": SchemaType.INT}], names=Names())
+EMPTY_RECORD1 = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record1", "fields": []}))
+EMPTY_RECORD2 = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record2", "fields": []}))
+A_INT_RECORD1 = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record1", "fields": [{"name": "a", "type": SchemaType.INT}]}))
+A_LONG_RECORD1 = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record1", "fields": [{"name": "a", "type": SchemaType.LONG}]}))
+A_INT_B_INT_RECORD1 = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "a",
+            "type": SchemaType.INT
+        }, {
+            "name": "b",
+            "type": SchemaType.INT
+        }]
+    })
+)
+A_DINT_RECORD1 = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "a",
+            "type": SchemaType.INT,
+            "default": 0
+        }]
+    })
+)
+A_INT_B_DINT_RECORD1 = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "a",
+            "type": SchemaType.INT
+        }, {
+            "name": "b",
+            "type": SchemaType.INT,
+            "default": 0
+        }]
+    })
+)
+A_DINT_B_DINT_RECORD1 = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "a",
+            "type": SchemaType.INT,
+            "default": 0
+        }, {
+            "name": "b",
+            "type": SchemaType.INT,
+            "default": 0
+        }]
+    })
+)
+A_DINT_B_DFIXED_4_BYTES_RECORD1 = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "a",
+            "type": SchemaType.INT,
+            "default": 0
+        }, {
+            "name": "b",
+            "type": {
+                "type": SchemaType.FIXED,
+                "name": "Fixed",
+                "size": 4
+            }
+        }]
+    })
+)
+A_DINT_B_DFIXED_8_BYTES_RECORD1 = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "a",
+            "type": SchemaType.INT,
+            "default": 0
+        }, {
+            "name": "b",
+            "type": {
+                "type": SchemaType.FIXED,
+                "name": "Fixed",
+                "size": 8
+            }
+        }]
+    })
+)
+A_DINT_B_DINT_STRING_UNION_RECORD1 = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "a",
+            "type": SchemaType.INT,
+            "default": 0
+        }, {
+            "name": "b",
+            "type": [SchemaType.INT, SchemaType.STRING],
+            "default": 0
+        }]
+    })
+)
+A_DINT_B_DINT_UNION_RECORD1 = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "a",
+            "type": SchemaType.INT,
+            "default": 0
+        }, {
+            "name": "b",
+            "type": [SchemaType.INT],
+            "default": 0
+        }]
+    })
+)
+A_DINT_B_DENUM_1_RECORD1 = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "a",
+            "type": SchemaType.INT,
+            "default": 0
+        }, {
+            "name": "b",
+            "type": {
+                "type": SchemaType.ENUM,
+                "name": "Enum1",
+                "symbols": ["A", "B"]
+            }
+        }]
+    })
+)
+A_DINT_B_DENUM_2_RECORD1 = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "a",
+            "type": SchemaType.INT,
+            "default": 0
+        }, {
+            "name": "b",
+            "type": {
+                "type": SchemaType.ENUM,
+                "name": "Enum2",
+                "symbols": ["A", "B"]
+            }
+        }]
+    })
+)
+FIXED_4_BYTES = parse(json.dumps({"type": SchemaType.FIXED, "name": "Fixed", "size": 4}))
+FIXED_8_BYTES = parse(json.dumps({"type": SchemaType.FIXED, "name": "Fixed", "size": 8}))
+NS_RECORD1 = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "f1",
+            "type": [
+                SchemaType.NULL, {
+                    "type": SchemaType.ARRAY,
+                    "items": {
+                        "type": SchemaType.RECORD,
+                        "name": "InnerRecord1",
+                        "namespace": "ns1",
+                        "fields": [{
+                            "name": "a",
+                            "type": SchemaType.INT
+                        }]
+                    }
+                }
+            ]
+        }]
+    })
+)
+NS_RECORD2 = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "f1",
+            "type": [
+                SchemaType.NULL, {
+                    "type": SchemaType.ARRAY,
+                    "items": {
+                        "type": SchemaType.RECORD,
+                        "name": "InnerRecord1",
+                        "namespace": "ns2",
+                        "fields": [{
+                            "name": "a",
+                            "type": SchemaType.INT
+                        }]
+                    }
+                }
+            ]
+        }]
+    })
+)
+
+UNION_INT_RECORD1 = UnionSchema([
+    {"type": SchemaType.INT},
+    {"type": SchemaType.RECORD, "name": "Record1", "fields": [{"name": "field1", "type": SchemaType.INT}]}
+])
+UNION_INT_RECORD2 = UnionSchema([
+    {"type": SchemaType.INT},
+    {"type": "record", "name": "Record2", "fields": [{"name": "field1", "type": SchemaType.INT}]}
+])
+UNION_INT_ENUM1_AB = UnionSchema([{"type": SchemaType.INT}, ENUM1_AB_SCHEMA.to_json()])
+UNION_INT_FIXED_4_BYTES = UnionSchema([{"type": SchemaType.INT}, FIXED_4_BYTES.to_json()])
+UNION_INT_BOOLEAN = UnionSchema([{"type": SchemaType.INT}, {"type": SchemaType.BOOLEAN}])
+UNION_INT_ARRAY_INT = UnionSchema([{"type": SchemaType.INT}, INT_ARRAY_SCHEMA.to_json()])
+UNION_INT_MAP_INT = UnionSchema([{"type": SchemaType.INT}, INT_MAP_SCHEMA.to_json()])
+UNION_INT_NULL = UnionSchema([{"type": SchemaType.INT}, {"type": SchemaType.NULL}])
+FIXED_4_ANOTHER_NAME = parse(json.dumps({"type": SchemaType.FIXED, "name": "AnotherName", "size": 4}))
+RECORD1_WITH_ENUM_AB = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "field1",
+            "type": dict(ENUM1_AB_SCHEMA.to_json())
+        }]
+    })
+)
+RECORD1_WITH_ENUM_ABC = parse(
+    json.dumps({
+        "type": SchemaType.RECORD,
+        "name": "Record1",
+        "fields": [{
+            "name": "field1",
+            "type": dict(ENUM1_ABC_SCHEMA.to_json())
+        }]
+    })
+)
+
+
+def test_simple_schema_promotion():
+    field_alias_reader = parse(
+        json.dumps({
+            "name": "foo",
+            "type": "record",
+            "fields": [{
+                "type": "int",
+                "name": "bar",
+                "aliases": ["f1"]
+            }]
+        })
+    )
+    record_alias_reader = parse(
+        json.dumps({
+            "name": "other",
+            "type": "record",
+            "fields": [{
+                "type": "int",
+                "name": "f1"
+            }],
+            "aliases": ["foo"]
+        })
+    )
+
+    writer = parse(
+        json.dumps({
+            "name": "foo",
+            "type": "record",
+            "fields": [{
+                "type": "int",
+                "name": "f1"
+            }, {
+                "type": "string",
+                "name": "f2",
+            }]
+        })
+    )
+    # alias testing
+    res = ReaderWriterCompatibilityChecker().get_compatibility(field_alias_reader, writer)
+    assert res.compatibility is SchemaCompatibilityType.compatible, res.locations
+    res = ReaderWriterCompatibilityChecker().get_compatibility(record_alias_reader, writer)
+    assert res.compatibility is SchemaCompatibilityType.compatible, res.locations
+
+
+def test_schema_compatibility():
+    # testValidateSchemaPairMissingField
+    writer = parse(
+        json.dumps({
+            "type": SchemaType.RECORD,
+            "name": "Record",
+            "fields": [{
+                "name": "oldField1",
+                "type": SchemaType.INT
+            }, {
+                "name": "oldField2",
+                "type": SchemaType.STRING
+            }]
+        })
+    )
+    reader = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record", "fields": [{"name": "oldField1", "type": SchemaType.INT}]}))
+    assert are_compatible(reader, writer)
+    # testValidateSchemaPairMissingSecondField
+    reader = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record", "fields": [{"name": "oldField2", "type": SchemaType.STRING}]}))
+    assert are_compatible(reader, writer)
+    # testValidateSchemaPairAllFields
+    reader = parse(
+        json.dumps({
+            "type": SchemaType.RECORD,
+            "name": "Record",
+            "fields": [{
+                "name": "oldField1",
+                "type": SchemaType.INT
+            }, {
+                "name": "oldField2",
+                "type": SchemaType.STRING
+            }]
+        })
+    )
+    assert are_compatible(reader, writer)
+    # testValidateSchemaNewFieldWithDefault
+    reader = parse(
+        json.dumps({
+            "type": SchemaType.RECORD,
+            "name": "Record",
+            "fields": [{
+                "name": "oldField1",
+                "type": SchemaType.INT
+            }, {
+                "name": "newField2",
+                "type": SchemaType.INT,
+                "default": 42
+            }]
+        })
+    )
+    assert are_compatible(reader, writer)
+    # testValidateSchemaNewField
+    reader = parse(
+        json.dumps({
+            "type": SchemaType.RECORD,
+            "name": "Record",
+            "fields": [{
+                "name": "oldField1",
+                "type": SchemaType.INT
+            }, {
+                "name": "newField2",
+                "type": SchemaType.INT
+            }]
+        })
+    )
+    assert not are_compatible(reader, writer)
+    # testValidateArrayWriterSchema
+    writer = parse(json.dumps({"type": SchemaType.ARRAY, "items": {"type": SchemaType.STRING}}))
+    reader = parse(json.dumps({"type": SchemaType.ARRAY, "items": {"type": SchemaType.STRING}}))
+    assert are_compatible(reader, writer)
+    reader = parse(json.dumps({"type": SchemaType.MAP, "values": {"type": SchemaType.STRING}}))
+    assert not are_compatible(reader, writer)
+    # testValidatePrimitiveWriterSchema
+    writer = parse(json.dumps({"type": SchemaType.STRING}))
+    reader = parse(json.dumps({"type": SchemaType.STRING}))
+    assert are_compatible(reader, writer)
+    reader = parse(json.dumps({"type": SchemaType.INT}))
+    assert not are_compatible(reader, writer)
+    # testUnionReaderWriterSubsetIncompatibility
+    writer = parse(
+        json.dumps({
+            "name": "Record",
+            "type": "record",
+            "fields": [{
+                "name": "f1",
+                "type": [SchemaType.INT, SchemaType.STRING, SchemaType.LONG]
+            }]
+        })
+    )
+    reader = parse(json.dumps({"name": "Record", "type": SchemaType.RECORD, "fields": [{"name": "f1", "type": [SchemaType.INT, SchemaType.STRING]}]}))
+    reader = reader.fields[0].type
+    writer = writer.fields[0].type
+    assert isinstance(reader, UnionSchema)
+    assert isinstance(writer, UnionSchema)
+    assert not are_compatible(reader, writer)
+    # testReaderWriterCompatibility
+    compatible_reader_writer_test_cases = [
+        (BOOLEAN_SCHEMA, BOOLEAN_SCHEMA),
+        (INT_SCHEMA, INT_SCHEMA),
+        (LONG_SCHEMA, INT_SCHEMA),
+        (LONG_SCHEMA, LONG_SCHEMA),
+        (FLOAT_SCHEMA, INT_SCHEMA),
+        (FLOAT_SCHEMA, LONG_SCHEMA),
+        (DOUBLE_SCHEMA, LONG_SCHEMA),
+        (DOUBLE_SCHEMA, INT_SCHEMA),
+        (DOUBLE_SCHEMA, FLOAT_SCHEMA),
+        (STRING_SCHEMA, STRING_SCHEMA),
+        (BYTES_SCHEMA, BYTES_SCHEMA),
+        (STRING_SCHEMA, BYTES_SCHEMA),
+        (BYTES_SCHEMA, STRING_SCHEMA),
+        (INT_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
+        (LONG_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
+        (INT_MAP_SCHEMA, INT_MAP_SCHEMA),
+        (LONG_MAP_SCHEMA, INT_MAP_SCHEMA),
+        (ENUM1_AB_SCHEMA, ENUM1_AB_SCHEMA),
+        (ENUM1_ABC_SCHEMA, ENUM1_AB_SCHEMA),
+        # Union related pairs
+        (EMPTY_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+        (FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+        (FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
+        (FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
+        (FLOAT_UNION_SCHEMA, INT_LONG_UNION_SCHEMA),
+        (INT_UNION_SCHEMA, INT_UNION_SCHEMA),
+        (INT_STRING_UNION_SCHEMA, STRING_INT_UNION_SCHEMA),
+        (INT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+        (LONG_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+        (LONG_UNION_SCHEMA, INT_UNION_SCHEMA),
+        (FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
+        (DOUBLE_UNION_SCHEMA, INT_UNION_SCHEMA),
+        (FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
+        (DOUBLE_UNION_SCHEMA, LONG_UNION_SCHEMA),
+        (FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+        (DOUBLE_UNION_SCHEMA, FLOAT_UNION_SCHEMA),
+        (STRING_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+        (STRING_UNION_SCHEMA, BYTES_UNION_SCHEMA),
+        (BYTES_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+        (BYTES_UNION_SCHEMA, STRING_UNION_SCHEMA),
+        (DOUBLE_UNION_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+        # Readers capable of reading all branches of a union are compatible
+        (FLOAT_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+        (LONG_SCHEMA, INT_LONG_UNION_SCHEMA),
+        (DOUBLE_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+        (DOUBLE_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
+        # Special case of singleton unions:
+        (FLOAT_SCHEMA, FLOAT_UNION_SCHEMA),
+        (INT_UNION_SCHEMA, INT_SCHEMA),
+        (INT_SCHEMA, INT_UNION_SCHEMA),
+        # Fixed types
+        (FIXED_4_BYTES, FIXED_4_BYTES),
+        # Tests involving records:
+        (EMPTY_RECORD1, EMPTY_RECORD1),
+        (EMPTY_RECORD1, A_INT_RECORD1),
+        (A_INT_RECORD1, A_INT_RECORD1),
+        (A_DINT_RECORD1, A_INT_RECORD1),
+        (A_DINT_RECORD1, A_DINT_RECORD1),
+        (A_INT_RECORD1, A_DINT_RECORD1),
+        (A_LONG_RECORD1, A_INT_RECORD1),
+        (A_INT_RECORD1, A_INT_B_INT_RECORD1),
+        (A_DINT_RECORD1, A_INT_B_INT_RECORD1),
+        (A_INT_B_DINT_RECORD1, A_INT_RECORD1),
+        (A_DINT_B_DINT_RECORD1, EMPTY_RECORD1),
+        (A_DINT_B_DINT_RECORD1, A_INT_RECORD1),
+        (A_INT_B_INT_RECORD1, A_DINT_B_DINT_RECORD1),
+        (parse(json.dumps({"type": "null"})), parse(json.dumps({"type": "null"}))),
+        (NULL_SCHEMA, NULL_SCHEMA),
+        (ENUM_AB_ENUM_DEFAULT_A_RECORD, ENUM_ABC_ENUM_DEFAULT_A_RECORD),
+        (ENUM_AB_FIELD_DEFAULT_A_ENUM_DEFAULT_B_RECORD, ENUM_ABC_FIELD_DEFAULT_B_ENUM_DEFAULT_A_RECORD),
+        (NS_RECORD1, NS_RECORD2),
+    ]
+
+    for (reader, writer) in compatible_reader_writer_test_cases:
+        assert are_compatible(reader, writer)
+
+
+def test_schema_compatibility_fixed_size_mismatch():
+    incompatible_fixed_pairs = [
+        (FIXED_4_BYTES, FIXED_8_BYTES, "expected: 8, found: 4", "/size"),
+        (FIXED_8_BYTES, FIXED_4_BYTES, "expected: 4, found: 8", "/size"),
+        (A_DINT_B_DFIXED_8_BYTES_RECORD1, A_DINT_B_DFIXED_4_BYTES_RECORD1, "expected: 4, found: 8", "/fields/1/type/size"),
+        (A_DINT_B_DFIXED_4_BYTES_RECORD1, A_DINT_B_DFIXED_8_BYTES_RECORD1, "expected: 8, found: 4", "/fields/1/type/size"),
+    ]
+    for (reader, writer, message, location) in incompatible_fixed_pairs:
+        result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+        assert result.compatibility is SchemaCompatibilityType.incompatible
+        assert location in result.locations, "expected {}, found {}".format(location, result)
+        assert message in result.messages, "expected {}, found {}".format(location, result)
+
+
+def test_schema_compatibility_missing_enum_symbols():
+    incompatible_pairs = [
+        # str(set) representation
+        (ENUM1_AB_SCHEMA, ENUM1_ABC_SCHEMA, "{'C'}", "/symbols"),
+        (ENUM1_BC_SCHEMA, ENUM1_ABC_SCHEMA, "{'A'}", "/symbols"),
+        (RECORD1_WITH_ENUM_AB, RECORD1_WITH_ENUM_ABC, "{'C'}", "/fields/0/type/symbols"),
+    ]
+    for (reader, writer, message, location) in incompatible_pairs:
+        result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+        assert result.compatibility is SchemaCompatibilityType.incompatible
+        assert message in result.messages
+        assert location in result.locations
+
+
+def test_schema_compatibility_missing_union_branch():
+    incompatible_pairs = [
+        (INT_UNION_SCHEMA, INT_STRING_UNION_SCHEMA, {"reader union lacking writer type: STRING"}, {"/1"}),
+        (STRING_UNION_SCHEMA, INT_STRING_UNION_SCHEMA, {"reader union lacking writer type: INT"}, {"/0"}),
+        (INT_UNION_SCHEMA, UNION_INT_RECORD1, {"reader union lacking writer type: RECORD"}, {"/1"}),
+        (INT_UNION_SCHEMA, UNION_INT_RECORD2, {"reader union lacking writer type: RECORD"}, {"/1"}),
+        (UNION_INT_RECORD1, UNION_INT_RECORD2, {"reader union lacking writer type: RECORD"}, {"/1"}),
+        (INT_UNION_SCHEMA, UNION_INT_ENUM1_AB, {"reader union lacking writer type: ENUM"}, {"/1"}),
+        (INT_UNION_SCHEMA, UNION_INT_FIXED_4_BYTES, {"reader union lacking writer type: FIXED"}, {"/1"}),
+        (INT_UNION_SCHEMA, UNION_INT_BOOLEAN, {"reader union lacking writer type: BOOLEAN"}, {"/1"}),
+        (INT_UNION_SCHEMA, LONG_UNION_SCHEMA, {"reader union lacking writer type: LONG"}, {"/0"}),
+        (INT_UNION_SCHEMA, FLOAT_UNION_SCHEMA, {"reader union lacking writer type: FLOAT"}, {"/0"}),
+        (INT_UNION_SCHEMA, DOUBLE_UNION_SCHEMA, {"reader union lacking writer type: DOUBLE"}, {"/0"}),
+        (INT_UNION_SCHEMA, BYTES_UNION_SCHEMA, {"reader union lacking writer type: BYTES"}, {"/0"}),
+        (INT_UNION_SCHEMA, UNION_INT_ARRAY_INT, {"reader union lacking writer type: ARRAY"}, {"/1"}),
+        (INT_UNION_SCHEMA, UNION_INT_MAP_INT, {"reader union lacking writer type: MAP"}, {"/1"}),
+        (INT_UNION_SCHEMA, UNION_INT_NULL, {"reader union lacking writer type: NULL"}, {"/1"}),
+        (
+            INT_UNION_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA, {
+                "reader union lacking writer type: LONG", "reader union lacking writer type: FLOAT",
+                "reader union lacking writer type: DOUBLE"
+            }, {"/1", "/2", "/3"}
+        ),
+        (
+            A_DINT_B_DINT_UNION_RECORD1, A_DINT_B_DINT_STRING_UNION_RECORD1, {"reader union lacking writer type: STRING"},
+            {"/fields/1/type/1"}
+        ),
+    ]
+
+    for (reader, writer, message, location) in incompatible_pairs:
+        result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+        assert result.compatibility is SchemaCompatibilityType.incompatible
+        assert result.messages == message
+        assert result.locations == location
+
+
+def test_schema_compatibility_name_mismatch():
+    incompatible_pairs = [(ENUM1_AB_SCHEMA, ENUM2_AB_SCHEMA, "expected: Enum2", "/name"),
+                          (EMPTY_RECORD2, EMPTY_RECORD1, "expected: Record1", "/name"),
+                          (FIXED_4_BYTES, FIXED_4_ANOTHER_NAME, "expected: AnotherName", "/name"),
+                          (A_DINT_B_DENUM_1_RECORD1, A_DINT_B_DENUM_2_RECORD1, "expected: Enum2", "/fields/1/type/name")]
+
+    for (reader, writer, message, location) in incompatible_pairs:
+        result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+        assert result.compatibility is SchemaCompatibilityType.incompatible
+        assert message in result.messages
+        assert location in result.locations
+
+
+def test_schema_compatibility_reader_field_missing_default_value():
+    incompatible_pairs = [
+        (A_INT_RECORD1, EMPTY_RECORD1, "a", "/fields/0"),
+        (A_INT_B_DINT_RECORD1, EMPTY_RECORD1, "a", "/fields/0"),
+    ]
+    for (reader, writer, message, location) in incompatible_pairs:
+        result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+        assert result.compatibility is SchemaCompatibilityType.incompatible
+        assert len(result.messages) == 1 and len(result.locations) == 1
+        assert message == "".join(result.messages)
+        assert location == "".join(result.locations)
+
+
+def test_schema_compatibility_type_mismatch():
+    incompatible_pairs = [
+        (NULL_SCHEMA, INT_SCHEMA, "reader type: NULL not compatible with writer type: INT", "/"),
+        (NULL_SCHEMA, LONG_SCHEMA, "reader type: NULL not compatible with writer type: LONG", "/"),
+        (BOOLEAN_SCHEMA, INT_SCHEMA, "reader type: BOOLEAN not compatible with writer type: INT", "/"),
+        (INT_SCHEMA, NULL_SCHEMA, "reader type: INT not compatible with writer type: NULL", "/"),
+        (INT_SCHEMA, BOOLEAN_SCHEMA, "reader type: INT not compatible with writer type: BOOLEAN", "/"),
+        (INT_SCHEMA, LONG_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/"),
+        (INT_SCHEMA, FLOAT_SCHEMA, "reader type: INT not compatible with writer type: FLOAT", "/"),
+        (INT_SCHEMA, DOUBLE_SCHEMA, "reader type: INT not compatible with writer type: DOUBLE", "/"),
+        (LONG_SCHEMA, FLOAT_SCHEMA, "reader type: LONG not compatible with writer type: FLOAT", "/"),
+        (LONG_SCHEMA, DOUBLE_SCHEMA, "reader type: LONG not compatible with writer type: DOUBLE", "/"),
+        (FLOAT_SCHEMA, DOUBLE_SCHEMA, "reader type: FLOAT not compatible with writer type: DOUBLE", "/"),
+        (DOUBLE_SCHEMA, STRING_SCHEMA, "reader type: DOUBLE not compatible with writer type: STRING", "/"),
+        (FIXED_4_BYTES, STRING_SCHEMA, "reader type: FIXED not compatible with writer type: STRING", "/"),
+        (STRING_SCHEMA, BOOLEAN_SCHEMA, "reader type: STRING not compatible with writer type: BOOLEAN", "/"),
+        (STRING_SCHEMA, INT_SCHEMA, "reader type: STRING not compatible with writer type: INT", "/"),
+        (BYTES_SCHEMA, NULL_SCHEMA, "reader type: BYTES not compatible with writer type: NULL", "/"),
+        (BYTES_SCHEMA, INT_SCHEMA, "reader type: BYTES not compatible with writer type: INT", "/"),
+        (A_INT_RECORD1, INT_SCHEMA, "reader type: RECORD not compatible with writer type: INT", "/"),
+        (INT_ARRAY_SCHEMA, LONG_ARRAY_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/items"),
+        (INT_MAP_SCHEMA, INT_ARRAY_SCHEMA, "reader type: MAP not compatible with writer type: ARRAY", "/"),
+        (INT_ARRAY_SCHEMA, INT_MAP_SCHEMA, "reader type: ARRAY not compatible with writer type: MAP", "/"),
+        (INT_MAP_SCHEMA, LONG_MAP_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/values"),
+        (INT_SCHEMA, ENUM2_AB_SCHEMA, "reader type: INT not compatible with writer type: ENUM", "/"),
+        (ENUM2_AB_SCHEMA, INT_SCHEMA, "reader type: ENUM not compatible with writer type: INT", "/"),
+        (
+            FLOAT_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA, "reader type: FLOAT not compatible with writer type: DOUBLE",
+            "/"
+        ),
+        (LONG_SCHEMA, INT_FLOAT_UNION_SCHEMA, "reader type: LONG not compatible with writer type: FLOAT", "/"),
+        (INT_SCHEMA, INT_FLOAT_UNION_SCHEMA, "reader type: INT not compatible with writer type: FLOAT", "/"),
+        # (INT_LIST_RECORD, LONG_LIST_RECORD, "reader type: INT not compatible with writer type: LONG", "/fields/0/type"),
+        (NULL_SCHEMA, INT_SCHEMA, "reader type: NULL not compatible with writer type: INT", "/"),
+    ]
+    for (reader, writer, message, location) in incompatible_pairs:
+        result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+        assert result.compatibility is SchemaCompatibilityType.incompatible
+        assert message in result.messages
+        assert location in result.locations
+
+
+def are_compatible(reader: Schema, writer: Schema) -> bool:
+    return ReaderWriterCompatibilityChecker(
+    ).get_compatibility(reader, writer).compatibility is SchemaCompatibilityType.compatible