You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 16:30:13 UTC
[pulsar] 02/19: [Python Schema] Python schema support custom Avro configurations for Enum type (#12642)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 736dd6a853d724f652c4aa945076e40384728abd
Author: ran <ga...@126.com>
AuthorDate: Sun Nov 7 21:24:33 2021 +0800
[Python Schema] Python schema support custom Avro configurations for Enum type (#12642)
### Motivation
Currently, the Python client doesn't support setting the `required`, `default`, and `required_default` configurations for Enum-type fields in a Record.
### Modifications
Rename the `_Enum` class to `CustomEnum`. `_Enum` was not exposed to users; the new `CustomEnum` class is exported from `pulsar.schema`, so users can set the Avro definition configurations `required`, `default`, and `required_default`.
### How to use
```python
class Color(Enum):
    red = 1
    green = 2
    blue = 3

class NestedObj(Record):
    a = Integer()
    color = CustomEnum(Color, required_default=True, default=Color.blue)
```
The resulting schema definition looks like this:
```
{
'type': 'record',
'name': 'NestedObj',
'fields': [
{'name': 'a', 'type': ['null', 'int']},
{'name': 'color', 'default': 'blue', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols': ['red', 'green', 'blue']}]}
]
}
```
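Note: per the Avro specification, the default value of a union-typed field must match the first type in the union. In the schema above, `null` comes first in the union, so strict Avro parsers may reject the non-null default `'blue'`. Passing `required=True` as well emits the field as a plain (non-union) enum type, for which a default such as `'blue'` is valid.

The old usage, assigning a Python `Enum` class directly to a field (e.g. `color = Color`), is still supported.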
This feature is expected to interoperate with the Java client.
(cherry picked from commit e7389ed965cb712058c4ed1e8931d7563ca10bc8)
---
pulsar-client-cpp/python/examples/company.avsc | 4 +-
pulsar-client-cpp/python/pulsar/schema/__init__.py | 2 +-
.../python/pulsar/schema/definition.py | 18 ++--
pulsar-client-cpp/python/schema_test.py | 118 ++++++++++-----------
4 files changed, 74 insertions(+), 68 deletions(-)
diff --git a/pulsar-client-cpp/python/examples/company.avsc b/pulsar-client-cpp/python/examples/company.avsc
index cdb595f..5fb1860 100644
--- a/pulsar-client-cpp/python/examples/company.avsc
+++ b/pulsar-client-cpp/python/examples/company.avsc
@@ -14,6 +14,8 @@
{"name": "age", "type": ["null", "int"]}
]
}}]},
- {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
+ {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]},
+ {"name": "companyType", "type": ["null", {"type": "enum", "name": "CompanyType", "symbols":
+ ["companyType1", "companyType2", "companyType3"]}]}
]
}
\ No newline at end of file
diff --git a/pulsar-client-cpp/python/pulsar/schema/__init__.py b/pulsar-client-cpp/python/pulsar/schema/__init__.py
index 150629d..efa6806 100644
--- a/pulsar-client-cpp/python/pulsar/schema/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/schema/__init__.py
@@ -18,7 +18,7 @@
#
from .definition import Record, Field, Null, Boolean, Integer, Long, \
- Float, Double, Bytes, String, Array, Map
+ Float, Double, Bytes, String, Array, Map, CustomEnum
from .schema import Schema, BytesSchema, StringSchema, JsonSchema
from .schema_avro import AvroSchema
diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index fd778f3..9b6c861 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -44,8 +44,7 @@ class RecordMeta(type):
fields = OrderedDict()
for name, value in dct.items():
if issubclass(type(value), EnumMeta):
- # Wrap Python enums
- value = _Enum(value)
+ value = CustomEnum(value)
elif type(value) == RecordMeta:
# We expect an instance of a record rather than the class itself
value = value()
@@ -125,6 +124,12 @@ class Record(with_metaclass(RecordMeta, object)):
schema['namespace'] = cls._avro_namespace
schema['fields'] = []
+ def get_filed_default_value(value):
+ if isinstance(value, Enum):
+ return value.name
+ else:
+ return value
+
if cls._sorted_fields:
fields = sorted(cls._fields.keys())
else:
@@ -135,7 +140,7 @@ class Record(with_metaclass(RecordMeta, object)):
if field._required else ['null', field.schema_info(defined_names)]
schema['fields'].append({
'name': name,
- 'default': field.default(),
+ 'default': get_filed_default_value(field.default()),
'type': field_type
}) if field.required_default() else schema['fields'].append({
'name': name,
@@ -360,15 +365,16 @@ class String(Field):
# Complex types
-class _Enum(Field):
- def __init__(self, enum_type):
+
+class CustomEnum(Field):
+ def __init__(self, enum_type, default=None, required=False, required_default=False):
if not issubclass(enum_type, Enum):
raise Exception(enum_type + " is not a valid Enum type")
self.enum_type = enum_type
self.values = {}
for x in enum_type.__members__.values():
self.values[x.value] = x
- super(_Enum, self).__init__()
+ super(CustomEnum, self).__init__(default, required, required_default)
def type(self):
return 'enum'
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index d2554da..077f2bb 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -49,6 +49,7 @@ class SchemaTest(TestCase):
g = Double()
h = Bytes()
i = Map(String())
+ j = CustomEnum(Color)
fastavro.parse_schema(Example.schema())
self.assertEqual(Example.schema(), {
@@ -74,16 +75,23 @@ class SchemaTest(TestCase):
{"name": "i", "type": ["null", {
"type": "map",
"values": "string"}]
- },
+ },
+ {"name": "j", "type": ["null", "Color"]}
]
})
def test_complex(self):
+ class Color(Enum):
+ red = 1
+ green = 2
+ blue = 3
+
class MySubRecord(Record):
_sorted_fields = True
x = Integer()
y = Long()
z = String()
+ color = CustomEnum(Color)
class Example(Record):
_sorted_fields = True
@@ -101,9 +109,12 @@ class SchemaTest(TestCase):
"type": ["null", {
"name": "MySubRecord",
"type": "record",
- "fields": [{"name": "x", "type": ["null", "int"]},
- {"name": "y", "type": ["null", "long"]},
- {"name": "z", "type": ["null", "string"]}]
+ "fields": [
+ {'name': 'color', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols':
+ ['red', 'green', 'blue']}]},
+ {"name": "x", "type": ["null", "int"]},
+ {"name": "y", "type": ["null", "long"]},
+ {"name": "z", "type": ["null", "string"]}]
}]
},
{"name": "sub2",
@@ -630,6 +641,8 @@ class SchemaTest(TestCase):
class Example(Record):
name = String()
v = MyEnum
+ w = CustomEnum(MyEnum)
+ x = CustomEnum(MyEnum, required=True, default=MyEnum.A, required_default=True)
topic = 'my-json-enum-topic'
@@ -641,13 +654,15 @@ class SchemaTest(TestCase):
consumer = client.subscribe(topic, 'test',
schema=JsonSchema(Example))
- r = Example(name='test', v=MyEnum.C)
+ r = Example(name='test', v=MyEnum.C, w=MyEnum.B)
producer.send(r)
msg = consumer.receive()
self.assertEqual('test', msg.value().name)
self.assertEqual(MyEnum.C, MyEnum(msg.value().v))
+ self.assertEqual(MyEnum.B, MyEnum(msg.value().w))
+ self.assertEqual(MyEnum.A, MyEnum(msg.value().x))
client.close()
def test_avro_enum(self):
@@ -659,6 +674,8 @@ class SchemaTest(TestCase):
class Example(Record):
name = String()
v = MyEnum
+ w = CustomEnum(MyEnum)
+ x = CustomEnum(MyEnum, required=True, default=MyEnum.B, required_default=True)
topic = 'my-avro-enum-topic'
@@ -670,12 +687,14 @@ class SchemaTest(TestCase):
consumer = client.subscribe(topic, 'test',
schema=AvroSchema(Example))
- r = Example(name='test', v=MyEnum.C)
+ r = Example(name='test', v=MyEnum.C, w=MyEnum.A)
producer.send(r)
msg = consumer.receive()
msg.value()
self.assertEqual(MyEnum.C, msg.value().v)
+ self.assertEqual(MyEnum.A, MyEnum(msg.value().w))
+ self.assertEqual(MyEnum.B, MyEnum(msg.value().x))
client.close()
def test_avro_map_array(self):
@@ -913,6 +932,11 @@ class SchemaTest(TestCase):
client.close()
def test_serialize_schema_complex(self):
+ class Color(Enum):
+ red = 1
+ green = 2
+ blue = 3
+
class NestedObj1(Record):
_sorted_fields = True
na1 = String()
@@ -925,6 +949,8 @@ class SchemaTest(TestCase):
nc2 = NestedObj1()
class NestedObj3(Record):
+ _sorted_fields = True
+ color = CustomEnum(Color)
na3 = Integer()
class NestedObj4(Record):
@@ -933,11 +959,6 @@ class SchemaTest(TestCase):
na4 = String()
nb4 = Integer()
- class Color(Enum):
- red = 1
- green = 2
- blue = 3
-
class ComplexRecord(Record):
_avro_namespace = 'xxx.xxx'
_sorted_fields = True
@@ -945,6 +966,7 @@ class SchemaTest(TestCase):
b = Integer()
color = Color
color2 = Color
+ color3 = CustomEnum(Color, required=True, default=Color.red, required_default=True)
nested = NestedObj2()
nested2 = NestedObj2()
mapNested = Map(NestedObj3())
@@ -970,8 +992,10 @@ class SchemaTest(TestCase):
{'name': 'color', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols': [
'red', 'green', 'blue']}]},
{'name': 'color2', 'type': ['null', 'Color']},
+ {'name': 'color3', 'default': 'red', 'type': 'Color'},
{'name': 'mapNested', 'type': ['null', {'type': 'map', 'values':
{'name': 'NestedObj3', 'type': 'record', 'fields': [
+ {'name': 'color', 'type': ['null', 'Color']},
{'name': 'na3', 'type': ['null', 'int']}
]}}
]},
@@ -998,12 +1022,12 @@ class SchemaTest(TestCase):
r = ComplexRecord(a=1, b=2, color=Color.red, color2=Color.blue,
nested=nested_obj2, nested2=nested_obj2,
mapNested={
- 'a': NestedObj3(na3=1),
+ 'a': NestedObj3(na3=1, color=Color.green),
'b': NestedObj3(na3=2),
- 'c': NestedObj3(na3=3)
+ 'c': NestedObj3(na3=3, color=Color.red)
}, mapNested2={
- 'd': NestedObj3(na3=4),
- 'e': NestedObj3(na3=5),
+ 'd': NestedObj3(na3=4, color=Color.red),
+ 'e': NestedObj3(na3=5, color=Color.blue),
'f': NestedObj3(na3=6)
}, arrayNested=[
NestedObj4(na4='value na4 1', nb4=100),
@@ -1017,32 +1041,9 @@ class SchemaTest(TestCase):
data_decode = data_schema.decode(data_encode)
self.assertEqual(data_decode.__class__.__name__, 'ComplexRecord')
self.assertEqual(data_decode, r)
- self.assertEqual(data_decode.a, 1)
- self.assertEqual(data_decode.b, 2)
- self.assertEqual(data_decode.color, Color.red)
- self.assertEqual(data_decode.color2, Color.blue)
- self.assertEqual(data_decode.nested.na2, 22)
- self.assertEqual(data_decode.nested.nb2, True)
- self.assertEqual(data_decode.nested.nc2.na1, 'na1 value')
- self.assertEqual(data_decode.nested.nc2.nb1, 20.5)
- self.assertEqual(data_decode.nested2.na2, 22)
- self.assertEqual(data_decode.nested2.nb2, True)
- self.assertEqual(data_decode.nested2.nc2.na1, 'na1 value')
- self.assertEqual(data_decode.nested2.nc2.nb1, 20.5)
- self.assertEqual(data_decode.mapNested['a'].na3, 1)
- self.assertEqual(data_decode.mapNested['b'].na3, 2)
- self.assertEqual(data_decode.mapNested['c'].na3, 3)
- self.assertEqual(data_decode.mapNested2['d'].na3, 4)
- self.assertEqual(data_decode.mapNested2['e'].na3, 5)
- self.assertEqual(data_decode.mapNested2['f'].na3, 6)
- self.assertEqual(data_decode.arrayNested[0].na4, 'value na4 1')
- self.assertEqual(data_decode.arrayNested[0].nb4, 100)
- self.assertEqual(data_decode.arrayNested[1].na4, 'value na4 2')
- self.assertEqual(data_decode.arrayNested[1].nb4, 200)
- self.assertEqual(data_decode.arrayNested2[0].na4, 'value na4 3')
- self.assertEqual(data_decode.arrayNested2[0].nb4, 300)
- self.assertEqual(data_decode.arrayNested2[1].na4, 'value na4 4')
- self.assertEqual(data_decode.arrayNested2[1].nb4, 400)
+ self.assertEqual(r.color3, Color.red)
+ self.assertEqual(r.mapNested['a'].color, Color.green)
+ self.assertEqual(r.mapNested['b'].color, None)
print('Encode and decode complex schema finish. schema_type: ', schema_type)
encode_and_decode('avro')
@@ -1069,8 +1070,12 @@ class SchemaTest(TestCase):
self.assertEqual(data_decode.na2, 1)
self.assertTrue(data_decode.nb2)
-
def test_produce_and_consume_complex_schema_data(self):
+ class Color(Enum):
+ red = 1
+ green = 2
+ blue = 3
+
class NestedObj1(Record):
na1 = String()
nb1 = Double()
@@ -1082,6 +1087,7 @@ class SchemaTest(TestCase):
class NestedObj3(Record):
na3 = Integer()
+ color = CustomEnum(Color, required=True, required_default=True, default=Color.blue)
class NestedObj4(Record):
na4 = String()
@@ -1090,6 +1096,7 @@ class SchemaTest(TestCase):
class ComplexRecord(Record):
a = Integer()
b = Integer()
+ color = CustomEnum(Color)
nested = NestedObj2()
mapNested = Map(NestedObj3())
arrayNested = Array(NestedObj4())
@@ -1112,8 +1119,8 @@ class SchemaTest(TestCase):
nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
r = ComplexRecord(a=1, b=2, nested=nested_obj2, mapNested={
- 'a': NestedObj3(na3=1),
- 'b': NestedObj3(na3=2),
+ 'a': NestedObj3(na3=1, color=Color.red),
+ 'b': NestedObj3(na3=2, color=Color.green),
'c': NestedObj3(na3=3)
}, arrayNested=[
NestedObj4(na4='value na4 1', nb4=100),
@@ -1125,19 +1132,6 @@ class SchemaTest(TestCase):
value = msg.value()
self.assertEqual(value.__class__.__name__, 'ComplexRecord')
self.assertEqual(value, r)
- self.assertEqual(value.a, 1)
- self.assertEqual(value.b, 2)
- self.assertEqual(value.nested.na2, 22)
- self.assertEqual(value.nested.nb2, True)
- self.assertEqual(value.nested.nc2.na1, 'na1 value')
- self.assertEqual(value.nested.nc2.nb1, 20.5)
- self.assertEqual(value.mapNested['a'].na3, 1)
- self.assertEqual(value.mapNested['b'].na3, 2)
- self.assertEqual(value.mapNested['c'].na3, 3)
- self.assertEqual(value.arrayNested[0].na4, 'value na4 1')
- self.assertEqual(value.arrayNested[0].nb4, 100)
- self.assertEqual(value.arrayNested[1].na4, 'value na4 2')
- self.assertEqual(value.arrayNested[1].nb4, 200)
print('Produce and consume complex schema data finish. schema_type', schema_type)
@@ -1163,7 +1157,8 @@ class SchemaTest(TestCase):
"industry": "software",
"scale": ">100",
"funds": "1000000.0"
- }
+ },
+ "companyType": "companyType1"
}
data = avro_schema.encode(company)
company_decode = avro_schema.decode(data)
@@ -1185,7 +1180,9 @@ class SchemaTest(TestCase):
{'name': 'age', 'type': ['null', 'int']}
]
}}]},
- {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]}
+ {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]},
+ {'name': 'companyType', 'type': ['null', {'type': 'enum', 'name': 'CompanyType', 'symbols':
+ ['companyType1', 'companyType2', 'companyType3']}]}
]
}
encode_and_decode(schema_definition)
@@ -1218,7 +1215,8 @@ class SchemaTest(TestCase):
"industry": "software" + str(i),
"scale": ">100",
"funds": "1000000.0"
- }
+ },
+ "companyType": "companyType" + str((i % 3) + 1)
}
producer.send(company)