You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 16:30:13 UTC

[pulsar] 02/19: [Python Schema] Python schema support custom Avro configurations for Enum type (#12642)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 736dd6a853d724f652c4aa945076e40384728abd
Author: ran <ga...@126.com>
AuthorDate: Sun Nov 7 21:24:33 2021 +0800

    [Python Schema] Python schema support custom Avro configurations for Enum type (#12642)
    
    ### Motivation
    
    Currently, the Python client does not support setting the `required`, `default`, and `required_default` configurations for Enum-type fields in a Record.
    
    ### Modifications
    
    Replace the `_Enum` class with a new `CustomEnum` class. `_Enum` was not exposed to users; `CustomEnum` is exposed, so users can set the Avro definition configurations `required`, `default`, and `required_default` on Enum fields.
    
    ### How to use
    
    ```
    class Color(Enum):
        red = 1
        green = 2
        blue = 3
    
    class NestedObj(Record):
        a = Integer()
        color = CustomEnum(Color, required_default=True, default=Color.blue)
    ```
    
    The schema definition will look like this:
    ```
    {
    	'type': 'record',
    	'name': 'NestedObj',
    	'fields': [
    		{'name': 'a', 'type': ['null', 'int']},
    		{'name': 'color', 'default': 'blue', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols': ['red', 'green', 'blue']}]}
    	]
    }
    ```
    
    The old usage (assigning the Enum class directly as a Record field) is still supported.
    
    This feature works well with the Java client.
    
    (cherry picked from commit e7389ed965cb712058c4ed1e8931d7563ca10bc8)
---
 pulsar-client-cpp/python/examples/company.avsc     |   4 +-
 pulsar-client-cpp/python/pulsar/schema/__init__.py |   2 +-
 .../python/pulsar/schema/definition.py             |  18 ++--
 pulsar-client-cpp/python/schema_test.py            | 118 ++++++++++-----------
 4 files changed, 74 insertions(+), 68 deletions(-)

diff --git a/pulsar-client-cpp/python/examples/company.avsc b/pulsar-client-cpp/python/examples/company.avsc
index cdb595f..5fb1860 100644
--- a/pulsar-client-cpp/python/examples/company.avsc
+++ b/pulsar-client-cpp/python/examples/company.avsc
@@ -14,6 +14,8 @@
                 {"name": "age", "type": ["null", "int"]}
             ]
         }}]},
-        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
+        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]},
+        {"name": "companyType", "type": ["null", {"type": "enum", "name": "CompanyType", "symbols":
+                    ["companyType1", "companyType2", "companyType3"]}]}
     ]
 }
\ No newline at end of file
diff --git a/pulsar-client-cpp/python/pulsar/schema/__init__.py b/pulsar-client-cpp/python/pulsar/schema/__init__.py
index 150629d..efa6806 100644
--- a/pulsar-client-cpp/python/pulsar/schema/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/schema/__init__.py
@@ -18,7 +18,7 @@
 #
 
 from .definition import Record, Field, Null, Boolean, Integer, Long, \
-            Float, Double, Bytes, String, Array, Map
+            Float, Double, Bytes, String, Array, Map, CustomEnum
 
 from .schema import Schema, BytesSchema, StringSchema, JsonSchema
 from .schema_avro import AvroSchema
diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index fd778f3..9b6c861 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -44,8 +44,7 @@ class RecordMeta(type):
         fields = OrderedDict()
         for name, value in dct.items():
             if issubclass(type(value), EnumMeta):
-                # Wrap Python enums
-                value = _Enum(value)
+                value = CustomEnum(value)
             elif type(value) == RecordMeta:
                 # We expect an instance of a record rather than the class itself
                 value = value()
@@ -125,6 +124,12 @@ class Record(with_metaclass(RecordMeta, object)):
             schema['namespace'] = cls._avro_namespace
         schema['fields'] = []
 
+        def get_filed_default_value(value):
+            if isinstance(value, Enum):
+                return value.name
+            else:
+                return value
+
         if cls._sorted_fields:
             fields = sorted(cls._fields.keys())
         else:
@@ -135,7 +140,7 @@ class Record(with_metaclass(RecordMeta, object)):
                 if field._required else ['null', field.schema_info(defined_names)]
             schema['fields'].append({
                 'name': name,
-                'default': field.default(),
+                'default': get_filed_default_value(field.default()),
                 'type': field_type
             }) if field.required_default() else schema['fields'].append({
                 'name': name,
@@ -360,15 +365,16 @@ class String(Field):
 
 # Complex types
 
-class _Enum(Field):
-    def __init__(self, enum_type):
+
+class CustomEnum(Field):
+    def __init__(self, enum_type, default=None, required=False, required_default=False):
         if not issubclass(enum_type, Enum):
             raise Exception(enum_type + " is not a valid Enum type")
         self.enum_type = enum_type
         self.values = {}
         for x in enum_type.__members__.values():
             self.values[x.value] = x
-        super(_Enum, self).__init__()
+        super(CustomEnum, self).__init__(default, required, required_default)
 
     def type(self):
         return 'enum'
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index d2554da..077f2bb 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -49,6 +49,7 @@ class SchemaTest(TestCase):
             g = Double()
             h = Bytes()
             i = Map(String())
+            j = CustomEnum(Color)
 
         fastavro.parse_schema(Example.schema())
         self.assertEqual(Example.schema(), {
@@ -74,16 +75,23 @@ class SchemaTest(TestCase):
                 {"name": "i", "type": ["null", {
                     "type": "map",
                     "values": "string"}]
-                 },
+                },
+                {"name": "j", "type": ["null", "Color"]}
             ]
         })
 
     def test_complex(self):
+        class Color(Enum):
+            red = 1
+            green = 2
+            blue = 3
+
         class MySubRecord(Record):
             _sorted_fields = True
             x = Integer()
             y = Long()
             z = String()
+            color = CustomEnum(Color)
 
         class Example(Record):
             _sorted_fields = True
@@ -101,9 +109,12 @@ class SchemaTest(TestCase):
                  "type": ["null", {
                      "name": "MySubRecord",
                      "type": "record",
-                     "fields": [{"name": "x", "type": ["null", "int"]},
-                                {"name": "y", "type": ["null", "long"]},
-                                {"name": "z", "type": ["null", "string"]}]
+                     "fields": [
+                        {'name': 'color', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols':
+                            ['red', 'green', 'blue']}]},
+                        {"name": "x", "type": ["null", "int"]},
+                        {"name": "y", "type": ["null", "long"]},
+                        {"name": "z", "type": ["null", "string"]}]
                  }]
                  },
                  {"name": "sub2",
@@ -630,6 +641,8 @@ class SchemaTest(TestCase):
         class Example(Record):
             name = String()
             v = MyEnum
+            w = CustomEnum(MyEnum)
+            x = CustomEnum(MyEnum, required=True, default=MyEnum.A, required_default=True)
 
         topic = 'my-json-enum-topic'
 
@@ -641,13 +654,15 @@ class SchemaTest(TestCase):
         consumer = client.subscribe(topic, 'test',
                                     schema=JsonSchema(Example))
 
-        r = Example(name='test', v=MyEnum.C)
+        r = Example(name='test', v=MyEnum.C, w=MyEnum.B)
         producer.send(r)
 
         msg = consumer.receive()
 
         self.assertEqual('test', msg.value().name)
         self.assertEqual(MyEnum.C, MyEnum(msg.value().v))
+        self.assertEqual(MyEnum.B, MyEnum(msg.value().w))
+        self.assertEqual(MyEnum.A, MyEnum(msg.value().x))
         client.close()
 
     def test_avro_enum(self):
@@ -659,6 +674,8 @@ class SchemaTest(TestCase):
         class Example(Record):
             name = String()
             v = MyEnum
+            w = CustomEnum(MyEnum)
+            x = CustomEnum(MyEnum, required=True, default=MyEnum.B, required_default=True)
 
         topic = 'my-avro-enum-topic'
 
@@ -670,12 +687,14 @@ class SchemaTest(TestCase):
         consumer = client.subscribe(topic, 'test',
                                     schema=AvroSchema(Example))
 
-        r = Example(name='test', v=MyEnum.C)
+        r = Example(name='test', v=MyEnum.C, w=MyEnum.A)
         producer.send(r)
 
         msg = consumer.receive()
         msg.value()
         self.assertEqual(MyEnum.C, msg.value().v)
+        self.assertEqual(MyEnum.A, MyEnum(msg.value().w))
+        self.assertEqual(MyEnum.B, MyEnum(msg.value().x))
         client.close()
 
     def test_avro_map_array(self):
@@ -913,6 +932,11 @@ class SchemaTest(TestCase):
         client.close()
 
     def test_serialize_schema_complex(self):
+        class Color(Enum):
+            red = 1
+            green = 2
+            blue = 3
+
         class NestedObj1(Record):
             _sorted_fields = True
             na1 = String()
@@ -925,6 +949,8 @@ class SchemaTest(TestCase):
             nc2 = NestedObj1()
 
         class NestedObj3(Record):
+            _sorted_fields = True
+            color = CustomEnum(Color)
             na3 = Integer()
 
         class NestedObj4(Record):
@@ -933,11 +959,6 @@ class SchemaTest(TestCase):
             na4 = String()
             nb4 = Integer()
 
-        class Color(Enum):
-            red = 1
-            green = 2
-            blue = 3
-
         class ComplexRecord(Record):
             _avro_namespace = 'xxx.xxx'
             _sorted_fields = True
@@ -945,6 +966,7 @@ class SchemaTest(TestCase):
             b = Integer()
             color = Color
             color2 = Color
+            color3 = CustomEnum(Color, required=True, default=Color.red, required_default=True)
             nested = NestedObj2()
             nested2 = NestedObj2()
             mapNested = Map(NestedObj3())
@@ -970,8 +992,10 @@ class SchemaTest(TestCase):
                 {'name': 'color', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols': [
                     'red', 'green', 'blue']}]},
                 {'name': 'color2', 'type': ['null', 'Color']},
+                {'name': 'color3', 'default': 'red', 'type': 'Color'},
                 {'name': 'mapNested', 'type': ['null', {'type': 'map', 'values':
                     {'name': 'NestedObj3', 'type': 'record', 'fields': [
+                        {'name': 'color', 'type': ['null', 'Color']},
                         {'name': 'na3', 'type': ['null', 'int']}
                     ]}}
                 ]},
@@ -998,12 +1022,12 @@ class SchemaTest(TestCase):
             r = ComplexRecord(a=1, b=2, color=Color.red, color2=Color.blue,
                               nested=nested_obj2, nested2=nested_obj2,
             mapNested={
-                'a': NestedObj3(na3=1),
+                'a': NestedObj3(na3=1, color=Color.green),
                 'b': NestedObj3(na3=2),
-                'c': NestedObj3(na3=3)
+                'c': NestedObj3(na3=3, color=Color.red)
             }, mapNested2={
-                'd': NestedObj3(na3=4),
-                'e': NestedObj3(na3=5),
+                'd': NestedObj3(na3=4, color=Color.red),
+                'e': NestedObj3(na3=5, color=Color.blue),
                 'f': NestedObj3(na3=6)
             }, arrayNested=[
                 NestedObj4(na4='value na4 1', nb4=100),
@@ -1017,32 +1041,9 @@ class SchemaTest(TestCase):
             data_decode = data_schema.decode(data_encode)
             self.assertEqual(data_decode.__class__.__name__, 'ComplexRecord')
             self.assertEqual(data_decode, r)
-            self.assertEqual(data_decode.a, 1)
-            self.assertEqual(data_decode.b, 2)
-            self.assertEqual(data_decode.color, Color.red)
-            self.assertEqual(data_decode.color2, Color.blue)
-            self.assertEqual(data_decode.nested.na2, 22)
-            self.assertEqual(data_decode.nested.nb2, True)
-            self.assertEqual(data_decode.nested.nc2.na1, 'na1 value')
-            self.assertEqual(data_decode.nested.nc2.nb1, 20.5)
-            self.assertEqual(data_decode.nested2.na2, 22)
-            self.assertEqual(data_decode.nested2.nb2, True)
-            self.assertEqual(data_decode.nested2.nc2.na1, 'na1 value')
-            self.assertEqual(data_decode.nested2.nc2.nb1, 20.5)
-            self.assertEqual(data_decode.mapNested['a'].na3, 1)
-            self.assertEqual(data_decode.mapNested['b'].na3, 2)
-            self.assertEqual(data_decode.mapNested['c'].na3, 3)
-            self.assertEqual(data_decode.mapNested2['d'].na3, 4)
-            self.assertEqual(data_decode.mapNested2['e'].na3, 5)
-            self.assertEqual(data_decode.mapNested2['f'].na3, 6)
-            self.assertEqual(data_decode.arrayNested[0].na4, 'value na4 1')
-            self.assertEqual(data_decode.arrayNested[0].nb4, 100)
-            self.assertEqual(data_decode.arrayNested[1].na4, 'value na4 2')
-            self.assertEqual(data_decode.arrayNested[1].nb4, 200)
-            self.assertEqual(data_decode.arrayNested2[0].na4, 'value na4 3')
-            self.assertEqual(data_decode.arrayNested2[0].nb4, 300)
-            self.assertEqual(data_decode.arrayNested2[1].na4, 'value na4 4')
-            self.assertEqual(data_decode.arrayNested2[1].nb4, 400)
+            self.assertEqual(r.color3, Color.red)
+            self.assertEqual(r.mapNested['a'].color, Color.green)
+            self.assertEqual(r.mapNested['b'].color, None)
             print('Encode and decode complex schema finish. schema_type: ', schema_type)
 
         encode_and_decode('avro')
@@ -1069,8 +1070,12 @@ class SchemaTest(TestCase):
         self.assertEqual(data_decode.na2, 1)
         self.assertTrue(data_decode.nb2)
 
-
     def test_produce_and_consume_complex_schema_data(self):
+        class Color(Enum):
+            red = 1
+            green = 2
+            blue = 3
+
         class NestedObj1(Record):
             na1 = String()
             nb1 = Double()
@@ -1082,6 +1087,7 @@ class SchemaTest(TestCase):
 
         class NestedObj3(Record):
             na3 = Integer()
+            color = CustomEnum(Color, required=True, required_default=True, default=Color.blue)
 
         class NestedObj4(Record):
             na4 = String()
@@ -1090,6 +1096,7 @@ class SchemaTest(TestCase):
         class ComplexRecord(Record):
             a = Integer()
             b = Integer()
+            color = CustomEnum(Color)
             nested = NestedObj2()
             mapNested = Map(NestedObj3())
             arrayNested = Array(NestedObj4())
@@ -1112,8 +1119,8 @@ class SchemaTest(TestCase):
             nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
             nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
             r = ComplexRecord(a=1, b=2, nested=nested_obj2, mapNested={
-                'a': NestedObj3(na3=1),
-                'b': NestedObj3(na3=2),
+                'a': NestedObj3(na3=1, color=Color.red),
+                'b': NestedObj3(na3=2, color=Color.green),
                 'c': NestedObj3(na3=3)
             }, arrayNested=[
                 NestedObj4(na4='value na4 1', nb4=100),
@@ -1125,19 +1132,6 @@ class SchemaTest(TestCase):
             value = msg.value()
             self.assertEqual(value.__class__.__name__, 'ComplexRecord')
             self.assertEqual(value, r)
-            self.assertEqual(value.a, 1)
-            self.assertEqual(value.b, 2)
-            self.assertEqual(value.nested.na2, 22)
-            self.assertEqual(value.nested.nb2, True)
-            self.assertEqual(value.nested.nc2.na1, 'na1 value')
-            self.assertEqual(value.nested.nc2.nb1, 20.5)
-            self.assertEqual(value.mapNested['a'].na3, 1)
-            self.assertEqual(value.mapNested['b'].na3, 2)
-            self.assertEqual(value.mapNested['c'].na3, 3)
-            self.assertEqual(value.arrayNested[0].na4, 'value na4 1')
-            self.assertEqual(value.arrayNested[0].nb4, 100)
-            self.assertEqual(value.arrayNested[1].na4, 'value na4 2')
-            self.assertEqual(value.arrayNested[1].nb4, 200)
 
             print('Produce and consume complex schema data finish. schema_type', schema_type)
 
@@ -1163,7 +1157,8 @@ class SchemaTest(TestCase):
                     "industry": "software",
                     "scale": ">100",
                     "funds": "1000000.0"
-                }
+                },
+                "companyType": "companyType1"
             }
             data = avro_schema.encode(company)
             company_decode = avro_schema.decode(data)
@@ -1185,7 +1180,9 @@ class SchemaTest(TestCase):
                         {'name': 'age', 'type': ['null', 'int']}
                     ]
                 }}]},
-                {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]}
+                {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]},
+                {'name': 'companyType', 'type': ['null', {'type': 'enum', 'name': 'CompanyType', 'symbols':
+                    ['companyType1', 'companyType2', 'companyType3']}]}
             ]
         }
         encode_and_decode(schema_definition)
@@ -1218,7 +1215,8 @@ class SchemaTest(TestCase):
                         "industry": "software" + str(i),
                         "scale": ">100",
                         "funds": "1000000.0"
-                    }
+                    },
+                    "companyType": "companyType" + str((i % 3) + 1)
                 }
                 producer.send(company)