You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/08/02 16:39:07 UTC

[pulsar] branch master updated: [Python Schema] Fix python schema array map with record (#11530)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 03aedc7  [Python Schema] Fix python schema array map with record (#11530)
03aedc7 is described below

commit 03aedc7cd708e40781d6673114ce1691a7510a14
Author: ran <ga...@126.com>
AuthorDate: Tue Aug 3 00:38:14 2021 +0800

    [Python Schema] Fix python schema array map with record (#11530)
    
    * fix Python schema types Array and Map to work with Record
    
    * fix test
---
 .../python/pulsar/schema/definition.py             | 21 ++++++-
 .../python/pulsar/schema/schema_avro.py            |  7 +++
 pulsar-client-cpp/python/schema_test.py            | 70 ++++++++++++++++++++--
 3 files changed, 93 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index 3b946b8..56385957 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -67,8 +67,24 @@ class Record(with_metaclass(RecordMeta, object)):
                 if isinstance(value, Record) and isinstance(kwargs[k], dict):
                     # Use dict init Record object
                     copied = copy.copy(value)
-                    copied.__init__(decode=True, **kwargs[k])
+                    copied.__init__(**kwargs[k])
                     self.__setattr__(k, copied)
+                elif isinstance(value, Array) and isinstance(kwargs[k], list) and len(kwargs[k]) > 0 \
+                        and isinstance(value.array_type, Record) and isinstance(kwargs[k][0], dict):
+                    arr = []
+                    for item in kwargs[k]:
+                        copied = copy.copy(value.array_type)
+                        copied.__init__(**item)
+                        arr.append(copied)
+                    self.__setattr__(k, arr)
+                elif isinstance(value, Map) and isinstance(kwargs[k], dict) and len(kwargs[k]) > 0 \
+                    and isinstance(value.value_type, Record) and isinstance(list(kwargs[k].values())[0], dict):
+                    dic = {}
+                    for mapKey, mapValue in kwargs[k].items():
+                        copied = copy.copy(value.value_type)
+                        copied.__init__(**mapValue)
+                        dic[mapKey] = copied
+                    self.__setattr__(k, dic)
                 else:
                     # Value was overridden at constructor
                     self.__setattr__(k, kwargs[k])
@@ -129,6 +145,9 @@ class Record(with_metaclass(RecordMeta, object)):
     def type(self):
         return str(self.__class__.__name__)
 
+    def python_type(self):
+        return self.__class__
+
     def validate_type(self, name, val):
         if not isinstance(val, self.__class__):
             raise TypeError("Invalid type '%s' for sub-record field '%s'. Expected: %s" % (
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
index fc9e6a6..d2b57cb 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
@@ -42,6 +42,13 @@ if HAS_AVRO:
                 return x.name
             elif isinstance(x, Record):
                 return self.encode_dict(x.__dict__)
+            elif isinstance(x, list):
+                arr = []
+                for item in x:
+                    arr.append(self._get_serialized_value(item))
+                return arr
+            elif isinstance(x, dict):
+                return self.encode_dict(x)
             else:
                 return x
 
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index 8cf6ff4..49b6c42 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -460,6 +460,9 @@ class SchemaTest(TestCase):
         msg = consumer.receive()
 
         self.assertEqual(r, msg.value())
+
+        producer.close()
+        consumer.close()
         client.close()
 
     def test_string_schema(self):
@@ -562,6 +565,9 @@ class SchemaTest(TestCase):
         msg = consumer.receive()
 
         self.assertEqual(r, msg.value())
+
+        producer.close()
+        consumer.close()
         client.close()
 
     def test_json_enum(self):
@@ -863,17 +869,38 @@ class SchemaTest(TestCase):
             nb2 = Boolean()
             nc2 = NestedObj1()
 
+        class NestedObj3(Record):
+            na3 = Integer()
+
+        class NestedObj4(Record):
+            na4 = String()
+            nb4 = Integer()
+
         class ComplexRecord(Record):
             a = Integer()
             b = Integer()
             nested = NestedObj2()
+            mapNested = Map(NestedObj3())
+            arrayNested = Array(NestedObj4())
 
+        print('complex schema: ', ComplexRecord.schema())
         self.assertEqual(ComplexRecord.schema(), {
             "name": "ComplexRecord",
             "type": "record",
             "fields": [
                 {"name": "a", "type": ["null", "int"]},
+                {'name': 'arrayNested', 'type': ['null',
+                    {'type': 'array', 'items': {'name': 'NestedObj4', 'type': 'record', 'fields': [
+                        {'name': 'na4', 'type': ['null', 'string']},
+                        {'name': 'nb4', 'type': ['null', 'int']}
+                    ]}}
+                ]},
                 {"name": "b", "type": ["null", "int"]},
+                {'name': 'mapNested', 'type': ['null', {'type': 'map', 'values':
+                    {'name': 'NestedObj3', 'type': 'record', 'fields': [
+                        {'name': 'na3', 'type': ['null', 'int']}
+                    ]}}
+                ]},
                 {"name": "nested", "type": ['null', {'name': 'NestedObj2', 'type': 'record', 'fields': [
                     {'name': 'na2', 'type': ['null', 'int']},
                     {'name': 'nb2', 'type': ['null', 'boolean']},
@@ -892,7 +919,14 @@ class SchemaTest(TestCase):
 
             nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
             nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
-            r = ComplexRecord(a=1, b=2, nested=nested_obj2)
+            r = ComplexRecord(a=1, b=2, nested=nested_obj2, mapNested={
+                'a': NestedObj3(na3=1),
+                'b': NestedObj3(na3=2),
+                'c': NestedObj3(na3=3)
+            }, arrayNested=[
+                NestedObj4(na4='value na4 1', nb4=100),
+                NestedObj4(na4='value na4 2', nb4=200)
+            ])
             data_encode = data_schema.encode(r)
 
             data_decode = data_schema.decode(data_encode)
@@ -904,6 +938,13 @@ class SchemaTest(TestCase):
             self.assertEqual(data_decode.nested.nb2, True)
             self.assertEqual(data_decode.nested.nc2.na1, 'na1 value')
             self.assertEqual(data_decode.nested.nc2.nb1, 20.5)
+            self.assertEqual(data_decode.mapNested['a'].na3, 1)
+            self.assertEqual(data_decode.mapNested['b'].na3, 2)
+            self.assertEqual(data_decode.mapNested['c'].na3, 3)
+            self.assertEqual(data_decode.arrayNested[0].na4, 'value na4 1')
+            self.assertEqual(data_decode.arrayNested[0].nb4, 100)
+            self.assertEqual(data_decode.arrayNested[1].na4, 'value na4 2')
+            self.assertEqual(data_decode.arrayNested[1].nb4, 200)
             print('Encode and decode complex schema finish. schema_type: ', schema_type)
 
         encode_and_decode('avro')
@@ -919,10 +960,19 @@ class SchemaTest(TestCase):
             nb2 = Boolean()
             nc2 = NestedObj1()
 
+        class NestedObj3(Record):
+            na3 = Integer()
+
+        class NestedObj4(Record):
+            na4 = String()
+            nb4 = Integer()
+
         class ComplexRecord(Record):
             a = Integer()
             b = Integer()
             nested = NestedObj2()
+            mapNested = Map(NestedObj3())
+            arrayNested = Array(NestedObj4())
 
         client = pulsar.Client(self.serviceUrl)
 
@@ -941,7 +991,14 @@ class SchemaTest(TestCase):
 
             nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
             nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
-            r = ComplexRecord(a=1, b=2, nested=nested_obj2)
+            r = ComplexRecord(a=1, b=2, nested=nested_obj2, mapNested={
+                'a': NestedObj3(na3=1),
+                'b': NestedObj3(na3=2),
+                'c': NestedObj3(na3=3)
+            }, arrayNested=[
+                NestedObj4(na4='value na4 1', nb4=100),
+                NestedObj4(na4='value na4 2', nb4=200)
+            ])
             producer.send(r)
 
             msg = consumer.receive()
@@ -954,9 +1011,14 @@ class SchemaTest(TestCase):
             self.assertEqual(value.nested.nb2, True)
             self.assertEqual(value.nested.nc2.na1, 'na1 value')
             self.assertEqual(value.nested.nc2.nb1, 20.5)
+            self.assertEqual(value.mapNested['a'].na3, 1)
+            self.assertEqual(value.mapNested['b'].na3, 2)
+            self.assertEqual(value.mapNested['c'].na3, 3)
+            self.assertEqual(value.arrayNested[0].na4, 'value na4 1')
+            self.assertEqual(value.arrayNested[0].nb4, 100)
+            self.assertEqual(value.arrayNested[1].na4, 'value na4 2')
+            self.assertEqual(value.arrayNested[1].nb4, 200)
 
-            producer.close()
-            consumer.close()
             print('Produce and consume complex schema data finish. schema_type', schema_type)
 
         produce_consume_test('avro')