You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2021/04/29 09:29:35 UTC
[pulsar] 02/02: [Python schema] Support python avro schema set
default value. (#10265)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9a72fe4fa137e369aa27150e46c41b67041de022
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon Apr 19 23:11:36 2021 +0800
[Python schema] Support python avro schema set default value. (#10265)
## Motivation
Currently, the Python Avro schema does not support setting default values, which prevents Python schemas from being updated.
## Implementation
1. Add a `required` field to control whether the schema type allows `null`.
2. Add a `required_default` field to control whether the schema includes a `default` attribute.
3. Add a `default` field to set the default value of the schema.
(cherry picked from commit d6d0e3a88e5569b16b4c0b9cdbd845d5c07268e0)
---
.../python/pulsar/schema/definition.py | 89 ++++++++---
pulsar-client-cpp/python/pulsar/schema/schema.py | 4 +
pulsar-client-cpp/python/schema_test.py | 169 ++++++++++++++++++++-
3 files changed, 234 insertions(+), 28 deletions(-)
diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index 7853d07..d46cf3c 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -56,10 +56,10 @@ class RecordMeta(type):
class Record(with_metaclass(RecordMeta, object)):
- def __init__(self, *args, **kwargs):
- if args:
- # Only allow keyword args
- raise TypeError('Non-keyword arguments not allowed when initializing Records')
+ def __init__(self, default=None, required_default=False, required=False, *args, **kwargs):
+ self._required_default = required_default
+ self._default = default
+ self._required = required
for k, value in self._fields.items():
if k in kwargs:
@@ -85,18 +85,30 @@ class Record(with_metaclass(RecordMeta, object)):
field_type = field.schema() if field._required else ['null', field.schema()]
schema['fields'].append({
'name': name,
- 'type': field_type
+ 'type': field_type,
+ 'default': field.default()
+ }) if field.required_default() else schema['fields'].append({
+ 'name': name,
+ 'type': field_type,
})
+
return schema
def __setattr__(self, key, value):
- if key not in self._fields:
- raise AttributeError('Cannot set undeclared field ' + key + ' on record')
+ if key == '_default':
+ super(Record, self).__setattr__(key, value)
+ elif key == '_required_default':
+ super(Record, self).__setattr__(key, value)
+ elif key == '_required':
+ super(Record, self).__setattr__(key, value)
+ else:
+ if key not in self._fields:
+ raise AttributeError('Cannot set undeclared field ' + key + ' on record')
- # Check that type of value matches the field type
- field = self._fields[key]
- value = field.validate_type(key, value)
- super(Record, self).__setattr__(key, value)
+ # Check that type of value matches the field type
+ field = self._fields[key]
+ value = field.validate_type(key, value)
+ super(Record, self).__setattr__(key, value)
def __eq__(self, other):
for field in self._fields:
@@ -116,12 +128,22 @@ class Record(with_metaclass(RecordMeta, object)):
type(val), name, self.__class__))
return val
+ def default(self):
+ if self._default is not None:
+ return self._default
+ else:
+ return None
+
+ def required_default(self):
+ return self._required_default
+
class Field(object):
- def __init__(self, default=None, required=False):
+ def __init__(self, default=None, required=False, required_default=False):
if default is not None:
default = self.validate_type('default', default)
self._default = default
+ self._required_default = required_default
self._required = required
@abstractmethod
@@ -144,6 +166,10 @@ class Field(object):
def default(self):
return self._default
+ def required_default(self):
+ return self._required_default
+
+
# All types
@@ -185,7 +211,7 @@ class Integer(Field):
if self._default is not None:
return self._default
else:
- return 0
+ return None
class Long(Field):
@@ -199,7 +225,7 @@ class Long(Field):
if self._default is not None:
return self._default
else:
- return 0
+ return None
class Float(Field):
@@ -213,7 +239,7 @@ class Float(Field):
if self._default is not None:
return self._default
else:
- return 0.0
+ return None
class Double(Field):
@@ -227,7 +253,7 @@ class Double(Field):
if self._default is not None:
return self._default
else:
- return 0.0
+ return None
class Bytes(Field):
@@ -241,7 +267,7 @@ class Bytes(Field):
if self._default is not None:
return self._default
else:
- return bytes('')
+ return None
class String(Field):
@@ -261,7 +287,8 @@ class String(Field):
if self._default is not None:
return self._default
else:
- return str('')
+ return None
+
# Complex types
@@ -309,12 +336,18 @@ class _Enum(Field):
'symbols': [x.name for x in self.enum_type]
}
+ def default(self):
+ if self._default is not None:
+ return self._default
+ else:
+ return None
+
class Array(Field):
- def __init__(self, array_type):
+ def __init__(self, array_type, default=None, required=False, required_default=False):
_check_record_or_field(array_type)
self.array_type = array_type
- super(Array, self).__init__()
+ super(Array, self).__init__(default=default, required=required, required_default=required_default)
def type(self):
return 'array'
@@ -338,12 +371,18 @@ class Array(Field):
else self.array_type.type()
}
+ def default(self):
+ if self._default is not None:
+ return self._default
+ else:
+ return None
+
class Map(Field):
- def __init__(self, value_type):
+ def __init__(self, value_type, default=None, required=False, required_default=False):
_check_record_or_field(value_type)
self.value_type = value_type
- super(Map, self).__init__()
+ super(Map, self).__init__(default=default, required=required, required_default=required_default)
def type(self):
return 'map'
@@ -370,6 +409,12 @@ class Map(Field):
else self.value_type.type()
}
+ def default(self):
+ if self._default is not None:
+ return self._default
+ else:
+ return None
+
# Python3 has no `unicode` type, so here we use a tricky way to check if the type of `x` is `unicode` in Python2
# and also make it work well with Python3.
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py b/pulsar-client-cpp/python/pulsar/schema/schema.py
index 5f69ea2..d0da91a 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema.py
@@ -87,6 +87,10 @@ class JsonSchema(Schema):
def encode(self, obj):
self._validate_object_type(obj)
+ del obj.__dict__['_default']
+ del obj.__dict__['_required']
+ del obj.__dict__['_required_default']
+
return json.dumps(obj.__dict__, default=self._get_serialized_value, indent=True).encode('utf-8')
def decode(self, data):
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index a86824c..a0d60c0 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -180,9 +180,6 @@ class SchemaTest(TestCase):
# Expected
pass
- try:
- Record('xyz', a=1, b=2)
- self.fail('Should have failed')
except TypeError:
# Expected
pass
@@ -410,7 +407,7 @@ class SchemaTest(TestCase):
r = Example()
self.assertEqual(r.a, 5)
- self.assertEqual(r.b, 0)
+ self.assertEqual(r.b, None)
self.assertEqual(r.c, 'hello')
####
@@ -667,11 +664,171 @@ class SchemaTest(TestCase):
client.close()
+ def test_avro_required_default(self):
+ class MySubRecord(Record):
+ x = Integer()
+ y = Long()
+ z = String()
+
+ class Example(Record):
+ a = Integer()
+ b = Boolean(required=True)
+ c = Long()
+ d = Float()
+ e = Double()
+ f = String()
+ g = Bytes()
+ h = Array(String())
+ i = Map(String())
+ j = MySubRecord()
+
+ class ExampleRequiredDefault(Record):
+ a = Integer(required_default=True)
+ b = Boolean(required=True, required_default=True)
+ c = Long(required_default=True)
+ d = Float(required_default=True)
+ e = Double(required_default=True)
+ f = String(required_default=True)
+ g = Bytes(required_default=True)
+ h = Array(String(), required_default=True)
+ i = Map(String(), required_default=True)
+ j = MySubRecord(required_default=True)
+ self.assertEqual(ExampleRequiredDefault.schema(), {
+ "name": "ExampleRequiredDefault",
+ "type": "record",
+ "fields": [
+ {
+ "name": "a",
+ "type": [
+ "null",
+ "int"
+ ],
+ "default": None
+ },
+ {
+ "name": "b",
+ "type": "boolean",
+ "default": False
+ },
+ {
+ "name": "c",
+ "type": [
+ "null",
+ "long"
+ ],
+ "default": None
+ },
+ {
+ "name": "d",
+ "type": [
+ "null",
+ "float"
+ ],
+ "default": None
+ },
+ {
+ "name": "e",
+ "type": [
+ "null",
+ "double"
+ ],
+ "default": None
+ },
+ {
+ "name": "f",
+ "type": [
+ "null",
+ "string"
+ ],
+ "default": None
+ },
+ {
+ "name": "g",
+ "type": [
+ "null",
+ "bytes"
+ ],
+ "default": None
+ },
+ {
+ "name": "h",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": "string"
+ }
+ ],
+ "default": None
+ },
+ {
+ "name": "i",
+ "type": [
+ "null",
+ {
+ "type": "map",
+ "values": "string"
+ }
+ ],
+ "default": None
+ },
+ {
+ "name": "j",
+ "type": [
+ "null",
+ {
+ "name": "MySubRecord",
+ "type": "record",
+ "fields": [
+ {
+ "name": "x",
+ "type": [
+ "null",
+ "int"
+ ]
+ },
+ {
+ "name": "y",
+ "type": [
+ "null",
+ "long"
+ ],
+ },
+ {
+ "name": "z",
+ "type": [
+ "null",
+ "string"
+ ]
+ }
+ ]
+ }
+ ],
+ "default": None
+ }
+ ]
+ })
+
+ client = pulsar.Client(self.serviceUrl)
+ producer = client.create_producer(
+ 'my-avro-python-default-topic',
+ schema=AvroSchema(Example))
+
+ producer_default = client.create_producer(
+ 'my-avro-python-default-topic',
+ schema=AvroSchema(ExampleRequiredDefault))
+
+ producer.close()
+ producer_default.close()
+
+ client.close()
+
+
def test_default_value(self):
class MyRecord(Record):
A = Integer()
B = String()
- C = Boolean()
+ C = Boolean(default=True, required=True)
D = Double(default=6.4)
topic = "my-default-value-topic"
@@ -689,7 +846,7 @@ class SchemaTest(TestCase):
msg = consumer.receive()
self.assertEqual(msg.value().A, 5)
self.assertEqual(msg.value().B, u'text')
- self.assertEqual(msg.value().C, False)
+ self.assertEqual(msg.value().C, True)
self.assertEqual(msg.value().D, 6.4)
producer.close()