You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/07 13:37:33 UTC

[pulsar] branch branch-2.8 updated (52764ce -> b8ea17a)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 52764ce  revert the wrong modification in org.apache.pulsar.broker.namespace.OwnershipCache#checkOwnership (#12650)
     new 4e4f49d  PulsarAdmin: Fix last exit code storage (#12581)
     new b8ea17a  [Python Schema] Python schema support custom Avro configurations for Enum type (#12642)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pulsar-client-cpp/python/examples/company.avsc     |   4 +-
 pulsar-client-cpp/python/pulsar/schema/__init__.py |   2 +-
 .../python/pulsar/schema/definition.py             |  18 ++--
 pulsar-client-cpp/python/schema_test.py            | 118 ++++++++++-----------
 .../apache/pulsar/admin/cli/PulsarAdminTool.java   |   2 +-
 .../org/apache/pulsar/admin/cli/TestRunMain.java   |   4 +-
 6 files changed, 77 insertions(+), 71 deletions(-)

[pulsar] 02/02: [Python Schema] Python schema support custom Avro configurations for Enum type (#12642)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b8ea17a07a8161990e79598debddaf1cc65d502b
Author: ran <ga...@126.com>
AuthorDate: Sun Nov 7 21:24:33 2021 +0800

    [Python Schema] Python schema support custom Avro configurations for Enum type (#12642)
    
    ### Motivation
    
    Currently, the Python client didn't support setting configurations `required`, `default`, `required_default` for Enum type in Record.
    
    ### Modifications
    
    Modify the `_Enum` class to `CustomEnum` class, the `_Enum` wasn't exposed to users, the new class `CustomEnum` will be exposed to users, they could set Avro definition configurations `required`, `default`, `required_default`.
    
    ### How to use
    
    ```
    class Color(Enum):
        red = 1
        green = 2
        blue = 3
    
    class NestedObj(Record):
        a = Integer()
        color = CustomEnum(Color, required_default=True, default=Color.blue)
    ```
    
    The schema definition will be like this
    ```
    {
    	'type': 'record',
    	'name': 'NestedObj',
    	'fields': [
    		{'name': 'a', 'type': ['null', 'int']},
    		{'name': 'color', 'default': 'blue', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols': ['red', 'green', 'blue']}]}
    	]
    }
    ```
    
    The old way of use will also be preserved.
    
    This feature could work well with Java client.
    
    (cherry picked from commit e7389ed965cb712058c4ed1e8931d7563ca10bc8)
---
 pulsar-client-cpp/python/examples/company.avsc     |   4 +-
 pulsar-client-cpp/python/pulsar/schema/__init__.py |   2 +-
 .../python/pulsar/schema/definition.py             |  18 ++--
 pulsar-client-cpp/python/schema_test.py            | 118 ++++++++++-----------
 4 files changed, 74 insertions(+), 68 deletions(-)

diff --git a/pulsar-client-cpp/python/examples/company.avsc b/pulsar-client-cpp/python/examples/company.avsc
index cdb595f..5fb1860 100644
--- a/pulsar-client-cpp/python/examples/company.avsc
+++ b/pulsar-client-cpp/python/examples/company.avsc
@@ -14,6 +14,8 @@
                 {"name": "age", "type": ["null", "int"]}
             ]
         }}]},
-        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
+        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]},
+        {"name": "companyType", "type": ["null", {"type": "enum", "name": "CompanyType", "symbols":
+                    ["companyType1", "companyType2", "companyType3"]}]}
     ]
 }
\ No newline at end of file
diff --git a/pulsar-client-cpp/python/pulsar/schema/__init__.py b/pulsar-client-cpp/python/pulsar/schema/__init__.py
index 150629d..efa6806 100644
--- a/pulsar-client-cpp/python/pulsar/schema/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/schema/__init__.py
@@ -18,7 +18,7 @@
 #
 
 from .definition import Record, Field, Null, Boolean, Integer, Long, \
-            Float, Double, Bytes, String, Array, Map
+            Float, Double, Bytes, String, Array, Map, CustomEnum
 
 from .schema import Schema, BytesSchema, StringSchema, JsonSchema
 from .schema_avro import AvroSchema
diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index fd778f3..9b6c861 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -44,8 +44,7 @@ class RecordMeta(type):
         fields = OrderedDict()
         for name, value in dct.items():
             if issubclass(type(value), EnumMeta):
-                # Wrap Python enums
-                value = _Enum(value)
+                value = CustomEnum(value)
             elif type(value) == RecordMeta:
                 # We expect an instance of a record rather than the class itself
                 value = value()
@@ -125,6 +124,12 @@ class Record(with_metaclass(RecordMeta, object)):
             schema['namespace'] = cls._avro_namespace
         schema['fields'] = []
 
+        def get_filed_default_value(value):
+            if isinstance(value, Enum):
+                return value.name
+            else:
+                return value
+
         if cls._sorted_fields:
             fields = sorted(cls._fields.keys())
         else:
@@ -135,7 +140,7 @@ class Record(with_metaclass(RecordMeta, object)):
                 if field._required else ['null', field.schema_info(defined_names)]
             schema['fields'].append({
                 'name': name,
-                'default': field.default(),
+                'default': get_filed_default_value(field.default()),
                 'type': field_type
             }) if field.required_default() else schema['fields'].append({
                 'name': name,
@@ -360,15 +365,16 @@ class String(Field):
 
 # Complex types
 
-class _Enum(Field):
-    def __init__(self, enum_type):
+
+class CustomEnum(Field):
+    def __init__(self, enum_type, default=None, required=False, required_default=False):
         if not issubclass(enum_type, Enum):
             raise Exception(enum_type + " is not a valid Enum type")
         self.enum_type = enum_type
         self.values = {}
         for x in enum_type.__members__.values():
             self.values[x.value] = x
-        super(_Enum, self).__init__()
+        super(CustomEnum, self).__init__(default, required, required_default)
 
     def type(self):
         return 'enum'
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index d2554da..077f2bb 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -49,6 +49,7 @@ class SchemaTest(TestCase):
             g = Double()
             h = Bytes()
             i = Map(String())
+            j = CustomEnum(Color)
 
         fastavro.parse_schema(Example.schema())
         self.assertEqual(Example.schema(), {
@@ -74,16 +75,23 @@ class SchemaTest(TestCase):
                 {"name": "i", "type": ["null", {
                     "type": "map",
                     "values": "string"}]
-                 },
+                },
+                {"name": "j", "type": ["null", "Color"]}
             ]
         })
 
     def test_complex(self):
+        class Color(Enum):
+            red = 1
+            green = 2
+            blue = 3
+
         class MySubRecord(Record):
             _sorted_fields = True
             x = Integer()
             y = Long()
             z = String()
+            color = CustomEnum(Color)
 
         class Example(Record):
             _sorted_fields = True
@@ -101,9 +109,12 @@ class SchemaTest(TestCase):
                  "type": ["null", {
                      "name": "MySubRecord",
                      "type": "record",
-                     "fields": [{"name": "x", "type": ["null", "int"]},
-                                {"name": "y", "type": ["null", "long"]},
-                                {"name": "z", "type": ["null", "string"]}]
+                     "fields": [
+                        {'name': 'color', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols':
+                            ['red', 'green', 'blue']}]},
+                        {"name": "x", "type": ["null", "int"]},
+                        {"name": "y", "type": ["null", "long"]},
+                        {"name": "z", "type": ["null", "string"]}]
                  }]
                  },
                  {"name": "sub2",
@@ -630,6 +641,8 @@ class SchemaTest(TestCase):
         class Example(Record):
             name = String()
             v = MyEnum
+            w = CustomEnum(MyEnum)
+            x = CustomEnum(MyEnum, required=True, default=MyEnum.A, required_default=True)
 
         topic = 'my-json-enum-topic'
 
@@ -641,13 +654,15 @@ class SchemaTest(TestCase):
         consumer = client.subscribe(topic, 'test',
                                     schema=JsonSchema(Example))
 
-        r = Example(name='test', v=MyEnum.C)
+        r = Example(name='test', v=MyEnum.C, w=MyEnum.B)
         producer.send(r)
 
         msg = consumer.receive()
 
         self.assertEqual('test', msg.value().name)
         self.assertEqual(MyEnum.C, MyEnum(msg.value().v))
+        self.assertEqual(MyEnum.B, MyEnum(msg.value().w))
+        self.assertEqual(MyEnum.A, MyEnum(msg.value().x))
         client.close()
 
     def test_avro_enum(self):
@@ -659,6 +674,8 @@ class SchemaTest(TestCase):
         class Example(Record):
             name = String()
             v = MyEnum
+            w = CustomEnum(MyEnum)
+            x = CustomEnum(MyEnum, required=True, default=MyEnum.B, required_default=True)
 
         topic = 'my-avro-enum-topic'
 
@@ -670,12 +687,14 @@ class SchemaTest(TestCase):
         consumer = client.subscribe(topic, 'test',
                                     schema=AvroSchema(Example))
 
-        r = Example(name='test', v=MyEnum.C)
+        r = Example(name='test', v=MyEnum.C, w=MyEnum.A)
         producer.send(r)
 
         msg = consumer.receive()
         msg.value()
         self.assertEqual(MyEnum.C, msg.value().v)
+        self.assertEqual(MyEnum.A, MyEnum(msg.value().w))
+        self.assertEqual(MyEnum.B, MyEnum(msg.value().x))
         client.close()
 
     def test_avro_map_array(self):
@@ -913,6 +932,11 @@ class SchemaTest(TestCase):
         client.close()
 
     def test_serialize_schema_complex(self):
+        class Color(Enum):
+            red = 1
+            green = 2
+            blue = 3
+
         class NestedObj1(Record):
             _sorted_fields = True
             na1 = String()
@@ -925,6 +949,8 @@ class SchemaTest(TestCase):
             nc2 = NestedObj1()
 
         class NestedObj3(Record):
+            _sorted_fields = True
+            color = CustomEnum(Color)
             na3 = Integer()
 
         class NestedObj4(Record):
@@ -933,11 +959,6 @@ class SchemaTest(TestCase):
             na4 = String()
             nb4 = Integer()
 
-        class Color(Enum):
-            red = 1
-            green = 2
-            blue = 3
-
         class ComplexRecord(Record):
             _avro_namespace = 'xxx.xxx'
             _sorted_fields = True
@@ -945,6 +966,7 @@ class SchemaTest(TestCase):
             b = Integer()
             color = Color
             color2 = Color
+            color3 = CustomEnum(Color, required=True, default=Color.red, required_default=True)
             nested = NestedObj2()
             nested2 = NestedObj2()
             mapNested = Map(NestedObj3())
@@ -970,8 +992,10 @@ class SchemaTest(TestCase):
                 {'name': 'color', 'type': ['null', {'type': 'enum', 'name': 'Color', 'symbols': [
                     'red', 'green', 'blue']}]},
                 {'name': 'color2', 'type': ['null', 'Color']},
+                {'name': 'color3', 'default': 'red', 'type': 'Color'},
                 {'name': 'mapNested', 'type': ['null', {'type': 'map', 'values':
                     {'name': 'NestedObj3', 'type': 'record', 'fields': [
+                        {'name': 'color', 'type': ['null', 'Color']},
                         {'name': 'na3', 'type': ['null', 'int']}
                     ]}}
                 ]},
@@ -998,12 +1022,12 @@ class SchemaTest(TestCase):
             r = ComplexRecord(a=1, b=2, color=Color.red, color2=Color.blue,
                               nested=nested_obj2, nested2=nested_obj2,
             mapNested={
-                'a': NestedObj3(na3=1),
+                'a': NestedObj3(na3=1, color=Color.green),
                 'b': NestedObj3(na3=2),
-                'c': NestedObj3(na3=3)
+                'c': NestedObj3(na3=3, color=Color.red)
             }, mapNested2={
-                'd': NestedObj3(na3=4),
-                'e': NestedObj3(na3=5),
+                'd': NestedObj3(na3=4, color=Color.red),
+                'e': NestedObj3(na3=5, color=Color.blue),
                 'f': NestedObj3(na3=6)
             }, arrayNested=[
                 NestedObj4(na4='value na4 1', nb4=100),
@@ -1017,32 +1041,9 @@ class SchemaTest(TestCase):
             data_decode = data_schema.decode(data_encode)
             self.assertEqual(data_decode.__class__.__name__, 'ComplexRecord')
             self.assertEqual(data_decode, r)
-            self.assertEqual(data_decode.a, 1)
-            self.assertEqual(data_decode.b, 2)
-            self.assertEqual(data_decode.color, Color.red)
-            self.assertEqual(data_decode.color2, Color.blue)
-            self.assertEqual(data_decode.nested.na2, 22)
-            self.assertEqual(data_decode.nested.nb2, True)
-            self.assertEqual(data_decode.nested.nc2.na1, 'na1 value')
-            self.assertEqual(data_decode.nested.nc2.nb1, 20.5)
-            self.assertEqual(data_decode.nested2.na2, 22)
-            self.assertEqual(data_decode.nested2.nb2, True)
-            self.assertEqual(data_decode.nested2.nc2.na1, 'na1 value')
-            self.assertEqual(data_decode.nested2.nc2.nb1, 20.5)
-            self.assertEqual(data_decode.mapNested['a'].na3, 1)
-            self.assertEqual(data_decode.mapNested['b'].na3, 2)
-            self.assertEqual(data_decode.mapNested['c'].na3, 3)
-            self.assertEqual(data_decode.mapNested2['d'].na3, 4)
-            self.assertEqual(data_decode.mapNested2['e'].na3, 5)
-            self.assertEqual(data_decode.mapNested2['f'].na3, 6)
-            self.assertEqual(data_decode.arrayNested[0].na4, 'value na4 1')
-            self.assertEqual(data_decode.arrayNested[0].nb4, 100)
-            self.assertEqual(data_decode.arrayNested[1].na4, 'value na4 2')
-            self.assertEqual(data_decode.arrayNested[1].nb4, 200)
-            self.assertEqual(data_decode.arrayNested2[0].na4, 'value na4 3')
-            self.assertEqual(data_decode.arrayNested2[0].nb4, 300)
-            self.assertEqual(data_decode.arrayNested2[1].na4, 'value na4 4')
-            self.assertEqual(data_decode.arrayNested2[1].nb4, 400)
+            self.assertEqual(r.color3, Color.red)
+            self.assertEqual(r.mapNested['a'].color, Color.green)
+            self.assertEqual(r.mapNested['b'].color, None)
             print('Encode and decode complex schema finish. schema_type: ', schema_type)
 
         encode_and_decode('avro')
@@ -1069,8 +1070,12 @@ class SchemaTest(TestCase):
         self.assertEqual(data_decode.na2, 1)
         self.assertTrue(data_decode.nb2)
 
-
     def test_produce_and_consume_complex_schema_data(self):
+        class Color(Enum):
+            red = 1
+            green = 2
+            blue = 3
+
         class NestedObj1(Record):
             na1 = String()
             nb1 = Double()
@@ -1082,6 +1087,7 @@ class SchemaTest(TestCase):
 
         class NestedObj3(Record):
             na3 = Integer()
+            color = CustomEnum(Color, required=True, required_default=True, default=Color.blue)
 
         class NestedObj4(Record):
             na4 = String()
@@ -1090,6 +1096,7 @@ class SchemaTest(TestCase):
         class ComplexRecord(Record):
             a = Integer()
             b = Integer()
+            color = CustomEnum(Color)
             nested = NestedObj2()
             mapNested = Map(NestedObj3())
             arrayNested = Array(NestedObj4())
@@ -1112,8 +1119,8 @@ class SchemaTest(TestCase):
             nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
             nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
             r = ComplexRecord(a=1, b=2, nested=nested_obj2, mapNested={
-                'a': NestedObj3(na3=1),
-                'b': NestedObj3(na3=2),
+                'a': NestedObj3(na3=1, color=Color.red),
+                'b': NestedObj3(na3=2, color=Color.green),
                 'c': NestedObj3(na3=3)
             }, arrayNested=[
                 NestedObj4(na4='value na4 1', nb4=100),
@@ -1125,19 +1132,6 @@ class SchemaTest(TestCase):
             value = msg.value()
             self.assertEqual(value.__class__.__name__, 'ComplexRecord')
             self.assertEqual(value, r)
-            self.assertEqual(value.a, 1)
-            self.assertEqual(value.b, 2)
-            self.assertEqual(value.nested.na2, 22)
-            self.assertEqual(value.nested.nb2, True)
-            self.assertEqual(value.nested.nc2.na1, 'na1 value')
-            self.assertEqual(value.nested.nc2.nb1, 20.5)
-            self.assertEqual(value.mapNested['a'].na3, 1)
-            self.assertEqual(value.mapNested['b'].na3, 2)
-            self.assertEqual(value.mapNested['c'].na3, 3)
-            self.assertEqual(value.arrayNested[0].na4, 'value na4 1')
-            self.assertEqual(value.arrayNested[0].nb4, 100)
-            self.assertEqual(value.arrayNested[1].na4, 'value na4 2')
-            self.assertEqual(value.arrayNested[1].nb4, 200)
 
             print('Produce and consume complex schema data finish. schema_type', schema_type)
 
@@ -1163,7 +1157,8 @@ class SchemaTest(TestCase):
                     "industry": "software",
                     "scale": ">100",
                     "funds": "1000000.0"
-                }
+                },
+                "companyType": "companyType1"
             }
             data = avro_schema.encode(company)
             company_decode = avro_schema.decode(data)
@@ -1185,7 +1180,9 @@ class SchemaTest(TestCase):
                         {'name': 'age', 'type': ['null', 'int']}
                     ]
                 }}]},
-                {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]}
+                {'name': 'labels', 'type': ['null', {'type': 'map', 'values': 'string'}]},
+                {'name': 'companyType', 'type': ['null', {'type': 'enum', 'name': 'CompanyType', 'symbols':
+                    ['companyType1', 'companyType2', 'companyType3']}]}
             ]
         }
         encode_and_decode(schema_definition)
@@ -1218,7 +1215,8 @@ class SchemaTest(TestCase):
                         "industry": "software" + str(i),
                         "scale": ">100",
                         "funds": "1000000.0"
-                    }
+                    },
+                    "companyType": "companyType" + str((i % 3) + 1)
                 }
                 producer.send(company)
 

[pulsar] 01/02: PulsarAdmin: Fix last exit code storage (#12581)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4e4f49d283b83e7310e909a2d9c5d960793486b1
Author: Thomas Leplus <th...@users.noreply.github.com>
AuthorDate: Fri Nov 5 20:11:19 2021 -0700

    PulsarAdmin: Fix last exit code storage (#12581)
    
    (cherry picked from commit d357cec05ee935e9dc3d719a1bdcba3beee95edf)
---
 .../src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java    | 2 +-
 .../src/test/java/org/apache/pulsar/admin/cli/TestRunMain.java        | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index a432b92..595f6ce 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -334,7 +334,7 @@ public class PulsarAdminTool {
     }
 
     private static void exit(int code) {
-        lastExitCode = lastExitCode;
+        lastExitCode = code;
         if (allowSystemExit) {
             // we are using halt and not System.exit, we do not mind about shutdown hooks
             // they are only slowing down the tool
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestRunMain.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestRunMain.java
index 84a992d..1e470ff 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestRunMain.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestRunMain.java
@@ -31,7 +31,7 @@ public class TestRunMain {
     public void runMainNoArguments() throws Exception {
         PulsarAdminTool.setAllowSystemExit(false);
         PulsarAdminTool.main(new String[0]);
-        assertEquals(PulsarAdminTool.getLastExitCode(), 0);
+        assertEquals(PulsarAdminTool.getLastExitCode(), 1);
     }
 
     @Test
@@ -39,6 +39,6 @@ public class TestRunMain {
         PulsarAdminTool.setAllowSystemExit(false);
         Path dummyEmptyFile = Files.createTempFile("test", ".conf");
         PulsarAdminTool.main(new String[] {dummyEmptyFile.toAbsolutePath().toString()});
-        assertEquals(PulsarAdminTool.getLastExitCode(), 0);
+        assertEquals(PulsarAdminTool.getLastExitCode(), 1);
     }
 }