You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/08/02 16:39:07 UTC
[pulsar] branch master updated: [Python Schema] Fix python schema array map with record (#11530)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 03aedc7 [Python Schema] Fix python schema array map with record (#11530)
03aedc7 is described below
commit 03aedc7cd708e40781d6673114ce1691a7510a14
Author: ran <ga...@126.com>
AuthorDate: Tue Aug 3 00:38:14 2021 +0800
[Python Schema] Fix python schema array map with record (#11530)
* fix Python schema types Array and Map to work with Record
* fix test
---
.../python/pulsar/schema/definition.py | 21 ++++++-
.../python/pulsar/schema/schema_avro.py | 7 +++
pulsar-client-cpp/python/schema_test.py | 70 ++++++++++++++++++++--
3 files changed, 93 insertions(+), 5 deletions(-)
diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index 3b946b8..56385957 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -67,8 +67,24 @@ class Record(with_metaclass(RecordMeta, object)):
if isinstance(value, Record) and isinstance(kwargs[k], dict):
# Use dict init Record object
copied = copy.copy(value)
- copied.__init__(decode=True, **kwargs[k])
+ copied.__init__(**kwargs[k])
self.__setattr__(k, copied)
+ elif isinstance(value, Array) and isinstance(kwargs[k], list) and len(kwargs[k]) > 0 \
+ and isinstance(value.array_type, Record) and isinstance(kwargs[k][0], dict):
+ arr = []
+ for item in kwargs[k]:
+ copied = copy.copy(value.array_type)
+ copied.__init__(**item)
+ arr.append(copied)
+ self.__setattr__(k, arr)
+ elif isinstance(value, Map) and isinstance(kwargs[k], dict) and len(kwargs[k]) > 0 \
+ and isinstance(value.value_type, Record) and isinstance(list(kwargs[k].values())[0], dict):
+ dic = {}
+ for mapKey, mapValue in kwargs[k].items():
+ copied = copy.copy(value.value_type)
+ copied.__init__(**mapValue)
+ dic[mapKey] = copied
+ self.__setattr__(k, dic)
else:
# Value was overridden at constructor
self.__setattr__(k, kwargs[k])
@@ -129,6 +145,9 @@ class Record(with_metaclass(RecordMeta, object)):
def type(self):
return str(self.__class__.__name__)
+ def python_type(self):
+ return self.__class__
+
def validate_type(self, name, val):
if not isinstance(val, self.__class__):
raise TypeError("Invalid type '%s' for sub-record field '%s'. Expected: %s" % (
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
index fc9e6a6..d2b57cb 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
@@ -42,6 +42,13 @@ if HAS_AVRO:
return x.name
elif isinstance(x, Record):
return self.encode_dict(x.__dict__)
+ elif isinstance(x, list):
+ arr = []
+ for item in x:
+ arr.append(self._get_serialized_value(item))
+ return arr
+ elif isinstance(x, dict):
+ return self.encode_dict(x)
else:
return x
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index 8cf6ff4..49b6c42 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -460,6 +460,9 @@ class SchemaTest(TestCase):
msg = consumer.receive()
self.assertEqual(r, msg.value())
+
+ producer.close()
+ consumer.close()
client.close()
def test_string_schema(self):
@@ -562,6 +565,9 @@ class SchemaTest(TestCase):
msg = consumer.receive()
self.assertEqual(r, msg.value())
+
+ producer.close()
+ consumer.close()
client.close()
def test_json_enum(self):
@@ -863,17 +869,38 @@ class SchemaTest(TestCase):
nb2 = Boolean()
nc2 = NestedObj1()
+ class NestedObj3(Record):
+ na3 = Integer()
+
+ class NestedObj4(Record):
+ na4 = String()
+ nb4 = Integer()
+
class ComplexRecord(Record):
a = Integer()
b = Integer()
nested = NestedObj2()
+ mapNested = Map(NestedObj3())
+ arrayNested = Array(NestedObj4())
+ print('complex schema: ', ComplexRecord.schema())
self.assertEqual(ComplexRecord.schema(), {
"name": "ComplexRecord",
"type": "record",
"fields": [
{"name": "a", "type": ["null", "int"]},
+ {'name': 'arrayNested', 'type': ['null',
+ {'type': 'array', 'items': {'name': 'NestedObj4', 'type': 'record', 'fields': [
+ {'name': 'na4', 'type': ['null', 'string']},
+ {'name': 'nb4', 'type': ['null', 'int']}
+ ]}}
+ ]},
{"name": "b", "type": ["null", "int"]},
+ {'name': 'mapNested', 'type': ['null', {'type': 'map', 'values':
+ {'name': 'NestedObj3', 'type': 'record', 'fields': [
+ {'name': 'na3', 'type': ['null', 'int']}
+ ]}}
+ ]},
{"name": "nested", "type": ['null', {'name': 'NestedObj2', 'type': 'record', 'fields': [
{'name': 'na2', 'type': ['null', 'int']},
{'name': 'nb2', 'type': ['null', 'boolean']},
@@ -892,7 +919,14 @@ class SchemaTest(TestCase):
nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
- r = ComplexRecord(a=1, b=2, nested=nested_obj2)
+ r = ComplexRecord(a=1, b=2, nested=nested_obj2, mapNested={
+ 'a': NestedObj3(na3=1),
+ 'b': NestedObj3(na3=2),
+ 'c': NestedObj3(na3=3)
+ }, arrayNested=[
+ NestedObj4(na4='value na4 1', nb4=100),
+ NestedObj4(na4='value na4 2', nb4=200)
+ ])
data_encode = data_schema.encode(r)
data_decode = data_schema.decode(data_encode)
@@ -904,6 +938,13 @@ class SchemaTest(TestCase):
self.assertEqual(data_decode.nested.nb2, True)
self.assertEqual(data_decode.nested.nc2.na1, 'na1 value')
self.assertEqual(data_decode.nested.nc2.nb1, 20.5)
+ self.assertEqual(data_decode.mapNested['a'].na3, 1)
+ self.assertEqual(data_decode.mapNested['b'].na3, 2)
+ self.assertEqual(data_decode.mapNested['c'].na3, 3)
+ self.assertEqual(data_decode.arrayNested[0].na4, 'value na4 1')
+ self.assertEqual(data_decode.arrayNested[0].nb4, 100)
+ self.assertEqual(data_decode.arrayNested[1].na4, 'value na4 2')
+ self.assertEqual(data_decode.arrayNested[1].nb4, 200)
print('Encode and decode complex schema finish. schema_type: ', schema_type)
encode_and_decode('avro')
@@ -919,10 +960,19 @@ class SchemaTest(TestCase):
nb2 = Boolean()
nc2 = NestedObj1()
+ class NestedObj3(Record):
+ na3 = Integer()
+
+ class NestedObj4(Record):
+ na4 = String()
+ nb4 = Integer()
+
class ComplexRecord(Record):
a = Integer()
b = Integer()
nested = NestedObj2()
+ mapNested = Map(NestedObj3())
+ arrayNested = Array(NestedObj4())
client = pulsar.Client(self.serviceUrl)
@@ -941,7 +991,14 @@ class SchemaTest(TestCase):
nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
- r = ComplexRecord(a=1, b=2, nested=nested_obj2)
+ r = ComplexRecord(a=1, b=2, nested=nested_obj2, mapNested={
+ 'a': NestedObj3(na3=1),
+ 'b': NestedObj3(na3=2),
+ 'c': NestedObj3(na3=3)
+ }, arrayNested=[
+ NestedObj4(na4='value na4 1', nb4=100),
+ NestedObj4(na4='value na4 2', nb4=200)
+ ])
producer.send(r)
msg = consumer.receive()
@@ -954,9 +1011,14 @@ class SchemaTest(TestCase):
self.assertEqual(value.nested.nb2, True)
self.assertEqual(value.nested.nc2.na1, 'na1 value')
self.assertEqual(value.nested.nc2.nb1, 20.5)
+ self.assertEqual(value.mapNested['a'].na3, 1)
+ self.assertEqual(value.mapNested['b'].na3, 2)
+ self.assertEqual(value.mapNested['c'].na3, 3)
+ self.assertEqual(value.arrayNested[0].na4, 'value na4 1')
+ self.assertEqual(value.arrayNested[0].nb4, 100)
+ self.assertEqual(value.arrayNested[1].na4, 'value na4 2')
+ self.assertEqual(value.arrayNested[1].nb4, 200)
- producer.close()
- consumer.close()
print('Produce and consume complex schema data finish. schema_type', schema_type)
produce_consume_test('avro')