You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2021/04/29 09:29:34 UTC

[pulsar] 01/02: [Python] Fix nested Map or Array in schema doesn't work (#9548)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bf81fbf79e3222db01e6105b6330960c52ab42f2
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Feb 12 02:10:11 2021 +0800

    [Python] Fix nested Map or Array in schema doesn't work (#9548)
    
    * [Python] Fix nested Map or Array type in schema not working
    
    * Add tests for nested Map or Array
    
    * Refactor tests
    
    * Add tests
    
    * Remove repeated tests
    
    * Remove print to avoid Python2 CI failure
    
    * Fix validate failure
    
    * Fix exceptions thrown in validate
    
    * Use a function to check unicode type
    
    (cherry picked from commit 84cfb3affa1ce5bc1db29baab5498205049019dd)
---
 .../python/pulsar/schema/definition.py             | 12 ++++--
 pulsar-client-cpp/python/schema_test.py            | 43 ++++++++++++++++++++++
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index 7075b36..7853d07 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -334,7 +334,7 @@ class Array(Field):
     def schema(self):
         return {
             'type': self.type(),
-            'items': self.array_type.schema() if isinstance(self.array_type, Record) 
+            'items': self.array_type.schema() if isinstance(self.array_type, (Array, Map, Record))
                 else self.array_type.type()
         }
 
@@ -355,7 +355,7 @@ class Map(Field):
         super(Map, self).validate_type(name, val)
 
         for k, v in val.items():
-            if type(k) != str:
+            if type(k) != str and not is_unicode(k):
                 raise TypeError('Map keys for field ' + name + '  should all be strings')
             if type(v) != self.value_type.python_type():
                 raise TypeError('Map values for field ' + name + ' should all be of type '
@@ -366,6 +366,12 @@ class Map(Field):
     def schema(self):
         return {
             'type': self.type(),
-            'values': self.value_type.schema() if isinstance(self.value_type, Record)
+            'values': self.value_type.schema() if isinstance(self.value_type, (Array, Map, Record))
                 else self.value_type.type()
         }
+
+
+# Python3 has no `unicode` type, so here we use a tricky way to check if the type of `x` is `unicode` in Python2
+# and also make it work well with Python3.
+def is_unicode(x):
+    return 'encode' in dir(x) and type(x.encode()) == str
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index 2d03020..a86824c 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -624,6 +624,49 @@ class SchemaTest(TestCase):
         self.assertEqual(MyEnum.C, msg.value().v)
         client.close()
 
+    def test_avro_map_array(self):
+        class MapArray(Record):
+            values = Map(Array(Integer()))
+
+        class MapMap(Record):
+            values = Map(Map(Integer()))
+
+        class ArrayMap(Record):
+            values = Array(Map(Integer()))
+
+        class ArrayArray(Record):
+            values = Array(Array(Integer()))
+
+        topic_prefix = "my-avro-map-array-topic-"
+        data_list = (
+            (topic_prefix + "0", AvroSchema(MapArray),
+                MapArray(values={"A": [1, 2], "B": [3]})),
+            (topic_prefix + "1", AvroSchema(MapMap),
+                MapMap(values={"A": {"B": 2},})),
+            (topic_prefix + "2", AvroSchema(ArrayMap),
+                ArrayMap(values=[{"A": 1}, {"B": 2}, {"C": 3}])),
+            (topic_prefix + "3", AvroSchema(ArrayArray),
+                ArrayArray(values=[[1, 2, 3], [4]])),
+        )
+
+        client = pulsar.Client(self.serviceUrl)
+        for data in data_list:
+            topic = data[0]
+            schema = data[1]
+            record = data[2]
+
+            producer = client.create_producer(topic, schema=schema)
+            consumer = client.subscribe(topic, 'sub', schema=schema)
+
+            producer.send(record)
+            msg = consumer.receive()
+            self.assertEqual(msg.value().values, record.values)
+            consumer.acknowledge(msg)
+            consumer.close()
+            producer.close()
+
+        client.close()
+
     def test_default_value(self):
         class MyRecord(Record):
             A = Integer()