You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@pulsar.apache.org by bo...@apache.org on 2021/04/29 09:29:34 UTC
[pulsar] 01/02: [Python] Fix nested Map or Array in schema doesn't work (#9548)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bf81fbf79e3222db01e6105b6330960c52ab42f2
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Feb 12 02:10:11 2021 +0800
[Python] Fix nested Map or Array in schema doesn't work (#9548)
* [Python] Fix nested Map or Array type in schema not working
* Add tests for nested Map or Array
* Refactor tests
* Add tests
* Remove repeated tests
* Remove print to avoid Python2 CI failure
* Fix validate failure
* Fix exceptions thrown in validate
* Use a function to check unicode type
(cherry picked from commit 84cfb3affa1ce5bc1db29baab5498205049019dd)
---
.../python/pulsar/schema/definition.py | 12 ++++--
pulsar-client-cpp/python/schema_test.py | 43 ++++++++++++++++++++++
2 files changed, 52 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index 7075b36..7853d07 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -334,7 +334,7 @@ class Array(Field):
def schema(self):
return {
'type': self.type(),
- 'items': self.array_type.schema() if isinstance(self.array_type, Record)
+ 'items': self.array_type.schema() if isinstance(self.array_type, (Array, Map, Record))
else self.array_type.type()
}
@@ -355,7 +355,7 @@ class Map(Field):
super(Map, self).validate_type(name, val)
for k, v in val.items():
- if type(k) != str:
+ if type(k) != str and not is_unicode(k):
raise TypeError('Map keys for field ' + name + ' should all be strings')
if type(v) != self.value_type.python_type():
raise TypeError('Map values for field ' + name + ' should all be of type '
@@ -366,6 +366,12 @@ class Map(Field):
def schema(self):
return {
'type': self.type(),
- 'values': self.value_type.schema() if isinstance(self.value_type, Record)
+ 'values': self.value_type.schema() if isinstance(self.value_type, (Array, Map, Record))
else self.value_type.type()
}
+
+
+# Python3 has no `unicode` type, so here we use a tricky way to check if the type of `x` is `unicode` in Python2
+# and also make it work well with Python3.
+def is_unicode(x):
+ return 'encode' in dir(x) and type(x.encode()) == str
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index 2d03020..a86824c 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -624,6 +624,49 @@ class SchemaTest(TestCase):
self.assertEqual(MyEnum.C, msg.value().v)
client.close()
+ def test_avro_map_array(self):
+ class MapArray(Record):
+ values = Map(Array(Integer()))
+
+ class MapMap(Record):
+ values = Map(Map(Integer()))
+
+ class ArrayMap(Record):
+ values = Array(Map(Integer()))
+
+ class ArrayArray(Record):
+ values = Array(Array(Integer()))
+
+ topic_prefix = "my-avro-map-array-topic-"
+ data_list = (
+ (topic_prefix + "0", AvroSchema(MapArray),
+ MapArray(values={"A": [1, 2], "B": [3]})),
+ (topic_prefix + "1", AvroSchema(MapMap),
+ MapMap(values={"A": {"B": 2},})),
+ (topic_prefix + "2", AvroSchema(ArrayMap),
+ ArrayMap(values=[{"A": 1}, {"B": 2}, {"C": 3}])),
+ (topic_prefix + "3", AvroSchema(ArrayArray),
+ ArrayArray(values=[[1, 2, 3], [4]])),
+ )
+
+ client = pulsar.Client(self.serviceUrl)
+ for data in data_list:
+ topic = data[0]
+ schema = data[1]
+ record = data[2]
+
+ producer = client.create_producer(topic, schema=schema)
+ consumer = client.subscribe(topic, 'sub', schema=schema)
+
+ producer.send(record)
+ msg = consumer.receive()
+ self.assertEqual(msg.value().values, record.values)
+ consumer.acknowledge(msg)
+ consumer.close()
+ producer.close()
+
+ client.close()
+
def test_default_value(self):
class MyRecord(Record):
A = Integer()