You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ko...@apache.org on 2021/05/14 23:43:49 UTC

[avro] branch master updated: AVRO-3097: Run Schema Compatibility checks as part of Unittest (#1166)

This is an automated email from the ASF dual-hosted git repository.

kojiromike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new 93d5e21  AVRO-3097: Run Schema Compatibility checks as part of Unittest (#1166)
93d5e21 is described below

commit 93d5e2110ca867c6c6611f8715c0934e437518c7
Author: Subhash Bhushan <su...@gmail.com>
AuthorDate: Fri May 14 16:43:39 2021 -0700

    AVRO-3097: Run Schema Compatibility checks as part of Unittest (#1166)
    
    * AVRO-3097 Run Schema Compatibility checks as part of Unittest
    
    Test methods in `test_compatibility.py` were being ignored in
    unittest runs because they were module-level functions rather than
    methods of a class deriving from `unittest.TestCase`.
    
    This commit encloses the test cases within a class deriving from
    `unittest.TestCase` and also replaces the bare `assert` statements
    with the corresponding `unittest` assertion methods.
    
    * AVRO-3097: Expose `compatibility` module as part of avro package
    
    The `compatibility` module was not accessible through the avro package.
    This PR exposes it by adding it to `__all__`, and allows `import avro.compatibility`.
---
 lang/py/avro/__init__.py                |   2 +-
 lang/py/avro/compatibility.py           |   2 +-
 lang/py/avro/test/test_compatibility.py | 678 ++++++++++++++++----------------
 3 files changed, 341 insertions(+), 341 deletions(-)

diff --git a/lang/py/avro/__init__.py b/lang/py/avro/__init__.py
index 165ced5..2ccfa35 100644
--- a/lang/py/avro/__init__.py
+++ b/lang/py/avro/__init__.py
@@ -21,6 +21,6 @@
 
 import pkgutil
 
-__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones', 'codecs']
+__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones', 'codecs', 'compatibility']
 
 __version__ = (pkgutil.get_data(__name__, 'VERSION.txt') or b'0.0.1+unknown').decode().strip()
diff --git a/lang/py/avro/compatibility.py b/lang/py/avro/compatibility.py
index 98d60a0..1f49c15 100644
--- a/lang/py/avro/compatibility.py
+++ b/lang/py/avro/compatibility.py
@@ -20,7 +20,7 @@
 # limitations under the License.
 from copy import copy
 from enum import Enum
-from typing import Dict, List, Optional, Set, cast
+from typing import List, Optional, Set, cast
 
 from avro.errors import AvroRuntimeException
 from avro.schema import ArraySchema, EnumSchema, Field, FixedSchema, MapSchema, NamedSchema, RecordSchema, Schema, UnionSchema
diff --git a/lang/py/avro/test/test_compatibility.py b/lang/py/avro/test/test_compatibility.py
index 729bcb7..91e136d 100644
--- a/lang/py/avro/test/test_compatibility.py
+++ b/lang/py/avro/test/test_compatibility.py
@@ -20,6 +20,7 @@
 # limitations under the License.
 
 import json
+import unittest
 
 from avro.compatibility import ReaderWriterCompatibilityChecker, SchemaCompatibilityType, SchemaType
 from avro.schema import ArraySchema, MapSchema, Names, PrimitiveSchema, Schema, UnionSchema, parse
@@ -373,356 +374,355 @@ RECORD1_WITH_ENUM_ABC = parse(
 )
 
 
-def test_simple_schema_promotion():
-    field_alias_reader = parse(
-        json.dumps({
-            "name": "foo",
-            "type": "record",
-            "fields": [{
-                "type": "int",
-                "name": "bar",
-                "aliases": ["f1"]
-            }]
-        })
-    )
-    record_alias_reader = parse(
-        json.dumps({
-            "name": "other",
-            "type": "record",
-            "fields": [{
-                "type": "int",
-                "name": "f1"
-            }],
-            "aliases": ["foo"]
-        })
-    )
+class TestCompatibility(unittest.TestCase):
+    def test_simple_schema_promotion(self):
+        field_alias_reader = parse(
+            json.dumps({
+                "name": "foo",
+                "type": "record",
+                "fields": [{
+                    "type": "int",
+                    "name": "bar",
+                    "aliases": ["f1"]
+                }]
+            })
+        )
+        record_alias_reader = parse(
+            json.dumps({
+                "name": "other",
+                "type": "record",
+                "fields": [{
+                    "type": "int",
+                    "name": "f1"
+                }],
+                "aliases": ["foo"]
+            })
+        )
 
-    writer = parse(
-        json.dumps({
-            "name": "foo",
-            "type": "record",
-            "fields": [{
-                "type": "int",
-                "name": "f1"
-            }, {
-                "type": "string",
-                "name": "f2",
-            }]
-        })
-    )
-    # alias testing
-    res = ReaderWriterCompatibilityChecker().get_compatibility(field_alias_reader, writer)
-    assert res.compatibility is SchemaCompatibilityType.compatible, res.locations
-    res = ReaderWriterCompatibilityChecker().get_compatibility(record_alias_reader, writer)
-    assert res.compatibility is SchemaCompatibilityType.compatible, res.locations
+        writer = parse(
+            json.dumps({
+                "name": "foo",
+                "type": "record",
+                "fields": [{
+                    "type": "int",
+                    "name": "f1"
+                }, {
+                    "type": "string",
+                    "name": "f2",
+                }]
+            })
+        )
+        # alias testing
+        res = ReaderWriterCompatibilityChecker().get_compatibility(field_alias_reader, writer)
+        self.assertIs(res.compatibility, SchemaCompatibilityType.compatible, res.locations)
+        res = ReaderWriterCompatibilityChecker().get_compatibility(record_alias_reader, writer)
+        self.assertIs(res.compatibility, SchemaCompatibilityType.compatible, res.locations)
 
-
-def test_schema_compatibility():
-    # testValidateSchemaPairMissingField
-    writer = parse(
-        json.dumps({
-            "type": SchemaType.RECORD,
-            "name": "Record",
-            "fields": [{
-                "name": "oldField1",
-                "type": SchemaType.INT
-            }, {
-                "name": "oldField2",
-                "type": SchemaType.STRING
-            }]
-        })
-    )
-    reader = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record", "fields": [{"name": "oldField1", "type": SchemaType.INT}]}))
-    assert are_compatible(reader, writer)
-    # testValidateSchemaPairMissingSecondField
-    reader = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record", "fields": [{"name": "oldField2", "type": SchemaType.STRING}]}))
-    assert are_compatible(reader, writer)
-    # testValidateSchemaPairAllFields
-    reader = parse(
-        json.dumps({
-            "type": SchemaType.RECORD,
+    def test_schema_compatibility(self):
+        # testValidateSchemaPairMissingField
+        writer = parse(
+            json.dumps({
+                "type": SchemaType.RECORD,
+                "name": "Record",
+                "fields": [{
+                    "name": "oldField1",
+                    "type": SchemaType.INT
+                }, {
+                    "name": "oldField2",
+                    "type": SchemaType.STRING
+                }]
+            })
+        )
+        reader = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record", "fields": [{"name": "oldField1", "type": SchemaType.INT}]}))
+        self.assertTrue(self.are_compatible(reader, writer))
+        # testValidateSchemaPairMissingSecondField
+        reader = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record", "fields": [{"name": "oldField2", "type": SchemaType.STRING}]}))
+        self.assertTrue(self.are_compatible(reader, writer))
+        # testValidateSchemaPairAllFields
+        reader = parse(
+            json.dumps({
+                "type": SchemaType.RECORD,
+                "name": "Record",
+                "fields": [{
+                    "name": "oldField1",
+                    "type": SchemaType.INT
+                }, {
+                    "name": "oldField2",
+                    "type": SchemaType.STRING
+                }]
+            })
+        )
+        self.assertTrue(self.are_compatible(reader, writer))
+        # testValidateSchemaNewFieldWithDefault
+        reader = parse(
+            json.dumps({
+                "type": SchemaType.RECORD,
+                "name": "Record",
+                "fields": [{
+                    "name": "oldField1",
+                    "type": SchemaType.INT
+                }, {
+                    "name": "newField2",
+                    "type": SchemaType.INT,
+                    "default": 42
+                }]
+            })
+        )
+        self.assertTrue(self.are_compatible(reader, writer))
+        # testValidateSchemaNewField
+        reader = parse(
+            json.dumps({
+                "type": SchemaType.RECORD,
+                "name": "Record",
+                "fields": [{
+                    "name": "oldField1",
+                    "type": SchemaType.INT
+                }, {
+                    "name": "newField2",
+                    "type": SchemaType.INT
+                }]
+            })
+        )
+        self.assertFalse(self.are_compatible(reader, writer))
+        # testValidateArrayWriterSchema
+        writer = parse(json.dumps({"type": SchemaType.ARRAY, "items": {"type": SchemaType.STRING}}))
+        reader = parse(json.dumps({"type": SchemaType.ARRAY, "items": {"type": SchemaType.STRING}}))
+        self.assertTrue(self.are_compatible(reader, writer))
+        reader = parse(json.dumps({"type": SchemaType.MAP, "values": {"type": SchemaType.STRING}}))
+        self.assertFalse(self.are_compatible(reader, writer))
+        # testValidatePrimitiveWriterSchema
+        writer = parse(json.dumps({"type": SchemaType.STRING}))
+        reader = parse(json.dumps({"type": SchemaType.STRING}))
+        self.assertTrue(self.are_compatible(reader, writer))
+        reader = parse(json.dumps({"type": SchemaType.INT}))
+        self.assertFalse(self.are_compatible(reader, writer))
+        # testUnionReaderWriterSubsetIncompatibility
+        writer = parse(
+            json.dumps({
+                "name": "Record",
+                "type": "record",
+                "fields": [{
+                    "name": "f1",
+                    "type": [SchemaType.INT, SchemaType.STRING, SchemaType.LONG]
+                }]
+            })
+        )
+        reader = parse(json.dumps({
             "name": "Record",
-            "fields": [{
-                "name": "oldField1",
-                "type": SchemaType.INT
-            }, {
-                "name": "oldField2",
-                "type": SchemaType.STRING
-            }]
-        })
-    )
-    assert are_compatible(reader, writer)
-    # testValidateSchemaNewFieldWithDefault
-    reader = parse(
-        json.dumps({
             "type": SchemaType.RECORD,
-            "name": "Record",
-            "fields": [{
-                "name": "oldField1",
-                "type": SchemaType.INT
-            }, {
-                "name": "newField2",
-                "type": SchemaType.INT,
-                "default": 42
-            }]
-        })
-    )
-    assert are_compatible(reader, writer)
-    # testValidateSchemaNewField
-    reader = parse(
-        json.dumps({
-            "type": SchemaType.RECORD,
-            "name": "Record",
-            "fields": [{
-                "name": "oldField1",
-                "type": SchemaType.INT
-            }, {
-                "name": "newField2",
-                "type": SchemaType.INT
-            }]
-        })
-    )
-    assert not are_compatible(reader, writer)
-    # testValidateArrayWriterSchema
-    writer = parse(json.dumps({"type": SchemaType.ARRAY, "items": {"type": SchemaType.STRING}}))
-    reader = parse(json.dumps({"type": SchemaType.ARRAY, "items": {"type": SchemaType.STRING}}))
-    assert are_compatible(reader, writer)
-    reader = parse(json.dumps({"type": SchemaType.MAP, "values": {"type": SchemaType.STRING}}))
-    assert not are_compatible(reader, writer)
-    # testValidatePrimitiveWriterSchema
-    writer = parse(json.dumps({"type": SchemaType.STRING}))
-    reader = parse(json.dumps({"type": SchemaType.STRING}))
-    assert are_compatible(reader, writer)
-    reader = parse(json.dumps({"type": SchemaType.INT}))
-    assert not are_compatible(reader, writer)
-    # testUnionReaderWriterSubsetIncompatibility
-    writer = parse(
-        json.dumps({
-            "name": "Record",
-            "type": "record",
-            "fields": [{
-                "name": "f1",
-                "type": [SchemaType.INT, SchemaType.STRING, SchemaType.LONG]
-            }]
-        })
-    )
-    reader = parse(json.dumps({"name": "Record", "type": SchemaType.RECORD, "fields": [{"name": "f1", "type": [SchemaType.INT, SchemaType.STRING]}]}))
-    reader = reader.fields[0].type
-    writer = writer.fields[0].type
-    assert isinstance(reader, UnionSchema)
-    assert isinstance(writer, UnionSchema)
-    assert not are_compatible(reader, writer)
-    # testReaderWriterCompatibility
-    compatible_reader_writer_test_cases = [
-        (BOOLEAN_SCHEMA, BOOLEAN_SCHEMA),
-        (INT_SCHEMA, INT_SCHEMA),
-        (LONG_SCHEMA, INT_SCHEMA),
-        (LONG_SCHEMA, LONG_SCHEMA),
-        (FLOAT_SCHEMA, INT_SCHEMA),
-        (FLOAT_SCHEMA, LONG_SCHEMA),
-        (DOUBLE_SCHEMA, LONG_SCHEMA),
-        (DOUBLE_SCHEMA, INT_SCHEMA),
-        (DOUBLE_SCHEMA, FLOAT_SCHEMA),
-        (STRING_SCHEMA, STRING_SCHEMA),
-        (BYTES_SCHEMA, BYTES_SCHEMA),
-        (STRING_SCHEMA, BYTES_SCHEMA),
-        (BYTES_SCHEMA, STRING_SCHEMA),
-        (INT_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
-        (LONG_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
-        (INT_MAP_SCHEMA, INT_MAP_SCHEMA),
-        (LONG_MAP_SCHEMA, INT_MAP_SCHEMA),
-        (ENUM1_AB_SCHEMA, ENUM1_AB_SCHEMA),
-        (ENUM1_ABC_SCHEMA, ENUM1_AB_SCHEMA),
-        # Union related pairs
-        (EMPTY_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
-        (FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
-        (FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
-        (FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
-        (FLOAT_UNION_SCHEMA, INT_LONG_UNION_SCHEMA),
-        (INT_UNION_SCHEMA, INT_UNION_SCHEMA),
-        (INT_STRING_UNION_SCHEMA, STRING_INT_UNION_SCHEMA),
-        (INT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
-        (LONG_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
-        (LONG_UNION_SCHEMA, INT_UNION_SCHEMA),
-        (FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
-        (DOUBLE_UNION_SCHEMA, INT_UNION_SCHEMA),
-        (FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
-        (DOUBLE_UNION_SCHEMA, LONG_UNION_SCHEMA),
-        (FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
-        (DOUBLE_UNION_SCHEMA, FLOAT_UNION_SCHEMA),
-        (STRING_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
-        (STRING_UNION_SCHEMA, BYTES_UNION_SCHEMA),
-        (BYTES_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
-        (BYTES_UNION_SCHEMA, STRING_UNION_SCHEMA),
-        (DOUBLE_UNION_SCHEMA, INT_FLOAT_UNION_SCHEMA),
-        # Readers capable of reading all branches of a union are compatible
-        (FLOAT_SCHEMA, INT_FLOAT_UNION_SCHEMA),
-        (LONG_SCHEMA, INT_LONG_UNION_SCHEMA),
-        (DOUBLE_SCHEMA, INT_FLOAT_UNION_SCHEMA),
-        (DOUBLE_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
-        # Special case of singleton unions:
-        (FLOAT_SCHEMA, FLOAT_UNION_SCHEMA),
-        (INT_UNION_SCHEMA, INT_SCHEMA),
-        (INT_SCHEMA, INT_UNION_SCHEMA),
-        # Fixed types
-        (FIXED_4_BYTES, FIXED_4_BYTES),
-        # Tests involving records:
-        (EMPTY_RECORD1, EMPTY_RECORD1),
-        (EMPTY_RECORD1, A_INT_RECORD1),
-        (A_INT_RECORD1, A_INT_RECORD1),
-        (A_DINT_RECORD1, A_INT_RECORD1),
-        (A_DINT_RECORD1, A_DINT_RECORD1),
-        (A_INT_RECORD1, A_DINT_RECORD1),
-        (A_LONG_RECORD1, A_INT_RECORD1),
-        (A_INT_RECORD1, A_INT_B_INT_RECORD1),
-        (A_DINT_RECORD1, A_INT_B_INT_RECORD1),
-        (A_INT_B_DINT_RECORD1, A_INT_RECORD1),
-        (A_DINT_B_DINT_RECORD1, EMPTY_RECORD1),
-        (A_DINT_B_DINT_RECORD1, A_INT_RECORD1),
-        (A_INT_B_INT_RECORD1, A_DINT_B_DINT_RECORD1),
-        (parse(json.dumps({"type": "null"})), parse(json.dumps({"type": "null"}))),
-        (NULL_SCHEMA, NULL_SCHEMA),
-        (ENUM_AB_ENUM_DEFAULT_A_RECORD, ENUM_ABC_ENUM_DEFAULT_A_RECORD),
-        (ENUM_AB_FIELD_DEFAULT_A_ENUM_DEFAULT_B_RECORD, ENUM_ABC_FIELD_DEFAULT_B_ENUM_DEFAULT_A_RECORD),
-        (NS_RECORD1, NS_RECORD2),
-    ]
-
-    for (reader, writer) in compatible_reader_writer_test_cases:
-        assert are_compatible(reader, writer)
-
-
-def test_schema_compatibility_fixed_size_mismatch():
-    incompatible_fixed_pairs = [
-        (FIXED_4_BYTES, FIXED_8_BYTES, "expected: 8, found: 4", "/size"),
-        (FIXED_8_BYTES, FIXED_4_BYTES, "expected: 4, found: 8", "/size"),
-        (A_DINT_B_DFIXED_8_BYTES_RECORD1, A_DINT_B_DFIXED_4_BYTES_RECORD1, "expected: 4, found: 8", "/fields/1/type/size"),
-        (A_DINT_B_DFIXED_4_BYTES_RECORD1, A_DINT_B_DFIXED_8_BYTES_RECORD1, "expected: 8, found: 4", "/fields/1/type/size"),
-    ]
-    for (reader, writer, message, location) in incompatible_fixed_pairs:
-        result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
-        assert result.compatibility is SchemaCompatibilityType.incompatible
-        assert location in result.locations, "expected {}, found {}".format(location, result)
-        assert message in result.messages, "expected {}, found {}".format(location, result)
-
-
-def test_schema_compatibility_missing_enum_symbols():
-    incompatible_pairs = [
-        # str(set) representation
-        (ENUM1_AB_SCHEMA, ENUM1_ABC_SCHEMA, "{'C'}", "/symbols"),
-        (ENUM1_BC_SCHEMA, ENUM1_ABC_SCHEMA, "{'A'}", "/symbols"),
-        (RECORD1_WITH_ENUM_AB, RECORD1_WITH_ENUM_ABC, "{'C'}", "/fields/0/type/symbols"),
-    ]
-    for (reader, writer, message, location) in incompatible_pairs:
-        result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
-        assert result.compatibility is SchemaCompatibilityType.incompatible
-        assert message in result.messages
-        assert location in result.locations
-
-
-def test_schema_compatibility_missing_union_branch():
-    incompatible_pairs = [
-        (INT_UNION_SCHEMA, INT_STRING_UNION_SCHEMA, {"reader union lacking writer type: STRING"}, {"/1"}),
-        (STRING_UNION_SCHEMA, INT_STRING_UNION_SCHEMA, {"reader union lacking writer type: INT"}, {"/0"}),
-        (INT_UNION_SCHEMA, UNION_INT_RECORD1, {"reader union lacking writer type: RECORD"}, {"/1"}),
-        (INT_UNION_SCHEMA, UNION_INT_RECORD2, {"reader union lacking writer type: RECORD"}, {"/1"}),
-        (UNION_INT_RECORD1, UNION_INT_RECORD2, {"reader union lacking writer type: RECORD"}, {"/1"}),
-        (INT_UNION_SCHEMA, UNION_INT_ENUM1_AB, {"reader union lacking writer type: ENUM"}, {"/1"}),
-        (INT_UNION_SCHEMA, UNION_INT_FIXED_4_BYTES, {"reader union lacking writer type: FIXED"}, {"/1"}),
-        (INT_UNION_SCHEMA, UNION_INT_BOOLEAN, {"reader union lacking writer type: BOOLEAN"}, {"/1"}),
-        (INT_UNION_SCHEMA, LONG_UNION_SCHEMA, {"reader union lacking writer type: LONG"}, {"/0"}),
-        (INT_UNION_SCHEMA, FLOAT_UNION_SCHEMA, {"reader union lacking writer type: FLOAT"}, {"/0"}),
-        (INT_UNION_SCHEMA, DOUBLE_UNION_SCHEMA, {"reader union lacking writer type: DOUBLE"}, {"/0"}),
-        (INT_UNION_SCHEMA, BYTES_UNION_SCHEMA, {"reader union lacking writer type: BYTES"}, {"/0"}),
-        (INT_UNION_SCHEMA, UNION_INT_ARRAY_INT, {"reader union lacking writer type: ARRAY"}, {"/1"}),
-        (INT_UNION_SCHEMA, UNION_INT_MAP_INT, {"reader union lacking writer type: MAP"}, {"/1"}),
-        (INT_UNION_SCHEMA, UNION_INT_NULL, {"reader union lacking writer type: NULL"}, {"/1"}),
-        (
-            INT_UNION_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA, {
-                "reader union lacking writer type: LONG", "reader union lacking writer type: FLOAT",
-                "reader union lacking writer type: DOUBLE"
-            }, {"/1", "/2", "/3"}
-        ),
-        (
-            A_DINT_B_DINT_UNION_RECORD1, A_DINT_B_DINT_STRING_UNION_RECORD1, {"reader union lacking writer type: STRING"},
-            {"/fields/1/type/1"}
-        ),
-    ]
+            "fields": [
+                {"name": "f1", "type": [SchemaType.INT, SchemaType.STRING]}]}))
+        reader = reader.fields[0].type
+        writer = writer.fields[0].type
+        self.assertIsInstance(reader, UnionSchema)
+        self.assertIsInstance(writer, UnionSchema)
+        self.assertFalse(self.are_compatible(reader, writer))
+        # testReaderWriterCompatibility
+        compatible_reader_writer_test_cases = [
+            (BOOLEAN_SCHEMA, BOOLEAN_SCHEMA),
+            (INT_SCHEMA, INT_SCHEMA),
+            (LONG_SCHEMA, INT_SCHEMA),
+            (LONG_SCHEMA, LONG_SCHEMA),
+            (FLOAT_SCHEMA, INT_SCHEMA),
+            (FLOAT_SCHEMA, LONG_SCHEMA),
+            (DOUBLE_SCHEMA, LONG_SCHEMA),
+            (DOUBLE_SCHEMA, INT_SCHEMA),
+            (DOUBLE_SCHEMA, FLOAT_SCHEMA),
+            (STRING_SCHEMA, STRING_SCHEMA),
+            (BYTES_SCHEMA, BYTES_SCHEMA),
+            (STRING_SCHEMA, BYTES_SCHEMA),
+            (BYTES_SCHEMA, STRING_SCHEMA),
+            (INT_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
+            (LONG_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
+            (INT_MAP_SCHEMA, INT_MAP_SCHEMA),
+            (LONG_MAP_SCHEMA, INT_MAP_SCHEMA),
+            (ENUM1_AB_SCHEMA, ENUM1_AB_SCHEMA),
+            (ENUM1_ABC_SCHEMA, ENUM1_AB_SCHEMA),
+            # Union related pairs
+            (EMPTY_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+            (FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+            (FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
+            (FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
+            (FLOAT_UNION_SCHEMA, INT_LONG_UNION_SCHEMA),
+            (INT_UNION_SCHEMA, INT_UNION_SCHEMA),
+            (INT_STRING_UNION_SCHEMA, STRING_INT_UNION_SCHEMA),
+            (INT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+            (LONG_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+            (LONG_UNION_SCHEMA, INT_UNION_SCHEMA),
+            (FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
+            (DOUBLE_UNION_SCHEMA, INT_UNION_SCHEMA),
+            (FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
+            (DOUBLE_UNION_SCHEMA, LONG_UNION_SCHEMA),
+            (FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+            (DOUBLE_UNION_SCHEMA, FLOAT_UNION_SCHEMA),
+            (STRING_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+            (STRING_UNION_SCHEMA, BYTES_UNION_SCHEMA),
+            (BYTES_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+            (BYTES_UNION_SCHEMA, STRING_UNION_SCHEMA),
+            (DOUBLE_UNION_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+            # Readers capable of reading all branches of a union are compatible
+            (FLOAT_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+            (LONG_SCHEMA, INT_LONG_UNION_SCHEMA),
+            (DOUBLE_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+            (DOUBLE_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
+            # Special case of singleton unions:
+            (FLOAT_SCHEMA, FLOAT_UNION_SCHEMA),
+            (INT_UNION_SCHEMA, INT_SCHEMA),
+            (INT_SCHEMA, INT_UNION_SCHEMA),
+            # Fixed types
+            (FIXED_4_BYTES, FIXED_4_BYTES),
+            # Tests involving records:
+            (EMPTY_RECORD1, EMPTY_RECORD1),
+            (EMPTY_RECORD1, A_INT_RECORD1),
+            (A_INT_RECORD1, A_INT_RECORD1),
+            (A_DINT_RECORD1, A_INT_RECORD1),
+            (A_DINT_RECORD1, A_DINT_RECORD1),
+            (A_INT_RECORD1, A_DINT_RECORD1),
+            (A_LONG_RECORD1, A_INT_RECORD1),
+            (A_INT_RECORD1, A_INT_B_INT_RECORD1),
+            (A_DINT_RECORD1, A_INT_B_INT_RECORD1),
+            (A_INT_B_DINT_RECORD1, A_INT_RECORD1),
+            (A_DINT_B_DINT_RECORD1, EMPTY_RECORD1),
+            (A_DINT_B_DINT_RECORD1, A_INT_RECORD1),
+            (A_INT_B_INT_RECORD1, A_DINT_B_DINT_RECORD1),
+            (parse(json.dumps({"type": "null"})), parse(json.dumps({"type": "null"}))),
+            (NULL_SCHEMA, NULL_SCHEMA),
+            (ENUM_AB_ENUM_DEFAULT_A_RECORD, ENUM_ABC_ENUM_DEFAULT_A_RECORD),
+            (ENUM_AB_FIELD_DEFAULT_A_ENUM_DEFAULT_B_RECORD, ENUM_ABC_FIELD_DEFAULT_B_ENUM_DEFAULT_A_RECORD),
+            (NS_RECORD1, NS_RECORD2),
+        ]
 
-    for (reader, writer, message, location) in incompatible_pairs:
-        result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
-        assert result.compatibility is SchemaCompatibilityType.incompatible
-        assert result.messages == message
-        assert result.locations == location
+        for (reader, writer) in compatible_reader_writer_test_cases:
+            self.assertTrue(self.are_compatible(reader, writer))
 
+    def test_schema_compatibility_fixed_size_mismatch(self):
+        incompatible_fixed_pairs = [
+            (FIXED_4_BYTES, FIXED_8_BYTES, "expected: 8, found: 4", "/size"),
+            (FIXED_8_BYTES, FIXED_4_BYTES, "expected: 4, found: 8", "/size"),
+            (A_DINT_B_DFIXED_8_BYTES_RECORD1, A_DINT_B_DFIXED_4_BYTES_RECORD1, "expected: 4, found: 8", "/fields/1/type/size"),
+            (A_DINT_B_DFIXED_4_BYTES_RECORD1, A_DINT_B_DFIXED_8_BYTES_RECORD1, "expected: 8, found: 4", "/fields/1/type/size"),
+        ]
+        for (reader, writer, message, location) in incompatible_fixed_pairs:
+            result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+            self.assertIs(result.compatibility, SchemaCompatibilityType.incompatible)
+            self.assertIn(location, result.locations, "expected {}, found {}".format(location, result))
+            self.assertIn(message, result.messages, "expected {}, found {}".format(location, result))
 
-def test_schema_compatibility_name_mismatch():
-    incompatible_pairs = [(ENUM1_AB_SCHEMA, ENUM2_AB_SCHEMA, "expected: Enum2", "/name"),
-                          (EMPTY_RECORD2, EMPTY_RECORD1, "expected: Record1", "/name"),
-                          (FIXED_4_BYTES, FIXED_4_ANOTHER_NAME, "expected: AnotherName", "/name"),
-                          (A_DINT_B_DENUM_1_RECORD1, A_DINT_B_DENUM_2_RECORD1, "expected: Enum2", "/fields/1/type/name")]
+    def test_schema_compatibility_missing_enum_symbols(self):
+        incompatible_pairs = [
+            # str(set) representation
+            (ENUM1_AB_SCHEMA, ENUM1_ABC_SCHEMA, "{'C'}", "/symbols"),
+            (ENUM1_BC_SCHEMA, ENUM1_ABC_SCHEMA, "{'A'}", "/symbols"),
+            (RECORD1_WITH_ENUM_AB, RECORD1_WITH_ENUM_ABC, "{'C'}", "/fields/0/type/symbols"),
+        ]
+        for (reader, writer, message, location) in incompatible_pairs:
+            result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+            self.assertIs(result.compatibility, SchemaCompatibilityType.incompatible)
+            self.assertIn(message, result.messages)
+            self.assertIn(location, result.locations)
 
-    for (reader, writer, message, location) in incompatible_pairs:
-        result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
-        assert result.compatibility is SchemaCompatibilityType.incompatible
-        assert message in result.messages
-        assert location in result.locations
+    def test_schema_compatibility_missing_union_branch(self):
+        incompatible_pairs = [
+            (INT_UNION_SCHEMA, INT_STRING_UNION_SCHEMA, {"reader union lacking writer type: STRING"}, {"/1"}),
+            (STRING_UNION_SCHEMA, INT_STRING_UNION_SCHEMA, {"reader union lacking writer type: INT"}, {"/0"}),
+            (INT_UNION_SCHEMA, UNION_INT_RECORD1, {"reader union lacking writer type: RECORD"}, {"/1"}),
+            (INT_UNION_SCHEMA, UNION_INT_RECORD2, {"reader union lacking writer type: RECORD"}, {"/1"}),
+            (UNION_INT_RECORD1, UNION_INT_RECORD2, {"reader union lacking writer type: RECORD"}, {"/1"}),
+            (INT_UNION_SCHEMA, UNION_INT_ENUM1_AB, {"reader union lacking writer type: ENUM"}, {"/1"}),
+            (INT_UNION_SCHEMA, UNION_INT_FIXED_4_BYTES, {"reader union lacking writer type: FIXED"}, {"/1"}),
+            (INT_UNION_SCHEMA, UNION_INT_BOOLEAN, {"reader union lacking writer type: BOOLEAN"}, {"/1"}),
+            (INT_UNION_SCHEMA, LONG_UNION_SCHEMA, {"reader union lacking writer type: LONG"}, {"/0"}),
+            (INT_UNION_SCHEMA, FLOAT_UNION_SCHEMA, {"reader union lacking writer type: FLOAT"}, {"/0"}),
+            (INT_UNION_SCHEMA, DOUBLE_UNION_SCHEMA, {"reader union lacking writer type: DOUBLE"}, {"/0"}),
+            (INT_UNION_SCHEMA, BYTES_UNION_SCHEMA, {"reader union lacking writer type: BYTES"}, {"/0"}),
+            (INT_UNION_SCHEMA, UNION_INT_ARRAY_INT, {"reader union lacking writer type: ARRAY"}, {"/1"}),
+            (INT_UNION_SCHEMA, UNION_INT_MAP_INT, {"reader union lacking writer type: MAP"}, {"/1"}),
+            (INT_UNION_SCHEMA, UNION_INT_NULL, {"reader union lacking writer type: NULL"}, {"/1"}),
+            (
+                INT_UNION_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA, {
+                    "reader union lacking writer type: LONG", "reader union lacking writer type: FLOAT",
+                    "reader union lacking writer type: DOUBLE"
+                }, {"/1", "/2", "/3"}
+            ),
+            (
+                A_DINT_B_DINT_UNION_RECORD1, A_DINT_B_DINT_STRING_UNION_RECORD1, {"reader union lacking writer type: STRING"},
+                {"/fields/1/type/1"}
+            ),
+        ]
 
+        for (reader, writer, message, location) in incompatible_pairs:
+            result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+            self.assertIs(result.compatibility, SchemaCompatibilityType.incompatible)
+            self.assertEqual(result.messages, message)
+            self.assertEqual(result.locations, location)
 
-def test_schema_compatibility_reader_field_missing_default_value():
-    incompatible_pairs = [
-        (A_INT_RECORD1, EMPTY_RECORD1, "a", "/fields/0"),
-        (A_INT_B_DINT_RECORD1, EMPTY_RECORD1, "a", "/fields/0"),
-    ]
-    for (reader, writer, message, location) in incompatible_pairs:
-        result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
-        assert result.compatibility is SchemaCompatibilityType.incompatible
-        assert len(result.messages) == 1 and len(result.locations) == 1
-        assert message == "".join(result.messages)
-        assert location == "".join(result.locations)
+    def test_schema_compatibility_name_mismatch(self):
+        incompatible_pairs = [
+            (ENUM1_AB_SCHEMA, ENUM2_AB_SCHEMA, "expected: Enum2", "/name"),
+            (EMPTY_RECORD2, EMPTY_RECORD1, "expected: Record1", "/name"),
+            (FIXED_4_BYTES, FIXED_4_ANOTHER_NAME, "expected: AnotherName", "/name"),
+            (A_DINT_B_DENUM_1_RECORD1, A_DINT_B_DENUM_2_RECORD1, "expected: Enum2", "/fields/1/type/name")]
 
+        for (reader, writer, message, location) in incompatible_pairs:
+            result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+            self.assertIs(result.compatibility, SchemaCompatibilityType.incompatible)
+            self.assertIn(message, result.messages)
+            self.assertIn(location, result.locations)
 
-def test_schema_compatibility_type_mismatch():
-    incompatible_pairs = [
-        (NULL_SCHEMA, INT_SCHEMA, "reader type: NULL not compatible with writer type: INT", "/"),
-        (NULL_SCHEMA, LONG_SCHEMA, "reader type: NULL not compatible with writer type: LONG", "/"),
-        (BOOLEAN_SCHEMA, INT_SCHEMA, "reader type: BOOLEAN not compatible with writer type: INT", "/"),
-        (INT_SCHEMA, NULL_SCHEMA, "reader type: INT not compatible with writer type: NULL", "/"),
-        (INT_SCHEMA, BOOLEAN_SCHEMA, "reader type: INT not compatible with writer type: BOOLEAN", "/"),
-        (INT_SCHEMA, LONG_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/"),
-        (INT_SCHEMA, FLOAT_SCHEMA, "reader type: INT not compatible with writer type: FLOAT", "/"),
-        (INT_SCHEMA, DOUBLE_SCHEMA, "reader type: INT not compatible with writer type: DOUBLE", "/"),
-        (LONG_SCHEMA, FLOAT_SCHEMA, "reader type: LONG not compatible with writer type: FLOAT", "/"),
-        (LONG_SCHEMA, DOUBLE_SCHEMA, "reader type: LONG not compatible with writer type: DOUBLE", "/"),
-        (FLOAT_SCHEMA, DOUBLE_SCHEMA, "reader type: FLOAT not compatible with writer type: DOUBLE", "/"),
-        (DOUBLE_SCHEMA, STRING_SCHEMA, "reader type: DOUBLE not compatible with writer type: STRING", "/"),
-        (FIXED_4_BYTES, STRING_SCHEMA, "reader type: FIXED not compatible with writer type: STRING", "/"),
-        (STRING_SCHEMA, BOOLEAN_SCHEMA, "reader type: STRING not compatible with writer type: BOOLEAN", "/"),
-        (STRING_SCHEMA, INT_SCHEMA, "reader type: STRING not compatible with writer type: INT", "/"),
-        (BYTES_SCHEMA, NULL_SCHEMA, "reader type: BYTES not compatible with writer type: NULL", "/"),
-        (BYTES_SCHEMA, INT_SCHEMA, "reader type: BYTES not compatible with writer type: INT", "/"),
-        (A_INT_RECORD1, INT_SCHEMA, "reader type: RECORD not compatible with writer type: INT", "/"),
-        (INT_ARRAY_SCHEMA, LONG_ARRAY_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/items"),
-        (INT_MAP_SCHEMA, INT_ARRAY_SCHEMA, "reader type: MAP not compatible with writer type: ARRAY", "/"),
-        (INT_ARRAY_SCHEMA, INT_MAP_SCHEMA, "reader type: ARRAY not compatible with writer type: MAP", "/"),
-        (INT_MAP_SCHEMA, LONG_MAP_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/values"),
-        (INT_SCHEMA, ENUM2_AB_SCHEMA, "reader type: INT not compatible with writer type: ENUM", "/"),
-        (ENUM2_AB_SCHEMA, INT_SCHEMA, "reader type: ENUM not compatible with writer type: INT", "/"),
-        (
-            FLOAT_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA, "reader type: FLOAT not compatible with writer type: DOUBLE",
-            "/"
-        ),
-        (LONG_SCHEMA, INT_FLOAT_UNION_SCHEMA, "reader type: LONG not compatible with writer type: FLOAT", "/"),
-        (INT_SCHEMA, INT_FLOAT_UNION_SCHEMA, "reader type: INT not compatible with writer type: FLOAT", "/"),
-        # (INT_LIST_RECORD, LONG_LIST_RECORD, "reader type: INT not compatible with writer type: LONG", "/fields/0/type"),
-        (NULL_SCHEMA, INT_SCHEMA, "reader type: NULL not compatible with writer type: INT", "/"),
-    ]
-    for (reader, writer, message, location) in incompatible_pairs:
-        result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
-        assert result.compatibility is SchemaCompatibilityType.incompatible
-        assert message in result.messages
-        assert location in result.locations
+    def test_schema_compatibility_reader_field_missing_default_value(self):
+        """Missing-default cases must yield exactly one message and one location.
+
+        Each case is ``(reader, writer, message, location)``. The checker's
+        result must be ``incompatible``, carry a single message and a single
+        location, and those must equal the expected strings.
+        """
+        incompatible_pairs = [
+            (A_INT_RECORD1, EMPTY_RECORD1, "a", "/fields/0"),
+            (A_INT_B_DINT_RECORD1, EMPTY_RECORD1, "a", "/fields/0"),
+        ]
+        for (reader, writer, message, location) in incompatible_pairs:
+            # subTest reports every failing pair individually instead of
+            # stopping at the first one, and names the pair in the failure.
+            with self.subTest(reader=reader, writer=writer):
+                result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+                self.assertIs(result.compatibility, SchemaCompatibilityType.incompatible)
+                self.assertEqual(len(result.messages), 1)
+                self.assertEqual(len(result.locations), 1)
+                # With exactly one element, joining yields that element itself.
+                self.assertEqual(message, "".join(result.messages))
+                self.assertEqual(location, "".join(result.locations))
 
+    def test_schema_compatibility_type_mismatch(self):
+        """Type-mismatch cases must be reported as incompatible.
+
+        Each case is ``(reader, writer, message, location)``. The checker's
+        result must be ``incompatible`` and must contain both the expected
+        message and the expected location.
+        """
+        incompatible_pairs = [
+            (NULL_SCHEMA, INT_SCHEMA, "reader type: NULL not compatible with writer type: INT", "/"),
+            (NULL_SCHEMA, LONG_SCHEMA, "reader type: NULL not compatible with writer type: LONG", "/"),
+            (BOOLEAN_SCHEMA, INT_SCHEMA, "reader type: BOOLEAN not compatible with writer type: INT", "/"),
+            (INT_SCHEMA, NULL_SCHEMA, "reader type: INT not compatible with writer type: NULL", "/"),
+            (INT_SCHEMA, BOOLEAN_SCHEMA, "reader type: INT not compatible with writer type: BOOLEAN", "/"),
+            (INT_SCHEMA, LONG_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/"),
+            (INT_SCHEMA, FLOAT_SCHEMA, "reader type: INT not compatible with writer type: FLOAT", "/"),
+            (INT_SCHEMA, DOUBLE_SCHEMA, "reader type: INT not compatible with writer type: DOUBLE", "/"),
+            (LONG_SCHEMA, FLOAT_SCHEMA, "reader type: LONG not compatible with writer type: FLOAT", "/"),
+            (LONG_SCHEMA, DOUBLE_SCHEMA, "reader type: LONG not compatible with writer type: DOUBLE", "/"),
+            (FLOAT_SCHEMA, DOUBLE_SCHEMA, "reader type: FLOAT not compatible with writer type: DOUBLE", "/"),
+            (DOUBLE_SCHEMA, STRING_SCHEMA, "reader type: DOUBLE not compatible with writer type: STRING", "/"),
+            (FIXED_4_BYTES, STRING_SCHEMA, "reader type: FIXED not compatible with writer type: STRING", "/"),
+            (STRING_SCHEMA, BOOLEAN_SCHEMA, "reader type: STRING not compatible with writer type: BOOLEAN", "/"),
+            (STRING_SCHEMA, INT_SCHEMA, "reader type: STRING not compatible with writer type: INT", "/"),
+            (BYTES_SCHEMA, NULL_SCHEMA, "reader type: BYTES not compatible with writer type: NULL", "/"),
+            (BYTES_SCHEMA, INT_SCHEMA, "reader type: BYTES not compatible with writer type: INT", "/"),
+            (A_INT_RECORD1, INT_SCHEMA, "reader type: RECORD not compatible with writer type: INT", "/"),
+            (INT_ARRAY_SCHEMA, LONG_ARRAY_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/items"),
+            (INT_MAP_SCHEMA, INT_ARRAY_SCHEMA, "reader type: MAP not compatible with writer type: ARRAY", "/"),
+            (INT_ARRAY_SCHEMA, INT_MAP_SCHEMA, "reader type: ARRAY not compatible with writer type: MAP", "/"),
+            (INT_MAP_SCHEMA, LONG_MAP_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/values"),
+            (INT_SCHEMA, ENUM2_AB_SCHEMA, "reader type: INT not compatible with writer type: ENUM", "/"),
+            (ENUM2_AB_SCHEMA, INT_SCHEMA, "reader type: ENUM not compatible with writer type: INT", "/"),
+            (
+                FLOAT_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA, "reader type: FLOAT not compatible with writer type: DOUBLE",
+                "/"
+            ),
+            (LONG_SCHEMA, INT_FLOAT_UNION_SCHEMA, "reader type: LONG not compatible with writer type: FLOAT", "/"),
+            (INT_SCHEMA, INT_FLOAT_UNION_SCHEMA, "reader type: INT not compatible with writer type: FLOAT", "/"),
+            # NOTE(review): the INT_LIST_RECORD / LONG_LIST_RECORD case below is disabled; the
+            # reason is not recorded here -- confirm before re-enabling or deleting it.
+            # (INT_LIST_RECORD, LONG_LIST_RECORD, "reader type: INT not compatible with writer type: LONG", "/fields/0/type"),
+        ]
+        for (reader, writer, message, location) in incompatible_pairs:
+            # subTest reports every failing pair individually instead of
+            # stopping at the first one, and names the pair in the failure.
+            with self.subTest(reader=reader, writer=writer):
+                result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+                self.assertIs(result.compatibility, SchemaCompatibilityType.incompatible)
+                self.assertIn(message, result.messages)
+                self.assertIn(location, result.locations)
 
-def are_compatible(reader: Schema, writer: Schema) -> bool:
-    return ReaderWriterCompatibilityChecker(
-    ).get_compatibility(reader, writer).compatibility is SchemaCompatibilityType.compatible
+    def are_compatible(self, reader: Schema, writer: Schema) -> bool:
+        """Return True when the checker reports ``reader``/``writer`` as compatible."""
+        checker = ReaderWriterCompatibilityChecker()
+        outcome = checker.get_compatibility(reader, writer)
+        return outcome.compatibility is SchemaCompatibilityType.compatible