You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2021/04/29 09:29:33 UTC

[pulsar] branch branch-2.7 updated (af6c2fa -> 9a72fe4)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from af6c2fa  Add underReplicate state in the topic internal stats (#10013)
     new bf81fbf  [Python] Fix nested Map or Array in schema doesn't work (#9548)
     new 9a72fe4  [Python schema] Support python avro schema set default value. (#10265)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../python/pulsar/schema/definition.py             | 101 +++++++---
 pulsar-client-cpp/python/pulsar/schema/schema.py   |   4 +
 pulsar-client-cpp/python/schema_test.py            | 212 ++++++++++++++++++++-
 3 files changed, 286 insertions(+), 31 deletions(-)

[pulsar] 01/02: [Python] Fix nested Map or Array in schema doesn't work (#9548)

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bf81fbf79e3222db01e6105b6330960c52ab42f2
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Feb 12 02:10:11 2021 +0800

    [Python] Fix nested Map or Array in schema doesn't work (#9548)
    
    * [Python] Fix nested Map or Array type in schema not working
    
    * Add tests for nested Map or Array
    
    * Refactor tests
    
    * Add tests
    
    * Remove repeated tests
    
    * Remove print to avoid Python2 CI failure
    
    * Fix validate failure
    
    * Fix exceptions thrown in validate
    
    * Use a function to check unicode type
    
    (cherry picked from commit 84cfb3affa1ce5bc1db29baab5498205049019dd)
---
 .../python/pulsar/schema/definition.py             | 12 ++++--
 pulsar-client-cpp/python/schema_test.py            | 43 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index 7075b36..7853d07 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -334,7 +334,7 @@ class Array(Field):
     def schema(self):
         return {
             'type': self.type(),
-            'items': self.array_type.schema() if isinstance(self.array_type, Record) 
+            'items': self.array_type.schema() if isinstance(self.array_type, (Array, Map, Record))
                 else self.array_type.type()
         }
 
@@ -355,7 +355,7 @@ class Map(Field):
         super(Map, self).validate_type(name, val)
 
         for k, v in val.items():
-            if type(k) != str:
+            if type(k) != str and not is_unicode(k):
                 raise TypeError('Map keys for field ' + name + '  should all be strings')
             if type(v) != self.value_type.python_type():
                 raise TypeError('Map values for field ' + name + ' should all be of type '
@@ -366,6 +366,12 @@ class Map(Field):
     def schema(self):
         return {
             'type': self.type(),
-            'values': self.value_type.schema() if isinstance(self.value_type, Record)
+            'values': self.value_type.schema() if isinstance(self.value_type, (Array, Map, Record))
                 else self.value_type.type()
         }
+
+
+# Python3 has no `unicode` type, so here we use a tricky way to check if the type of `x` is `unicode` in Python2
+# and also make it work well with Python3.
+def is_unicode(x):
+    return 'encode' in dir(x) and type(x.encode()) == str
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index 2d03020..a86824c 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -624,6 +624,49 @@ class SchemaTest(TestCase):
         self.assertEqual(MyEnum.C, msg.value().v)
         client.close()
 
+    def test_avro_map_array(self):
+        class MapArray(Record):
+            values = Map(Array(Integer()))
+
+        class MapMap(Record):
+            values = Map(Map(Integer()))
+
+        class ArrayMap(Record):
+            values = Array(Map(Integer()))
+
+        class ArrayArray(Record):
+            values = Array(Array(Integer()))
+
+        topic_prefix = "my-avro-map-array-topic-"
+        data_list = (
+            (topic_prefix + "0", AvroSchema(MapArray),
+                MapArray(values={"A": [1, 2], "B": [3]})),
+            (topic_prefix + "1", AvroSchema(MapMap),
+                MapMap(values={"A": {"B": 2},})),
+            (topic_prefix + "2", AvroSchema(ArrayMap),
+                ArrayMap(values=[{"A": 1}, {"B": 2}, {"C": 3}])),
+            (topic_prefix + "3", AvroSchema(ArrayArray),
+                ArrayArray(values=[[1, 2, 3], [4]])),
+        )
+
+        client = pulsar.Client(self.serviceUrl)
+        for data in data_list:
+            topic = data[0]
+            schema = data[1]
+            record = data[2]
+
+            producer = client.create_producer(topic, schema=schema)
+            consumer = client.subscribe(topic, 'sub', schema=schema)
+
+            producer.send(record)
+            msg = consumer.receive()
+            self.assertEqual(msg.value().values, record.values)
+            consumer.acknowledge(msg)
+            consumer.close()
+            producer.close()
+
+        client.close()
+
     def test_default_value(self):
         class MyRecord(Record):
             A = Integer()

[pulsar] 02/02: [Python schema] Support python avro schema set default value. (#10265)

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9a72fe4fa137e369aa27150e46c41b67041de022
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon Apr 19 23:11:36 2021 +0800

    [Python schema] Support python avro schema set default value. (#10265)
    
    ## Motivation
    Currently, the Python Avro schema does not support setting default values, which prevents a Python schema from being updated.
    ## Implementation
    1. Add a `required` field to control whether the schema type can be `null`.
    2. Add a `required_default` field to control whether the schema includes a `default` attribute.
    3. Add a `default` field to set the default value in the schema.
    
    (cherry picked from commit d6d0e3a88e5569b16b4c0b9cdbd845d5c07268e0)
---
 .../python/pulsar/schema/definition.py             |  89 ++++++++---
 pulsar-client-cpp/python/pulsar/schema/schema.py   |   4 +
 pulsar-client-cpp/python/schema_test.py            | 169 ++++++++++++++++++++-
 3 files changed, 234 insertions(+), 28 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index 7853d07..d46cf3c 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -56,10 +56,10 @@ class RecordMeta(type):
 
 class Record(with_metaclass(RecordMeta, object)):
 
-    def __init__(self, *args, **kwargs):
-        if args:
-            # Only allow keyword args
-            raise TypeError('Non-keyword arguments not allowed when initializing Records')
+    def __init__(self, default=None, required_default=False, required=False, *args, **kwargs):
+        self._required_default = required_default
+        self._default = default
+        self._required = required
 
         for k, value in self._fields.items():
             if k in kwargs:
@@ -85,18 +85,30 @@ class Record(with_metaclass(RecordMeta, object)):
             field_type = field.schema() if field._required else ['null', field.schema()]
             schema['fields'].append({
                 'name': name,
-                'type': field_type
+                'type': field_type,
+                'default': field.default()
+            }) if field.required_default() else schema['fields'].append({
+                'name': name,
+                'type': field_type,
             })
+
         return schema
 
     def __setattr__(self, key, value):
-        if key not in self._fields:
-            raise AttributeError('Cannot set undeclared field ' + key + ' on record')
+        if key == '_default':
+            super(Record, self).__setattr__(key, value)
+        elif key == '_required_default':
+            super(Record, self).__setattr__(key, value)
+        elif key == '_required':
+            super(Record, self).__setattr__(key, value)
+        else:
+            if key not in self._fields:
+                raise AttributeError('Cannot set undeclared field ' + key + ' on record')
 
-        # Check that type of value matches the field type
-        field = self._fields[key]
-        value = field.validate_type(key, value)
-        super(Record, self).__setattr__(key, value)
+            # Check that type of value matches the field type
+            field = self._fields[key]
+            value = field.validate_type(key, value)
+            super(Record, self).__setattr__(key, value)
 
     def __eq__(self, other):
         for field in self._fields:
@@ -116,12 +128,22 @@ class Record(with_metaclass(RecordMeta, object)):
                 type(val), name, self.__class__))
         return val
 
+    def default(self):
+        if self._default is not None:
+            return self._default
+        else:
+            return None
+
+    def required_default(self):
+        return self._required_default
+
 
 class Field(object):
-    def __init__(self, default=None, required=False):
+    def __init__(self, default=None, required=False, required_default=False):
         if default is not None:
             default = self.validate_type('default', default)
         self._default = default
+        self._required_default = required_default
         self._required = required
 
     @abstractmethod
@@ -144,6 +166,10 @@ class Field(object):
     def default(self):
         return self._default
 
+    def required_default(self):
+        return self._required_default
+
+
 # All types
 
 
@@ -185,7 +211,7 @@ class Integer(Field):
         if self._default is not None:
             return self._default
         else:
-            return 0
+            return None
 
 
 class Long(Field):
@@ -199,7 +225,7 @@ class Long(Field):
         if self._default is not None:
             return self._default
         else:
-            return 0
+            return None
 
 
 class Float(Field):
@@ -213,7 +239,7 @@ class Float(Field):
         if self._default is not None:
             return self._default
         else:
-            return 0.0
+            return None
 
 
 class Double(Field):
@@ -227,7 +253,7 @@ class Double(Field):
         if self._default is not None:
             return self._default
         else:
-            return 0.0
+            return None
 
 
 class Bytes(Field):
@@ -241,7 +267,7 @@ class Bytes(Field):
         if self._default is not None:
             return self._default
         else:
-            return bytes('')
+            return None
 
 
 class String(Field):
@@ -261,7 +287,8 @@ class String(Field):
         if self._default is not None:
             return self._default
         else:
-            return str('')
+            return None
+
 
 # Complex types
 
@@ -309,12 +336,18 @@ class _Enum(Field):
             'symbols': [x.name for x in self.enum_type]
         }
 
+    def default(self):
+        if self._default is not None:
+            return self._default
+        else:
+            return None
+
 
 class Array(Field):
-    def __init__(self, array_type):
+    def __init__(self, array_type, default=None, required=False, required_default=False):
         _check_record_or_field(array_type)
         self.array_type = array_type
-        super(Array, self).__init__()
+        super(Array, self).__init__(default=default, required=required, required_default=required_default)
 
     def type(self):
         return 'array'
@@ -338,12 +371,18 @@ class Array(Field):
                 else self.array_type.type()
         }
 
+    def default(self):
+        if self._default is not None:
+            return self._default
+        else:
+            return None
+
 
 class Map(Field):
-    def __init__(self, value_type):
+    def __init__(self, value_type, default=None, required=False, required_default=False):
         _check_record_or_field(value_type)
         self.value_type = value_type
-        super(Map, self).__init__()
+        super(Map, self).__init__(default=default, required=required, required_default=required_default)
 
     def type(self):
         return 'map'
@@ -370,6 +409,12 @@ class Map(Field):
                 else self.value_type.type()
         }
 
+    def default(self):
+        if self._default is not None:
+            return self._default
+        else:
+            return None
+
 
 # Python3 has no `unicode` type, so here we use a tricky way to check if the type of `x` is `unicode` in Python2
 # and also make it work well with Python3.
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py b/pulsar-client-cpp/python/pulsar/schema/schema.py
index 5f69ea2..d0da91a 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema.py
@@ -87,6 +87,10 @@ class JsonSchema(Schema):
 
     def encode(self, obj):
         self._validate_object_type(obj)
+        del obj.__dict__['_default']
+        del obj.__dict__['_required']
+        del obj.__dict__['_required_default']
+
         return json.dumps(obj.__dict__, default=self._get_serialized_value, indent=True).encode('utf-8')
 
     def decode(self, data):
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index a86824c..a0d60c0 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -180,9 +180,6 @@ class SchemaTest(TestCase):
             # Expected
             pass
 
-        try:
-            Record('xyz', a=1, b=2)
-            self.fail('Should have failed')
         except TypeError:
             # Expected
             pass
@@ -410,7 +407,7 @@ class SchemaTest(TestCase):
 
         r = Example()
         self.assertEqual(r.a, 5)
-        self.assertEqual(r.b, 0)
+        self.assertEqual(r.b, None)
         self.assertEqual(r.c, 'hello')
 
     ####
@@ -667,11 +664,171 @@ class SchemaTest(TestCase):
 
         client.close()
 
+    def test_avro_required_default(self):
+        class MySubRecord(Record):
+            x = Integer()
+            y = Long()
+            z = String()
+
+        class Example(Record):
+            a = Integer()
+            b = Boolean(required=True)
+            c = Long()
+            d = Float()
+            e = Double()
+            f = String()
+            g = Bytes()
+            h = Array(String())
+            i = Map(String())
+            j = MySubRecord()
+
+        class ExampleRequiredDefault(Record):
+            a = Integer(required_default=True)
+            b = Boolean(required=True, required_default=True)
+            c = Long(required_default=True)
+            d = Float(required_default=True)
+            e = Double(required_default=True)
+            f = String(required_default=True)
+            g = Bytes(required_default=True)
+            h = Array(String(), required_default=True)
+            i = Map(String(), required_default=True)
+            j = MySubRecord(required_default=True)
+        self.assertEqual(ExampleRequiredDefault.schema(), {
+                "name": "ExampleRequiredDefault",
+                "type": "record",
+                "fields": [
+                    {
+                        "name": "a",
+                        "type": [
+                            "null",
+                            "int"
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "b",
+                        "type": "boolean",
+                        "default": False
+                    },
+                    {
+                        "name": "c",
+                        "type": [
+                            "null",
+                            "long"
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "d",
+                        "type": [
+                            "null",
+                            "float"
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "e",
+                        "type": [
+                            "null",
+                            "double"
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "f",
+                        "type": [
+                            "null",
+                            "string"
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "g",
+                        "type": [
+                            "null",
+                            "bytes"
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "h",
+                        "type": [
+                            "null",
+                            {
+                                "type": "array",
+                                "items": "string"
+                            }
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "i",
+                        "type": [
+                            "null",
+                            {
+                                "type": "map",
+                                "values": "string"
+                            }
+                        ],
+                        "default": None
+                    },
+                    {
+                        "name": "j",
+                        "type": [
+                            "null",
+                            {
+                                "name": "MySubRecord",
+                                "type": "record",
+                                "fields": [
+                                    {
+                                        "name": "x",
+                                        "type": [
+                                            "null",
+                                            "int"
+                                        ]
+                                    },
+                                    {
+                                        "name": "y",
+                                        "type": [
+                                            "null",
+                                            "long"
+                                        ],
+                                    },
+                                    {
+                                        "name": "z",
+                                        "type": [
+                                            "null",
+                                            "string"
+                                        ]
+                                    }
+                                ]
+                            }
+                        ],
+                        "default": None
+                    }
+                ]
+            })
+
+        client = pulsar.Client(self.serviceUrl)
+        producer = client.create_producer(
+            'my-avro-python-default-topic',
+            schema=AvroSchema(Example))
+
+        producer_default = client.create_producer(
+            'my-avro-python-default-topic',
+            schema=AvroSchema(ExampleRequiredDefault))
+
+        producer.close()
+        producer_default.close()
+
+        client.close()
+
+
     def test_default_value(self):
         class MyRecord(Record):
             A = Integer()
             B = String()
-            C = Boolean()
+            C = Boolean(default=True, required=True)
             D = Double(default=6.4)
 
         topic = "my-default-value-topic"
@@ -689,7 +846,7 @@ class SchemaTest(TestCase):
         msg = consumer.receive()
         self.assertEqual(msg.value().A, 5)
         self.assertEqual(msg.value().B, u'text')
-        self.assertEqual(msg.value().C, False)
+        self.assertEqual(msg.value().C, True)
         self.assertEqual(msg.value().D, 6.4)
 
         producer.close()