You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:09 UTC

[pulsar] 23/38: [PY] Fix serialization of enums with json/avro schemas (#6808)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e135afbeccbe2d2eab2e7e479708601ace6e1413
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Apr 27 00:49:05 2020 -0700

    [PY] Fix serialization of enums with json/avro schemas (#6808)
    
    ### Motivation
    
    In the Python client, the serialization of enums when using a schema is currently broken: it throws an error because enums cannot be serialized directly into JSON.
    
    Instead, for both Avro and JSON, we need to handle enum serialization explicitly, each format in its own way.
    (cherry picked from commit 5ec9d7b0c030bd377b35ed2128b79c7be93bee26)
---
 pulsar-client-cpp/python/pulsar/schema/__init__.py |  7 ++-
 pulsar-client-cpp/python/pulsar/schema/schema.py   | 18 ++++++-
 pulsar-client-cpp/python/schema_test.py            | 57 ++++++++++++++++++++++
 3 files changed, 76 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/schema/__init__.py b/pulsar-client-cpp/python/pulsar/schema/__init__.py
index 096e64a..a38513f 100644
--- a/pulsar-client-cpp/python/pulsar/schema/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/schema/__init__.py
@@ -17,8 +17,7 @@
 # under the License.
 #
 
+from .definition import Record, Field, Null, Boolean, Integer, Long, \
+            Float, Double, Bytes, String, Array, Map
 
-from .definition import *
-from .schema import *
-
-
+from .schema import Schema, BytesSchema, StringSchema, JsonSchema, AvroSchema
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py b/pulsar-client-cpp/python/pulsar/schema/schema.py
index 260d7b0..5f69ea2 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema.py
@@ -23,6 +23,7 @@ import json
 import fastavro
 import _pulsar
 import io
+import enum
 
 
 class Schema(object):
@@ -78,9 +79,15 @@ class JsonSchema(Schema):
         super(JsonSchema, self).__init__(record_cls, _pulsar.SchemaType.JSON,
                                          record_cls.schema(), 'JSON')
 
+    def _get_serialized_value(self, o):
+        if isinstance(o, enum.Enum):
+            return o.value
+        else:
+            return o.__dict__
+
     def encode(self, obj):
         self._validate_object_type(obj)
-        return json.dumps(obj.__dict__, default=lambda o: o.__dict__, indent=True).encode('utf-8')
+        return json.dumps(obj.__dict__, default=self._get_serialized_value, indent=True).encode('utf-8')
 
     def decode(self, data):
         return self._record_cls(**json.loads(data))
@@ -92,10 +99,17 @@ class AvroSchema(Schema):
                                          record_cls.schema(), 'AVRO')
         self._schema = record_cls.schema()
 
+    def _get_serialized_value(self, x):
+        if isinstance(x, enum.Enum):
+            return x.name
+        else:
+            return x
+
     def encode(self, obj):
         self._validate_object_type(obj)
         buffer = io.BytesIO()
-        fastavro.schemaless_writer(buffer, self._schema, obj.__dict__)
+        m = {k: self._get_serialized_value(v) for k, v in obj.__dict__.items()}
+        fastavro.schemaless_writer(buffer, self._schema, m)
         return buffer.getvalue()
 
     def decode(self, data):
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index 07c7134..9aead6d 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -419,5 +419,62 @@ class SchemaTest(TestCase):
         self.assertEqual(r, msg.value())
         client.close()
 
+    def test_json_enum(self):
+        class MyEnum(Enum):
+            A = 1
+            B = 2
+            C = 3
+
+        class Example(Record):
+            name = String()
+            v = MyEnum
+
+        topic = 'my-json-enum-topic'
+
+        client = pulsar.Client(self.serviceUrl)
+        producer = client.create_producer(
+                        topic=topic,
+                        schema=JsonSchema(Example))
+
+        consumer = client.subscribe(topic, 'test',
+                                    schema=JsonSchema(Example))
+
+        r = Example(name='test', v=MyEnum.C)
+        producer.send(r)
+
+        msg = consumer.receive()
+
+        self.assertEqual('test', msg.value().name)
+        self.assertEqual(MyEnum.C, MyEnum(msg.value().v))
+        client.close()
+
+    def test_avro_enum(self):
+        class MyEnum(Enum):
+            A = 1
+            B = 2
+            C = 3
+
+        class Example(Record):
+            name = String()
+            v = MyEnum
+
+        topic = 'my-avro-enum-topic'
+
+        client = pulsar.Client(self.serviceUrl)
+        producer = client.create_producer(
+                        topic=topic,
+                        schema=AvroSchema(Example))
+
+        consumer = client.subscribe(topic, 'test',
+                                    schema=AvroSchema(Example))
+
+        r = Example(name='test', v=MyEnum.C)
+        producer.send(r)
+
+        msg = consumer.receive()
+        self.assertEqual(MyEnum.C, MyEnum[msg.value().v])
+        client.close()
+
+
 if __name__ == '__main__':
     main()