You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by ko...@apache.org on 2021/05/14 23:43:49 UTC
[avro] branch master updated: AVRO-3097: Run Schema Compatibility
checks as part of Unittest (#1166)
This is an automated email from the ASF dual-hosted git repository.
kojiromike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new 93d5e21 AVRO-3097: Run Schema Compatibility checks as part of Unittest (#1166)
93d5e21 is described below
commit 93d5e2110ca867c6c6611f8715c0934e437518c7
Author: Subhash Bhushan <su...@gmail.com>
AuthorDate: Fri May 14 16:43:39 2021 -0700
AVRO-3097: Run Schema Compatibility checks as part of Unittest (#1166)
* AVRO-3097 Run Schema Compatibility checks as part of Unittest
Test methods in `test_compatibility.py` were being ignored in
unittest runs because they were not defined on a `unittest.TestCase`
subclass. This commit encloses the test cases within a class deriving
from `unittest.TestCase` and converts the bare assert statements to
the equivalent unittest assertion methods.
* AVRO-3097: Expose `compatibility` module as part of avro package
The `compatibility` module was not exposed through the avro package
(it was missing from `__all__`). This PR adds it, allowing `import avro.compatibility`.
---
lang/py/avro/__init__.py | 2 +-
lang/py/avro/compatibility.py | 2 +-
lang/py/avro/test/test_compatibility.py | 678 ++++++++++++++++----------------
3 files changed, 341 insertions(+), 341 deletions(-)
diff --git a/lang/py/avro/__init__.py b/lang/py/avro/__init__.py
index 165ced5..2ccfa35 100644
--- a/lang/py/avro/__init__.py
+++ b/lang/py/avro/__init__.py
@@ -21,6 +21,6 @@
import pkgutil
-__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones', 'codecs']
+__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones', 'codecs', 'compatibility']
__version__ = (pkgutil.get_data(__name__, 'VERSION.txt') or b'0.0.1+unknown').decode().strip()
diff --git a/lang/py/avro/compatibility.py b/lang/py/avro/compatibility.py
index 98d60a0..1f49c15 100644
--- a/lang/py/avro/compatibility.py
+++ b/lang/py/avro/compatibility.py
@@ -20,7 +20,7 @@
# limitations under the License.
from copy import copy
from enum import Enum
-from typing import Dict, List, Optional, Set, cast
+from typing import List, Optional, Set, cast
from avro.errors import AvroRuntimeException
from avro.schema import ArraySchema, EnumSchema, Field, FixedSchema, MapSchema, NamedSchema, RecordSchema, Schema, UnionSchema
diff --git a/lang/py/avro/test/test_compatibility.py b/lang/py/avro/test/test_compatibility.py
index 729bcb7..91e136d 100644
--- a/lang/py/avro/test/test_compatibility.py
+++ b/lang/py/avro/test/test_compatibility.py
@@ -20,6 +20,7 @@
# limitations under the License.
import json
+import unittest
from avro.compatibility import ReaderWriterCompatibilityChecker, SchemaCompatibilityType, SchemaType
from avro.schema import ArraySchema, MapSchema, Names, PrimitiveSchema, Schema, UnionSchema, parse
@@ -373,356 +374,355 @@ RECORD1_WITH_ENUM_ABC = parse(
)
-def test_simple_schema_promotion():
- field_alias_reader = parse(
- json.dumps({
- "name": "foo",
- "type": "record",
- "fields": [{
- "type": "int",
- "name": "bar",
- "aliases": ["f1"]
- }]
- })
- )
- record_alias_reader = parse(
- json.dumps({
- "name": "other",
- "type": "record",
- "fields": [{
- "type": "int",
- "name": "f1"
- }],
- "aliases": ["foo"]
- })
- )
+class TestCompatibility(unittest.TestCase):
+ def test_simple_schema_promotion(self):
+ field_alias_reader = parse(
+ json.dumps({
+ "name": "foo",
+ "type": "record",
+ "fields": [{
+ "type": "int",
+ "name": "bar",
+ "aliases": ["f1"]
+ }]
+ })
+ )
+ record_alias_reader = parse(
+ json.dumps({
+ "name": "other",
+ "type": "record",
+ "fields": [{
+ "type": "int",
+ "name": "f1"
+ }],
+ "aliases": ["foo"]
+ })
+ )
- writer = parse(
- json.dumps({
- "name": "foo",
- "type": "record",
- "fields": [{
- "type": "int",
- "name": "f1"
- }, {
- "type": "string",
- "name": "f2",
- }]
- })
- )
- # alias testing
- res = ReaderWriterCompatibilityChecker().get_compatibility(field_alias_reader, writer)
- assert res.compatibility is SchemaCompatibilityType.compatible, res.locations
- res = ReaderWriterCompatibilityChecker().get_compatibility(record_alias_reader, writer)
- assert res.compatibility is SchemaCompatibilityType.compatible, res.locations
+ writer = parse(
+ json.dumps({
+ "name": "foo",
+ "type": "record",
+ "fields": [{
+ "type": "int",
+ "name": "f1"
+ }, {
+ "type": "string",
+ "name": "f2",
+ }]
+ })
+ )
+ # alias testing
+ res = ReaderWriterCompatibilityChecker().get_compatibility(field_alias_reader, writer)
+ self.assertIs(res.compatibility, SchemaCompatibilityType.compatible, res.locations)
+ res = ReaderWriterCompatibilityChecker().get_compatibility(record_alias_reader, writer)
+ self.assertIs(res.compatibility, SchemaCompatibilityType.compatible, res.locations)
-
-def test_schema_compatibility():
- # testValidateSchemaPairMissingField
- writer = parse(
- json.dumps({
- "type": SchemaType.RECORD,
- "name": "Record",
- "fields": [{
- "name": "oldField1",
- "type": SchemaType.INT
- }, {
- "name": "oldField2",
- "type": SchemaType.STRING
- }]
- })
- )
- reader = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record", "fields": [{"name": "oldField1", "type": SchemaType.INT}]}))
- assert are_compatible(reader, writer)
- # testValidateSchemaPairMissingSecondField
- reader = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record", "fields": [{"name": "oldField2", "type": SchemaType.STRING}]}))
- assert are_compatible(reader, writer)
- # testValidateSchemaPairAllFields
- reader = parse(
- json.dumps({
- "type": SchemaType.RECORD,
+ def test_schema_compatibility(self):
+ # testValidateSchemaPairMissingField
+ writer = parse(
+ json.dumps({
+ "type": SchemaType.RECORD,
+ "name": "Record",
+ "fields": [{
+ "name": "oldField1",
+ "type": SchemaType.INT
+ }, {
+ "name": "oldField2",
+ "type": SchemaType.STRING
+ }]
+ })
+ )
+ reader = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record", "fields": [{"name": "oldField1", "type": SchemaType.INT}]}))
+ self.assertTrue(self.are_compatible(reader, writer))
+ # testValidateSchemaPairMissingSecondField
+ reader = parse(json.dumps({"type": SchemaType.RECORD, "name": "Record", "fields": [{"name": "oldField2", "type": SchemaType.STRING}]}))
+ self.assertTrue(self.are_compatible(reader, writer))
+ # testValidateSchemaPairAllFields
+ reader = parse(
+ json.dumps({
+ "type": SchemaType.RECORD,
+ "name": "Record",
+ "fields": [{
+ "name": "oldField1",
+ "type": SchemaType.INT
+ }, {
+ "name": "oldField2",
+ "type": SchemaType.STRING
+ }]
+ })
+ )
+ self.assertTrue(self.are_compatible(reader, writer))
+ # testValidateSchemaNewFieldWithDefault
+ reader = parse(
+ json.dumps({
+ "type": SchemaType.RECORD,
+ "name": "Record",
+ "fields": [{
+ "name": "oldField1",
+ "type": SchemaType.INT
+ }, {
+ "name": "newField2",
+ "type": SchemaType.INT,
+ "default": 42
+ }]
+ })
+ )
+ self.assertTrue(self.are_compatible(reader, writer))
+ # testValidateSchemaNewField
+ reader = parse(
+ json.dumps({
+ "type": SchemaType.RECORD,
+ "name": "Record",
+ "fields": [{
+ "name": "oldField1",
+ "type": SchemaType.INT
+ }, {
+ "name": "newField2",
+ "type": SchemaType.INT
+ }]
+ })
+ )
+ self.assertFalse(self.are_compatible(reader, writer))
+ # testValidateArrayWriterSchema
+ writer = parse(json.dumps({"type": SchemaType.ARRAY, "items": {"type": SchemaType.STRING}}))
+ reader = parse(json.dumps({"type": SchemaType.ARRAY, "items": {"type": SchemaType.STRING}}))
+ self.assertTrue(self.are_compatible(reader, writer))
+ reader = parse(json.dumps({"type": SchemaType.MAP, "values": {"type": SchemaType.STRING}}))
+ self.assertFalse(self.are_compatible(reader, writer))
+ # testValidatePrimitiveWriterSchema
+ writer = parse(json.dumps({"type": SchemaType.STRING}))
+ reader = parse(json.dumps({"type": SchemaType.STRING}))
+ self.assertTrue(self.are_compatible(reader, writer))
+ reader = parse(json.dumps({"type": SchemaType.INT}))
+ self.assertFalse(self.are_compatible(reader, writer))
+ # testUnionReaderWriterSubsetIncompatibility
+ writer = parse(
+ json.dumps({
+ "name": "Record",
+ "type": "record",
+ "fields": [{
+ "name": "f1",
+ "type": [SchemaType.INT, SchemaType.STRING, SchemaType.LONG]
+ }]
+ })
+ )
+ reader = parse(json.dumps({
"name": "Record",
- "fields": [{
- "name": "oldField1",
- "type": SchemaType.INT
- }, {
- "name": "oldField2",
- "type": SchemaType.STRING
- }]
- })
- )
- assert are_compatible(reader, writer)
- # testValidateSchemaNewFieldWithDefault
- reader = parse(
- json.dumps({
"type": SchemaType.RECORD,
- "name": "Record",
- "fields": [{
- "name": "oldField1",
- "type": SchemaType.INT
- }, {
- "name": "newField2",
- "type": SchemaType.INT,
- "default": 42
- }]
- })
- )
- assert are_compatible(reader, writer)
- # testValidateSchemaNewField
- reader = parse(
- json.dumps({
- "type": SchemaType.RECORD,
- "name": "Record",
- "fields": [{
- "name": "oldField1",
- "type": SchemaType.INT
- }, {
- "name": "newField2",
- "type": SchemaType.INT
- }]
- })
- )
- assert not are_compatible(reader, writer)
- # testValidateArrayWriterSchema
- writer = parse(json.dumps({"type": SchemaType.ARRAY, "items": {"type": SchemaType.STRING}}))
- reader = parse(json.dumps({"type": SchemaType.ARRAY, "items": {"type": SchemaType.STRING}}))
- assert are_compatible(reader, writer)
- reader = parse(json.dumps({"type": SchemaType.MAP, "values": {"type": SchemaType.STRING}}))
- assert not are_compatible(reader, writer)
- # testValidatePrimitiveWriterSchema
- writer = parse(json.dumps({"type": SchemaType.STRING}))
- reader = parse(json.dumps({"type": SchemaType.STRING}))
- assert are_compatible(reader, writer)
- reader = parse(json.dumps({"type": SchemaType.INT}))
- assert not are_compatible(reader, writer)
- # testUnionReaderWriterSubsetIncompatibility
- writer = parse(
- json.dumps({
- "name": "Record",
- "type": "record",
- "fields": [{
- "name": "f1",
- "type": [SchemaType.INT, SchemaType.STRING, SchemaType.LONG]
- }]
- })
- )
- reader = parse(json.dumps({"name": "Record", "type": SchemaType.RECORD, "fields": [{"name": "f1", "type": [SchemaType.INT, SchemaType.STRING]}]}))
- reader = reader.fields[0].type
- writer = writer.fields[0].type
- assert isinstance(reader, UnionSchema)
- assert isinstance(writer, UnionSchema)
- assert not are_compatible(reader, writer)
- # testReaderWriterCompatibility
- compatible_reader_writer_test_cases = [
- (BOOLEAN_SCHEMA, BOOLEAN_SCHEMA),
- (INT_SCHEMA, INT_SCHEMA),
- (LONG_SCHEMA, INT_SCHEMA),
- (LONG_SCHEMA, LONG_SCHEMA),
- (FLOAT_SCHEMA, INT_SCHEMA),
- (FLOAT_SCHEMA, LONG_SCHEMA),
- (DOUBLE_SCHEMA, LONG_SCHEMA),
- (DOUBLE_SCHEMA, INT_SCHEMA),
- (DOUBLE_SCHEMA, FLOAT_SCHEMA),
- (STRING_SCHEMA, STRING_SCHEMA),
- (BYTES_SCHEMA, BYTES_SCHEMA),
- (STRING_SCHEMA, BYTES_SCHEMA),
- (BYTES_SCHEMA, STRING_SCHEMA),
- (INT_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
- (LONG_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
- (INT_MAP_SCHEMA, INT_MAP_SCHEMA),
- (LONG_MAP_SCHEMA, INT_MAP_SCHEMA),
- (ENUM1_AB_SCHEMA, ENUM1_AB_SCHEMA),
- (ENUM1_ABC_SCHEMA, ENUM1_AB_SCHEMA),
- # Union related pairs
- (EMPTY_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
- (FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
- (FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
- (FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
- (FLOAT_UNION_SCHEMA, INT_LONG_UNION_SCHEMA),
- (INT_UNION_SCHEMA, INT_UNION_SCHEMA),
- (INT_STRING_UNION_SCHEMA, STRING_INT_UNION_SCHEMA),
- (INT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
- (LONG_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
- (LONG_UNION_SCHEMA, INT_UNION_SCHEMA),
- (FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
- (DOUBLE_UNION_SCHEMA, INT_UNION_SCHEMA),
- (FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
- (DOUBLE_UNION_SCHEMA, LONG_UNION_SCHEMA),
- (FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
- (DOUBLE_UNION_SCHEMA, FLOAT_UNION_SCHEMA),
- (STRING_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
- (STRING_UNION_SCHEMA, BYTES_UNION_SCHEMA),
- (BYTES_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
- (BYTES_UNION_SCHEMA, STRING_UNION_SCHEMA),
- (DOUBLE_UNION_SCHEMA, INT_FLOAT_UNION_SCHEMA),
- # Readers capable of reading all branches of a union are compatible
- (FLOAT_SCHEMA, INT_FLOAT_UNION_SCHEMA),
- (LONG_SCHEMA, INT_LONG_UNION_SCHEMA),
- (DOUBLE_SCHEMA, INT_FLOAT_UNION_SCHEMA),
- (DOUBLE_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
- # Special case of singleton unions:
- (FLOAT_SCHEMA, FLOAT_UNION_SCHEMA),
- (INT_UNION_SCHEMA, INT_SCHEMA),
- (INT_SCHEMA, INT_UNION_SCHEMA),
- # Fixed types
- (FIXED_4_BYTES, FIXED_4_BYTES),
- # Tests involving records:
- (EMPTY_RECORD1, EMPTY_RECORD1),
- (EMPTY_RECORD1, A_INT_RECORD1),
- (A_INT_RECORD1, A_INT_RECORD1),
- (A_DINT_RECORD1, A_INT_RECORD1),
- (A_DINT_RECORD1, A_DINT_RECORD1),
- (A_INT_RECORD1, A_DINT_RECORD1),
- (A_LONG_RECORD1, A_INT_RECORD1),
- (A_INT_RECORD1, A_INT_B_INT_RECORD1),
- (A_DINT_RECORD1, A_INT_B_INT_RECORD1),
- (A_INT_B_DINT_RECORD1, A_INT_RECORD1),
- (A_DINT_B_DINT_RECORD1, EMPTY_RECORD1),
- (A_DINT_B_DINT_RECORD1, A_INT_RECORD1),
- (A_INT_B_INT_RECORD1, A_DINT_B_DINT_RECORD1),
- (parse(json.dumps({"type": "null"})), parse(json.dumps({"type": "null"}))),
- (NULL_SCHEMA, NULL_SCHEMA),
- (ENUM_AB_ENUM_DEFAULT_A_RECORD, ENUM_ABC_ENUM_DEFAULT_A_RECORD),
- (ENUM_AB_FIELD_DEFAULT_A_ENUM_DEFAULT_B_RECORD, ENUM_ABC_FIELD_DEFAULT_B_ENUM_DEFAULT_A_RECORD),
- (NS_RECORD1, NS_RECORD2),
- ]
-
- for (reader, writer) in compatible_reader_writer_test_cases:
- assert are_compatible(reader, writer)
-
-
-def test_schema_compatibility_fixed_size_mismatch():
- incompatible_fixed_pairs = [
- (FIXED_4_BYTES, FIXED_8_BYTES, "expected: 8, found: 4", "/size"),
- (FIXED_8_BYTES, FIXED_4_BYTES, "expected: 4, found: 8", "/size"),
- (A_DINT_B_DFIXED_8_BYTES_RECORD1, A_DINT_B_DFIXED_4_BYTES_RECORD1, "expected: 4, found: 8", "/fields/1/type/size"),
- (A_DINT_B_DFIXED_4_BYTES_RECORD1, A_DINT_B_DFIXED_8_BYTES_RECORD1, "expected: 8, found: 4", "/fields/1/type/size"),
- ]
- for (reader, writer, message, location) in incompatible_fixed_pairs:
- result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
- assert result.compatibility is SchemaCompatibilityType.incompatible
- assert location in result.locations, "expected {}, found {}".format(location, result)
- assert message in result.messages, "expected {}, found {}".format(location, result)
-
-
-def test_schema_compatibility_missing_enum_symbols():
- incompatible_pairs = [
- # str(set) representation
- (ENUM1_AB_SCHEMA, ENUM1_ABC_SCHEMA, "{'C'}", "/symbols"),
- (ENUM1_BC_SCHEMA, ENUM1_ABC_SCHEMA, "{'A'}", "/symbols"),
- (RECORD1_WITH_ENUM_AB, RECORD1_WITH_ENUM_ABC, "{'C'}", "/fields/0/type/symbols"),
- ]
- for (reader, writer, message, location) in incompatible_pairs:
- result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
- assert result.compatibility is SchemaCompatibilityType.incompatible
- assert message in result.messages
- assert location in result.locations
-
-
-def test_schema_compatibility_missing_union_branch():
- incompatible_pairs = [
- (INT_UNION_SCHEMA, INT_STRING_UNION_SCHEMA, {"reader union lacking writer type: STRING"}, {"/1"}),
- (STRING_UNION_SCHEMA, INT_STRING_UNION_SCHEMA, {"reader union lacking writer type: INT"}, {"/0"}),
- (INT_UNION_SCHEMA, UNION_INT_RECORD1, {"reader union lacking writer type: RECORD"}, {"/1"}),
- (INT_UNION_SCHEMA, UNION_INT_RECORD2, {"reader union lacking writer type: RECORD"}, {"/1"}),
- (UNION_INT_RECORD1, UNION_INT_RECORD2, {"reader union lacking writer type: RECORD"}, {"/1"}),
- (INT_UNION_SCHEMA, UNION_INT_ENUM1_AB, {"reader union lacking writer type: ENUM"}, {"/1"}),
- (INT_UNION_SCHEMA, UNION_INT_FIXED_4_BYTES, {"reader union lacking writer type: FIXED"}, {"/1"}),
- (INT_UNION_SCHEMA, UNION_INT_BOOLEAN, {"reader union lacking writer type: BOOLEAN"}, {"/1"}),
- (INT_UNION_SCHEMA, LONG_UNION_SCHEMA, {"reader union lacking writer type: LONG"}, {"/0"}),
- (INT_UNION_SCHEMA, FLOAT_UNION_SCHEMA, {"reader union lacking writer type: FLOAT"}, {"/0"}),
- (INT_UNION_SCHEMA, DOUBLE_UNION_SCHEMA, {"reader union lacking writer type: DOUBLE"}, {"/0"}),
- (INT_UNION_SCHEMA, BYTES_UNION_SCHEMA, {"reader union lacking writer type: BYTES"}, {"/0"}),
- (INT_UNION_SCHEMA, UNION_INT_ARRAY_INT, {"reader union lacking writer type: ARRAY"}, {"/1"}),
- (INT_UNION_SCHEMA, UNION_INT_MAP_INT, {"reader union lacking writer type: MAP"}, {"/1"}),
- (INT_UNION_SCHEMA, UNION_INT_NULL, {"reader union lacking writer type: NULL"}, {"/1"}),
- (
- INT_UNION_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA, {
- "reader union lacking writer type: LONG", "reader union lacking writer type: FLOAT",
- "reader union lacking writer type: DOUBLE"
- }, {"/1", "/2", "/3"}
- ),
- (
- A_DINT_B_DINT_UNION_RECORD1, A_DINT_B_DINT_STRING_UNION_RECORD1, {"reader union lacking writer type: STRING"},
- {"/fields/1/type/1"}
- ),
- ]
+ "fields": [
+ {"name": "f1", "type": [SchemaType.INT, SchemaType.STRING]}]}))
+ reader = reader.fields[0].type
+ writer = writer.fields[0].type
+ self.assertIsInstance(reader, UnionSchema)
+ self.assertIsInstance(writer, UnionSchema)
+ self.assertFalse(self.are_compatible(reader, writer))
+ # testReaderWriterCompatibility
+ compatible_reader_writer_test_cases = [
+ (BOOLEAN_SCHEMA, BOOLEAN_SCHEMA),
+ (INT_SCHEMA, INT_SCHEMA),
+ (LONG_SCHEMA, INT_SCHEMA),
+ (LONG_SCHEMA, LONG_SCHEMA),
+ (FLOAT_SCHEMA, INT_SCHEMA),
+ (FLOAT_SCHEMA, LONG_SCHEMA),
+ (DOUBLE_SCHEMA, LONG_SCHEMA),
+ (DOUBLE_SCHEMA, INT_SCHEMA),
+ (DOUBLE_SCHEMA, FLOAT_SCHEMA),
+ (STRING_SCHEMA, STRING_SCHEMA),
+ (BYTES_SCHEMA, BYTES_SCHEMA),
+ (STRING_SCHEMA, BYTES_SCHEMA),
+ (BYTES_SCHEMA, STRING_SCHEMA),
+ (INT_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
+ (LONG_ARRAY_SCHEMA, INT_ARRAY_SCHEMA),
+ (INT_MAP_SCHEMA, INT_MAP_SCHEMA),
+ (LONG_MAP_SCHEMA, INT_MAP_SCHEMA),
+ (ENUM1_AB_SCHEMA, ENUM1_AB_SCHEMA),
+ (ENUM1_ABC_SCHEMA, ENUM1_AB_SCHEMA),
+ # Union related pairs
+ (EMPTY_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+ (FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+ (FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
+ (FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
+ (FLOAT_UNION_SCHEMA, INT_LONG_UNION_SCHEMA),
+ (INT_UNION_SCHEMA, INT_UNION_SCHEMA),
+ (INT_STRING_UNION_SCHEMA, STRING_INT_UNION_SCHEMA),
+ (INT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+ (LONG_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+ (LONG_UNION_SCHEMA, INT_UNION_SCHEMA),
+ (FLOAT_UNION_SCHEMA, INT_UNION_SCHEMA),
+ (DOUBLE_UNION_SCHEMA, INT_UNION_SCHEMA),
+ (FLOAT_UNION_SCHEMA, LONG_UNION_SCHEMA),
+ (DOUBLE_UNION_SCHEMA, LONG_UNION_SCHEMA),
+ (FLOAT_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+ (DOUBLE_UNION_SCHEMA, FLOAT_UNION_SCHEMA),
+ (STRING_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+ (STRING_UNION_SCHEMA, BYTES_UNION_SCHEMA),
+ (BYTES_UNION_SCHEMA, EMPTY_UNION_SCHEMA),
+ (BYTES_UNION_SCHEMA, STRING_UNION_SCHEMA),
+ (DOUBLE_UNION_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+ # Readers capable of reading all branches of a union are compatible
+ (FLOAT_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+ (LONG_SCHEMA, INT_LONG_UNION_SCHEMA),
+ (DOUBLE_SCHEMA, INT_FLOAT_UNION_SCHEMA),
+ (DOUBLE_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA),
+ # Special case of singleton unions:
+ (FLOAT_SCHEMA, FLOAT_UNION_SCHEMA),
+ (INT_UNION_SCHEMA, INT_SCHEMA),
+ (INT_SCHEMA, INT_UNION_SCHEMA),
+ # Fixed types
+ (FIXED_4_BYTES, FIXED_4_BYTES),
+ # Tests involving records:
+ (EMPTY_RECORD1, EMPTY_RECORD1),
+ (EMPTY_RECORD1, A_INT_RECORD1),
+ (A_INT_RECORD1, A_INT_RECORD1),
+ (A_DINT_RECORD1, A_INT_RECORD1),
+ (A_DINT_RECORD1, A_DINT_RECORD1),
+ (A_INT_RECORD1, A_DINT_RECORD1),
+ (A_LONG_RECORD1, A_INT_RECORD1),
+ (A_INT_RECORD1, A_INT_B_INT_RECORD1),
+ (A_DINT_RECORD1, A_INT_B_INT_RECORD1),
+ (A_INT_B_DINT_RECORD1, A_INT_RECORD1),
+ (A_DINT_B_DINT_RECORD1, EMPTY_RECORD1),
+ (A_DINT_B_DINT_RECORD1, A_INT_RECORD1),
+ (A_INT_B_INT_RECORD1, A_DINT_B_DINT_RECORD1),
+ (parse(json.dumps({"type": "null"})), parse(json.dumps({"type": "null"}))),
+ (NULL_SCHEMA, NULL_SCHEMA),
+ (ENUM_AB_ENUM_DEFAULT_A_RECORD, ENUM_ABC_ENUM_DEFAULT_A_RECORD),
+ (ENUM_AB_FIELD_DEFAULT_A_ENUM_DEFAULT_B_RECORD, ENUM_ABC_FIELD_DEFAULT_B_ENUM_DEFAULT_A_RECORD),
+ (NS_RECORD1, NS_RECORD2),
+ ]
- for (reader, writer, message, location) in incompatible_pairs:
- result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
- assert result.compatibility is SchemaCompatibilityType.incompatible
- assert result.messages == message
- assert result.locations == location
+ for (reader, writer) in compatible_reader_writer_test_cases:
+ self.assertTrue(self.are_compatible(reader, writer))
+ def test_schema_compatibility_fixed_size_mismatch(self):
+ incompatible_fixed_pairs = [
+ (FIXED_4_BYTES, FIXED_8_BYTES, "expected: 8, found: 4", "/size"),
+ (FIXED_8_BYTES, FIXED_4_BYTES, "expected: 4, found: 8", "/size"),
+ (A_DINT_B_DFIXED_8_BYTES_RECORD1, A_DINT_B_DFIXED_4_BYTES_RECORD1, "expected: 4, found: 8", "/fields/1/type/size"),
+ (A_DINT_B_DFIXED_4_BYTES_RECORD1, A_DINT_B_DFIXED_8_BYTES_RECORD1, "expected: 8, found: 4", "/fields/1/type/size"),
+ ]
+ for (reader, writer, message, location) in incompatible_fixed_pairs:
+ result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+ self.assertIs(result.compatibility, SchemaCompatibilityType.incompatible)
+ self.assertIn(location, result.locations, "expected {}, found {}".format(location, result))
+ self.assertIn(message, result.messages, "expected {}, found {}".format(location, result))
-def test_schema_compatibility_name_mismatch():
- incompatible_pairs = [(ENUM1_AB_SCHEMA, ENUM2_AB_SCHEMA, "expected: Enum2", "/name"),
- (EMPTY_RECORD2, EMPTY_RECORD1, "expected: Record1", "/name"),
- (FIXED_4_BYTES, FIXED_4_ANOTHER_NAME, "expected: AnotherName", "/name"),
- (A_DINT_B_DENUM_1_RECORD1, A_DINT_B_DENUM_2_RECORD1, "expected: Enum2", "/fields/1/type/name")]
+ def test_schema_compatibility_missing_enum_symbols(self):
+ incompatible_pairs = [
+ # str(set) representation
+ (ENUM1_AB_SCHEMA, ENUM1_ABC_SCHEMA, "{'C'}", "/symbols"),
+ (ENUM1_BC_SCHEMA, ENUM1_ABC_SCHEMA, "{'A'}", "/symbols"),
+ (RECORD1_WITH_ENUM_AB, RECORD1_WITH_ENUM_ABC, "{'C'}", "/fields/0/type/symbols"),
+ ]
+ for (reader, writer, message, location) in incompatible_pairs:
+ result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+ self.assertIs(result.compatibility, SchemaCompatibilityType.incompatible)
+ self.assertIn(message, result.messages)
+ self.assertIn(location, result.locations)
- for (reader, writer, message, location) in incompatible_pairs:
- result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
- assert result.compatibility is SchemaCompatibilityType.incompatible
- assert message in result.messages
- assert location in result.locations
+ def test_schema_compatibility_missing_union_branch(self):
+ incompatible_pairs = [
+ (INT_UNION_SCHEMA, INT_STRING_UNION_SCHEMA, {"reader union lacking writer type: STRING"}, {"/1"}),
+ (STRING_UNION_SCHEMA, INT_STRING_UNION_SCHEMA, {"reader union lacking writer type: INT"}, {"/0"}),
+ (INT_UNION_SCHEMA, UNION_INT_RECORD1, {"reader union lacking writer type: RECORD"}, {"/1"}),
+ (INT_UNION_SCHEMA, UNION_INT_RECORD2, {"reader union lacking writer type: RECORD"}, {"/1"}),
+ (UNION_INT_RECORD1, UNION_INT_RECORD2, {"reader union lacking writer type: RECORD"}, {"/1"}),
+ (INT_UNION_SCHEMA, UNION_INT_ENUM1_AB, {"reader union lacking writer type: ENUM"}, {"/1"}),
+ (INT_UNION_SCHEMA, UNION_INT_FIXED_4_BYTES, {"reader union lacking writer type: FIXED"}, {"/1"}),
+ (INT_UNION_SCHEMA, UNION_INT_BOOLEAN, {"reader union lacking writer type: BOOLEAN"}, {"/1"}),
+ (INT_UNION_SCHEMA, LONG_UNION_SCHEMA, {"reader union lacking writer type: LONG"}, {"/0"}),
+ (INT_UNION_SCHEMA, FLOAT_UNION_SCHEMA, {"reader union lacking writer type: FLOAT"}, {"/0"}),
+ (INT_UNION_SCHEMA, DOUBLE_UNION_SCHEMA, {"reader union lacking writer type: DOUBLE"}, {"/0"}),
+ (INT_UNION_SCHEMA, BYTES_UNION_SCHEMA, {"reader union lacking writer type: BYTES"}, {"/0"}),
+ (INT_UNION_SCHEMA, UNION_INT_ARRAY_INT, {"reader union lacking writer type: ARRAY"}, {"/1"}),
+ (INT_UNION_SCHEMA, UNION_INT_MAP_INT, {"reader union lacking writer type: MAP"}, {"/1"}),
+ (INT_UNION_SCHEMA, UNION_INT_NULL, {"reader union lacking writer type: NULL"}, {"/1"}),
+ (
+ INT_UNION_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA, {
+ "reader union lacking writer type: LONG", "reader union lacking writer type: FLOAT",
+ "reader union lacking writer type: DOUBLE"
+ }, {"/1", "/2", "/3"}
+ ),
+ (
+ A_DINT_B_DINT_UNION_RECORD1, A_DINT_B_DINT_STRING_UNION_RECORD1, {"reader union lacking writer type: STRING"},
+ {"/fields/1/type/1"}
+ ),
+ ]
+ for (reader, writer, message, location) in incompatible_pairs:
+ result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+ self.assertIs(result.compatibility, SchemaCompatibilityType.incompatible)
+ self.assertEqual(result.messages, message)
+ self.assertEqual(result.locations, location)
-def test_schema_compatibility_reader_field_missing_default_value():
- incompatible_pairs = [
- (A_INT_RECORD1, EMPTY_RECORD1, "a", "/fields/0"),
- (A_INT_B_DINT_RECORD1, EMPTY_RECORD1, "a", "/fields/0"),
- ]
- for (reader, writer, message, location) in incompatible_pairs:
- result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
- assert result.compatibility is SchemaCompatibilityType.incompatible
- assert len(result.messages) == 1 and len(result.locations) == 1
- assert message == "".join(result.messages)
- assert location == "".join(result.locations)
+ def test_schema_compatibility_name_mismatch(self):
+ incompatible_pairs = [
+ (ENUM1_AB_SCHEMA, ENUM2_AB_SCHEMA, "expected: Enum2", "/name"),
+ (EMPTY_RECORD2, EMPTY_RECORD1, "expected: Record1", "/name"),
+ (FIXED_4_BYTES, FIXED_4_ANOTHER_NAME, "expected: AnotherName", "/name"),
+ (A_DINT_B_DENUM_1_RECORD1, A_DINT_B_DENUM_2_RECORD1, "expected: Enum2", "/fields/1/type/name")]
+ for (reader, writer, message, location) in incompatible_pairs:
+ result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+ self.assertIs(result.compatibility, SchemaCompatibilityType.incompatible)
+ self.assertIn(message, result.messages)
+ self.assertIn(location, result.locations)
-def test_schema_compatibility_type_mismatch():
- incompatible_pairs = [
- (NULL_SCHEMA, INT_SCHEMA, "reader type: NULL not compatible with writer type: INT", "/"),
- (NULL_SCHEMA, LONG_SCHEMA, "reader type: NULL not compatible with writer type: LONG", "/"),
- (BOOLEAN_SCHEMA, INT_SCHEMA, "reader type: BOOLEAN not compatible with writer type: INT", "/"),
- (INT_SCHEMA, NULL_SCHEMA, "reader type: INT not compatible with writer type: NULL", "/"),
- (INT_SCHEMA, BOOLEAN_SCHEMA, "reader type: INT not compatible with writer type: BOOLEAN", "/"),
- (INT_SCHEMA, LONG_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/"),
- (INT_SCHEMA, FLOAT_SCHEMA, "reader type: INT not compatible with writer type: FLOAT", "/"),
- (INT_SCHEMA, DOUBLE_SCHEMA, "reader type: INT not compatible with writer type: DOUBLE", "/"),
- (LONG_SCHEMA, FLOAT_SCHEMA, "reader type: LONG not compatible with writer type: FLOAT", "/"),
- (LONG_SCHEMA, DOUBLE_SCHEMA, "reader type: LONG not compatible with writer type: DOUBLE", "/"),
- (FLOAT_SCHEMA, DOUBLE_SCHEMA, "reader type: FLOAT not compatible with writer type: DOUBLE", "/"),
- (DOUBLE_SCHEMA, STRING_SCHEMA, "reader type: DOUBLE not compatible with writer type: STRING", "/"),
- (FIXED_4_BYTES, STRING_SCHEMA, "reader type: FIXED not compatible with writer type: STRING", "/"),
- (STRING_SCHEMA, BOOLEAN_SCHEMA, "reader type: STRING not compatible with writer type: BOOLEAN", "/"),
- (STRING_SCHEMA, INT_SCHEMA, "reader type: STRING not compatible with writer type: INT", "/"),
- (BYTES_SCHEMA, NULL_SCHEMA, "reader type: BYTES not compatible with writer type: NULL", "/"),
- (BYTES_SCHEMA, INT_SCHEMA, "reader type: BYTES not compatible with writer type: INT", "/"),
- (A_INT_RECORD1, INT_SCHEMA, "reader type: RECORD not compatible with writer type: INT", "/"),
- (INT_ARRAY_SCHEMA, LONG_ARRAY_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/items"),
- (INT_MAP_SCHEMA, INT_ARRAY_SCHEMA, "reader type: MAP not compatible with writer type: ARRAY", "/"),
- (INT_ARRAY_SCHEMA, INT_MAP_SCHEMA, "reader type: ARRAY not compatible with writer type: MAP", "/"),
- (INT_MAP_SCHEMA, LONG_MAP_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/values"),
- (INT_SCHEMA, ENUM2_AB_SCHEMA, "reader type: INT not compatible with writer type: ENUM", "/"),
- (ENUM2_AB_SCHEMA, INT_SCHEMA, "reader type: ENUM not compatible with writer type: INT", "/"),
- (
- FLOAT_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA, "reader type: FLOAT not compatible with writer type: DOUBLE",
- "/"
- ),
- (LONG_SCHEMA, INT_FLOAT_UNION_SCHEMA, "reader type: LONG not compatible with writer type: FLOAT", "/"),
- (INT_SCHEMA, INT_FLOAT_UNION_SCHEMA, "reader type: INT not compatible with writer type: FLOAT", "/"),
- # (INT_LIST_RECORD, LONG_LIST_RECORD, "reader type: INT not compatible with writer type: LONG", "/fields/0/type"),
- (NULL_SCHEMA, INT_SCHEMA, "reader type: NULL not compatible with writer type: INT", "/"),
- ]
- for (reader, writer, message, location) in incompatible_pairs:
- result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
- assert result.compatibility is SchemaCompatibilityType.incompatible
- assert message in result.messages
- assert location in result.locations
+ def test_schema_compatibility_reader_field_missing_default_value(self):
+ incompatible_pairs = [
+ (A_INT_RECORD1, EMPTY_RECORD1, "a", "/fields/0"),
+ (A_INT_B_DINT_RECORD1, EMPTY_RECORD1, "a", "/fields/0"),
+ ]
+ for (reader, writer, message, location) in incompatible_pairs:
+ result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+ self.assertIs(result.compatibility, SchemaCompatibilityType.incompatible)
+ self.assertEqual(len(result.messages), 1)
+ self.assertEqual(len(result.locations), 1)
+ self.assertEqual(message, "".join(result.messages))
+ self.assertEqual(location, "".join(result.locations))
+ def test_schema_compatibility_type_mismatch(self):
+ incompatible_pairs = [
+ (NULL_SCHEMA, INT_SCHEMA, "reader type: NULL not compatible with writer type: INT", "/"),
+ (NULL_SCHEMA, LONG_SCHEMA, "reader type: NULL not compatible with writer type: LONG", "/"),
+ (BOOLEAN_SCHEMA, INT_SCHEMA, "reader type: BOOLEAN not compatible with writer type: INT", "/"),
+ (INT_SCHEMA, NULL_SCHEMA, "reader type: INT not compatible with writer type: NULL", "/"),
+ (INT_SCHEMA, BOOLEAN_SCHEMA, "reader type: INT not compatible with writer type: BOOLEAN", "/"),
+ (INT_SCHEMA, LONG_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/"),
+ (INT_SCHEMA, FLOAT_SCHEMA, "reader type: INT not compatible with writer type: FLOAT", "/"),
+ (INT_SCHEMA, DOUBLE_SCHEMA, "reader type: INT not compatible with writer type: DOUBLE", "/"),
+ (LONG_SCHEMA, FLOAT_SCHEMA, "reader type: LONG not compatible with writer type: FLOAT", "/"),
+ (LONG_SCHEMA, DOUBLE_SCHEMA, "reader type: LONG not compatible with writer type: DOUBLE", "/"),
+ (FLOAT_SCHEMA, DOUBLE_SCHEMA, "reader type: FLOAT not compatible with writer type: DOUBLE", "/"),
+ (DOUBLE_SCHEMA, STRING_SCHEMA, "reader type: DOUBLE not compatible with writer type: STRING", "/"),
+ (FIXED_4_BYTES, STRING_SCHEMA, "reader type: FIXED not compatible with writer type: STRING", "/"),
+ (STRING_SCHEMA, BOOLEAN_SCHEMA, "reader type: STRING not compatible with writer type: BOOLEAN", "/"),
+ (STRING_SCHEMA, INT_SCHEMA, "reader type: STRING not compatible with writer type: INT", "/"),
+ (BYTES_SCHEMA, NULL_SCHEMA, "reader type: BYTES not compatible with writer type: NULL", "/"),
+ (BYTES_SCHEMA, INT_SCHEMA, "reader type: BYTES not compatible with writer type: INT", "/"),
+ (A_INT_RECORD1, INT_SCHEMA, "reader type: RECORD not compatible with writer type: INT", "/"),
+ (INT_ARRAY_SCHEMA, LONG_ARRAY_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/items"),
+ (INT_MAP_SCHEMA, INT_ARRAY_SCHEMA, "reader type: MAP not compatible with writer type: ARRAY", "/"),
+ (INT_ARRAY_SCHEMA, INT_MAP_SCHEMA, "reader type: ARRAY not compatible with writer type: MAP", "/"),
+ (INT_MAP_SCHEMA, LONG_MAP_SCHEMA, "reader type: INT not compatible with writer type: LONG", "/values"),
+ (INT_SCHEMA, ENUM2_AB_SCHEMA, "reader type: INT not compatible with writer type: ENUM", "/"),
+ (ENUM2_AB_SCHEMA, INT_SCHEMA, "reader type: ENUM not compatible with writer type: INT", "/"),
+ (
+ FLOAT_SCHEMA, INT_LONG_FLOAT_DOUBLE_UNION_SCHEMA, "reader type: FLOAT not compatible with writer type: DOUBLE",
+ "/"
+ ),
+ (LONG_SCHEMA, INT_FLOAT_UNION_SCHEMA, "reader type: LONG not compatible with writer type: FLOAT", "/"),
+ (INT_SCHEMA, INT_FLOAT_UNION_SCHEMA, "reader type: INT not compatible with writer type: FLOAT", "/"),
+ # (INT_LIST_RECORD, LONG_LIST_RECORD, "reader type: INT not compatible with writer type: LONG", "/fields/0/type"),
+ (NULL_SCHEMA, INT_SCHEMA, "reader type: NULL not compatible with writer type: INT", "/"),
+ ]
+ for (reader, writer, message, location) in incompatible_pairs:
+ result = ReaderWriterCompatibilityChecker().get_compatibility(reader, writer)
+ self.assertIs(result.compatibility, SchemaCompatibilityType.incompatible)
+ self.assertIn(message, result.messages)
+ self.assertIn(location, result.locations)
-def are_compatible(reader: Schema, writer: Schema) -> bool:
- return ReaderWriterCompatibilityChecker(
- ).get_compatibility(reader, writer).compatibility is SchemaCompatibilityType.compatible
+ def are_compatible(self, reader: Schema, writer: Schema) -> bool:
+ return ReaderWriterCompatibilityChecker(
+ ).get_compatibility(reader, writer).compatibility is SchemaCompatibilityType.compatible