You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2021/04/29 09:29:35 UTC

[pulsar] 02/02: [Python schema] Support python avro schema set default value. (#10265)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9a72fe4fa137e369aa27150e46c41b67041de022
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon Apr 19 23:11:36 2021 +0800

    [Python schema] Support python avro schema set default value. (#10265)
    
    ## Motivation
    Currently the Python Avro schema does not support setting default values, which prevents a Python schema from being updated.
    ## Implementation
    1. Add a `required` field to control whether the schema type can be set to `null`.
    2. Add a `required_default` field to control whether or not the schema has a default attribute.
    3. Add a `default` field to control the default value of the schema.
    
    (cherry picked from commit d6d0e3a88e5569b16b4c0b9cdbd845d5c07268e0)
---
 .../python/pulsar/schema/definition.py             |  89 ++++++++---
 pulsar-client-cpp/python/pulsar/schema/schema.py   |   4 +
 pulsar-client-cpp/python/schema_test.py            | 169 ++++++++++++++++++++-
 3 files changed, 234 insertions(+), 28 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index 7853d07..d46cf3c 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -56,10 +56,10 @@ class RecordMeta(type):
 
 class Record(with_metaclass(RecordMeta, object)):
 
-    def __init__(self, *args, **kwargs):
-        if args:
-            # Only allow keyword args
-            raise TypeError('Non-keyword arguments not allowed when initializing Records')
+    def __init__(self, default=None, required_default=False, required=False, *args, **kwargs):
+        self._required_default = required_default
+        self._default = default
+        self._required = required
 
         for k, value in self._fields.items():
             if k in kwargs:
@@ -85,18 +85,30 @@ class Record(with_metaclass(RecordMeta, object)):
             field_type = field.schema() if field._required else ['null', field.schema()]
             schema['fields'].append({
                 'name': name,
-                'type': field_type
+                'type': field_type,
+                'default': field.default()
+            }) if field.required_default() else schema['fields'].append({
+                'name': name,
+                'type': field_type,
             })
+
         return schema
 
     def __setattr__(self, key, value):
-        if key not in self._fields:
-            raise AttributeError('Cannot set undeclared field ' + key + ' on record')
+        if key == '_default':
+            super(Record, self).__setattr__(key, value)
+        elif key == '_required_default':
+            super(Record, self).__setattr__(key, value)
+        elif key == '_required':
+            super(Record, self).__setattr__(key, value)
+        else:
+            if key not in self._fields:
+                raise AttributeError('Cannot set undeclared field ' + key + ' on record')
 
-        # Check that type of value matches the field type
-        field = self._fields[key]
-        value = field.validate_type(key, value)
-        super(Record, self).__setattr__(key, value)
+            # Check that type of value matches the field type
+            field = self._fields[key]
+            value = field.validate_type(key, value)
+            super(Record, self).__setattr__(key, value)
 
     def __eq__(self, other):
         for field in self._fields:
@@ -116,12 +128,22 @@ class Record(with_metaclass(RecordMeta, object)):
                 type(val), name, self.__class__))
         return val
 
+    def default(self):
+        if self._default is not None:
+            return self._default
+        else:
+            return None
+
+    def required_default(self):
+        return self._required_default
+
 
 class Field(object):
-    def __init__(self, default=None, required=False):
+    def __init__(self, default=None, required=False, required_default=False):
         if default is not None:
             default = self.validate_type('default', default)
         self._default = default
+        self._required_default = required_default
         self._required = required
 
     @abstractmethod
@@ -144,6 +166,10 @@ class Field(object):
     def default(self):
         return self._default
 
+    def required_default(self):
+        return self._required_default
+
+
 # All types
 
 
@@ -185,7 +211,7 @@ class Integer(Field):
         if self._default is not None:
             return self._default
         else:
-            return 0
+            return None
 
 
 class Long(Field):
@@ -199,7 +225,7 @@ class Long(Field):
         if self._default is not None:
             return self._default
         else:
-            return 0
+            return None
 
 
 class Float(Field):
@@ -213,7 +239,7 @@ class Float(Field):
         if self._default is not None:
             return self._default
         else:
-            return 0.0
+            return None
 
 
 class Double(Field):
@@ -227,7 +253,7 @@ class Double(Field):
         if self._default is not None:
             return self._default
         else:
-            return 0.0
+            return None
 
 
 class Bytes(Field):
@@ -241,7 +267,7 @@ class Bytes(Field):
         if self._default is not None:
             return self._default
         else:
-            return bytes('')
+            return None
 
 
 class String(Field):
@@ -261,7 +287,8 @@ class String(Field):
         if self._default is not None:
             return self._default
         else:
-            return str('')
+            return None
+
 
 # Complex types
 
@@ -309,12 +336,18 @@ class _Enum(Field):
             'symbols': [x.name for x in self.enum_type]
         }
 
+    def default(self):
+        if self._default is not None:
+            return self._default
+        else:
+            return None
+
 
 class Array(Field):
-    def __init__(self, array_type):
+    def __init__(self, array_type, default=None, required=False, required_default=False):
         _check_record_or_field(array_type)
         self.array_type = array_type
-        super(Array, self).__init__()
+        super(Array, self).__init__(default=default, required=required, required_default=required_default)
 
     def type(self):
         return 'array'
@@ -338,12 +371,18 @@ class Array(Field):
                 else self.array_type.type()
         }
 
+    def default(self):
+        if self._default is not None:
+            return self._default
+        else:
+            return None
+
 
 class Map(Field):
-    def __init__(self, value_type):
+    def __init__(self, value_type, default=None, required=False, required_default=False):
         _check_record_or_field(value_type)
         self.value_type = value_type
-        super(Map, self).__init__()
+        super(Map, self).__init__(default=default, required=required, required_default=required_default)
 
     def type(self):
         return 'map'
@@ -370,6 +409,12 @@ class Map(Field):
                 else self.value_type.type()
         }
 
+    def default(self):
+        if self._default is not None:
+            return self._default
+        else:
+            return None
+
 
 # Python3 has no `unicode` type, so here we use a tricky way to check if the type of `x` is `unicode` in Python2
 # and also make it work well with Python3.
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py b/pulsar-client-cpp/python/pulsar/schema/schema.py
index 5f69ea2..d0da91a 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema.py
@@ -87,6 +87,10 @@ class JsonSchema(Schema):
 
     def encode(self, obj):
         self._validate_object_type(obj)
+        del obj.__dict__['_default']
+        del obj.__dict__['_required']
+        del obj.__dict__['_required_default']
+
         return json.dumps(obj.__dict__, default=self._get_serialized_value, indent=True).encode('utf-8')
 
     def decode(self, data):
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index a86824c..a0d60c0 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -180,9 +180,6 @@ class SchemaTest(TestCase):
             # Expected
             pass
 
-        try:
-            Record('xyz', a=1, b=2)
-            self.fail('Should have failed')
         except TypeError:
             # Expected
             pass
@@ -410,7 +407,7 @@ class SchemaTest(TestCase):
 
         r = Example()
         self.assertEqual(r.a, 5)
-        self.assertEqual(r.b, 0)
+        self.assertEqual(r.b, None)
         self.assertEqual(r.c, 'hello')
 
     ####
@@ -667,11 +664,171 @@ class SchemaTest(TestCase):
 
         client.close()
 
+    def test_avro_required_default(self):
+        class MySubRecord(Record):
+            x = Integer()
+            y = Long()
+            z = String()
+
+        class Example(Record):
+            a = Integer()
+            b = Boolean(required=True)
+            c = Long()
+            d = Float()
+            e = Double()
+            f = String()
+            g = Bytes()
+            h = Array(String())
+            i = Map(String())
+            j = MySubRecord()
+
+        class ExampleRequiredDefault(Record):
+            a = Integer(required_default=True)
+            b = Boolean(required=True, required_default=True)
+            c = Long(required_default=True)
+            d = Float(required_default=True)
+            e = Double(required_default=True)
+            f = String(required_default=True)
+            g = Bytes(required_default=True)
+            h = Array(String(), required_default=True)
+            i = Map(String(), required_default=True)
+            j = MySubRecord(required_default=True)
+        self.assertEqual(ExampleRequiredDefault.schema(), {
+                "name": "ExampleRequiredDefault",
+                "type": "record",
+                "fields": [
+                    {
+                        "name": "a",
+                        "type": [
+                            "null",
+                            "int"
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "b",
+                        "type": "boolean",
+                        "default": False
+                    },
+                    {
+                        "name": "c",
+                        "type": [
+                            "null",
+                            "long"
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "d",
+                        "type": [
+                            "null",
+                            "float"
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "e",
+                        "type": [
+                            "null",
+                            "double"
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "f",
+                        "type": [
+                            "null",
+                            "string"
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "g",
+                        "type": [
+                            "null",
+                            "bytes"
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "h",
+                        "type": [
+                            "null",
+                            {
+                                "type": "array",
+                                "items": "string"
+                            }
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "i",
+                        "type": [
+                            "null",
+                            {
+                                "type": "map",
+                                "values": "string"
+                            }
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "j",
+                        "type": [
+                            "null",
+                            {
+                                "name": "MySubRecord",
+                                "type": "record",
+                                "fields": [
+                                    {
+                                        "name": "x",
+                                        "type": [
+                                            "null",
+                                            "int"
+                                        ]
+                                    },
+                                    {
+                                        "name": "y",
+                                        "type": [
+                                            "null",
+                                            "long"
+                                        ],
+                                    },
+                                    {
+                                        "name": "z",
+                                        "type": [
+                                            "null",
+                                            "string"
+                                        ]
+                                    }
+                                ]
+                            }
+                        ],
+                        "default": None
+                    }
+                ]
+            })
+
+        client = pulsar.Client(self.serviceUrl)
+        producer = client.create_producer(
+            'my-avro-python-default-topic',
+            schema=AvroSchema(Example))
+
+        producer_default = client.create_producer(
+            'my-avro-python-default-topic',
+            schema=AvroSchema(ExampleRequiredDefault))
+
+        producer.close()
+        producer_default.close()
+
+        client.close()
+
+
     def test_default_value(self):
         class MyRecord(Record):
             A = Integer()
             B = String()
-            C = Boolean()
+            C = Boolean(default=True, required=True)
             D = Double(default=6.4)
 
         topic = "my-default-value-topic"
@@ -689,7 +846,7 @@ class SchemaTest(TestCase):
         msg = consumer.receive()
         self.assertEqual(msg.value().A, 5)
         self.assertEqual(msg.value().B, u'text')
-        self.assertEqual(msg.value().C, False)
+        self.assertEqual(msg.value().C, True)
         self.assertEqual(msg.value().D, 6.4)
 
         producer.close()