You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/01/30 19:06:13 UTC
[incubator-pulsar] branch master updated: Validate expected types
in Python code wrapper (#1097)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 53619ff Validate expected types in Python code wrapper (#1097)
53619ff is described below
commit 53619ff1923ff41aedb273dacb9312d973b5fb09
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 30 11:06:11 2018 -0800
Validate expected types in Python code wrapper (#1097)
* Validate expected types in Python code wrapper
* Address comments
* Left a couple of checks in old format
* More compact unit tests
* Fixed checks for producer_name which can be None
---
pulsar-client-cpp/python/pulsar.py | 58 ++++++++++++++++++++++
pulsar-client-cpp/python/pulsar_test.py | 85 ++++++++++++++++++++++++++++++++-
2 files changed, 142 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-cpp/python/pulsar.py b/pulsar-client-cpp/python/pulsar.py
index 8c1ff45..060e611 100644
--- a/pulsar-client-cpp/python/pulsar.py
+++ b/pulsar-client-cpp/python/pulsar.py
@@ -185,6 +185,8 @@ class Authentication:
* `authParamsString`: Comma-separated list of provider-specific
configuration params
"""
+ _check_type(str, dynamicLibPath, 'dynamicLibPath')
+ _check_type(str, authParamsString, 'authParamsString')
self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
@@ -243,6 +245,17 @@ class Client:
Configure whether the Pulsar client accepts untrusted TLS certificates
from the broker.
"""
+ _check_type(str, service_url, 'service_url')
+ _check_type_or_none(Authentication, authentication, 'authentication')
+ _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
+ _check_type(int, io_threads, 'io_threads')
+ _check_type(int, message_listener_threads, 'message_listener_threads')
+ _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
+ _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
+ _check_type(bool, use_tls, 'use_tls')
+ _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
+ _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
+
conf = _pulsar.ClientConfiguration()
if authentication:
conf.authentication(authentication.auth)
@@ -306,6 +319,18 @@ class Client:
* `message_routing_mode`:
Set the message routing mode for the partitioned producer.
"""
+ _check_type(str, topic, 'topic')
+ _check_type_or_none(str, producer_name, 'producer_name')
+ _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
+ _check_type(int, send_timeout_millis, 'send_timeout_millis')
+ _check_type(CompressionType, compression_type, 'compression_type')
+ _check_type(int, max_pending_messages, 'max_pending_messages')
+ _check_type(bool, block_if_queue_full, 'block_if_queue_full')
+ _check_type(bool, batching_enabled, 'batching_enabled')
+ _check_type(int, batching_max_messages, 'batching_max_messages')
+ _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
+ _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
+
conf = _pulsar.ProducerConfiguration()
conf.send_timeout_millis(send_timeout_millis)
conf.compression_type(compression_type)
@@ -380,6 +405,14 @@ class Client:
Sets the time duration for which the broker-side consumer stats will
be cached in the client.
"""
+ _check_type(str, topic, 'topic')
+ _check_type(str, subscription_name, 'subscription_name')
+ _check_type(ConsumerType, consumer_type, 'consumer_type')
+ _check_type(int, receiver_queue_size, 'receiver_queue_size')
+ _check_type_or_none(str, consumer_name, 'consumer_name')
+ _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
+ _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
+
conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
if message_listener:
@@ -444,6 +477,11 @@ class Client:
* `reader_name`:
Sets the reader name.
"""
+ _check_type(str, topic, 'topic')
+ _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
+ _check_type(int, receiver_queue_size, 'receiver_queue_size')
+ _check_type_or_none(str, reader_name, 'reader_name')
+
conf = _pulsar.ReaderConfiguration()
if reader_listener:
conf.reader_listener(reader_listener)
@@ -588,6 +626,13 @@ class Producer:
def _build_msg(self, content, properties, partition_key, sequence_id,
replication_clusters, disable_replication):
+ _check_type(bytes, content, 'content')
+ _check_type_or_none(dict, properties, 'properties')
+ _check_type_or_none(str, partition_key, 'partition_key')
+ _check_type_or_none(int, sequence_id, 'sequence_id')
+ _check_type_or_none(list, replication_clusters, 'replication_clusters')
+ _check_type(bool, disable_replication, 'disable_replication')
+
mb = _pulsar.MessageBuilder()
mb.content(content)
if properties:
@@ -649,6 +694,7 @@ class Consumer:
if timeout_millis is None:
return self._consumer.receive()
else:
+ _check_type(int, timeout_millis, 'timeout_millis')
return self._consumer.receive(timeout_millis)
def acknowledge(self, message):
@@ -741,6 +787,7 @@ class Reader:
if timeout_millis is None:
return self._reader.read_next()
else:
+ _check_type(int, timeout_millis, 'timeout_millis')
return self._reader.read_next(timeout_millis)
def close(self):
@@ -749,3 +796,14 @@ class Reader:
"""
self._reader.close()
self._client._consumers.remove(self)
+
+
+def _check_type(var_type, var, name):  # raise ValueError naming `name` unless `var` is an instance of `var_type` (None gets no special treatment)
+ if not isinstance(var, var_type):  # isinstance() accepts subclasses, so e.g. a bool passes an int check
+ raise ValueError("Argument %s is expected to be of type '%s'" % (name, var_type.__name__))
+
+
+def _check_type_or_none(var_type, var, name):  # like an isinstance check against `var_type`, except that None is always accepted
+ if var is not None and not isinstance(var, var_type):  # only non-None values are type-checked
+ raise ValueError("Argument %s is expected to be either None or of type '%s'"
+ % (name, var_type.__name__))
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 01ac521..4d3e0ad 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -248,7 +248,8 @@ class PulsarTest(TestCase):
'true')
client = Client(self.serviceUrl)
- topic = 'persistent://sample/standalone/ns1/my-python-test-producer-sequence-after-reconnection-' + str(time.time())
+ topic = 'persistent://sample/standalone/ns1/my-python-test-producer-sequence-after-reconnection-' \
+ + str(time.time())
producer = client.create_producer(topic, producer_name='my-producer-name')
self.assertEqual(producer.last_sequence_id(), -1)
@@ -320,6 +321,88 @@ class PulsarTest(TestCase):
# Exception is expected
pass
+ def test_message_argument_errors(self):  # each producer.send() call below passes one wrongly-typed argument and must raise ValueError
+ client = Client(self.serviceUrl)
+ topic = 'persistent://sample/standalone/ns1/my-python-test-producer'
+ producer = client.create_producer(topic)
+
+ content = 'test'.encode('utf-8')  # bytes payload reused by the keyword-argument checks below
+
+ self._check_value_error(lambda: producer.send(5))
+ self._check_value_error(lambda: producer.send(content, properties='test'))
+ self._check_value_error(lambda: producer.send(content, partition_key=5))
+ self._check_value_error(lambda: producer.send(content, sequence_id='test'))
+ self._check_value_error(lambda: producer.send(content, replication_clusters=5))
+ self._check_value_error(lambda: producer.send(content, disable_replication='test'))
+ client.close()
+
+ def test_client_argument_errors(self):  # each Client(...) call below passes one wrongly-typed argument and must raise ValueError
+ self._check_value_error(lambda: Client(None))
+ self._check_value_error(lambda: Client(self.serviceUrl, authentication="test"))
+ self._check_value_error(lambda: Client(self.serviceUrl, operation_timeout_seconds="test"))
+ self._check_value_error(lambda: Client(self.serviceUrl, io_threads="test"))
+ self._check_value_error(lambda: Client(self.serviceUrl, message_listener_threads="test"))
+ self._check_value_error(lambda: Client(self.serviceUrl, concurrent_lookup_requests="test"))
+ self._check_value_error(lambda: Client(self.serviceUrl, log_conf_file_path=5))
+ self._check_value_error(lambda: Client(self.serviceUrl, use_tls="test"))
+ self._check_value_error(lambda: Client(self.serviceUrl, tls_trust_certs_file_path=5))
+ self._check_value_error(lambda: Client(self.serviceUrl, tls_allow_insecure_connection="test"))
+
+ def test_producer_argument_errors(self):  # each create_producer() call below passes one wrongly-typed argument and must raise ValueError
+ client = Client(self.serviceUrl)
+
+ self._check_value_error(lambda: client.create_producer(None))
+
+ topic = 'persistent://sample/standalone/ns1/my-python-test-producer'
+
+ self._check_value_error(lambda: client.create_producer(topic, producer_name=5))
+ self._check_value_error(lambda: client.create_producer(topic, initial_sequence_id='test'))
+ self._check_value_error(lambda: client.create_producer(topic, send_timeout_millis='test'))
+ self._check_value_error(lambda: client.create_producer(topic, compression_type=None))
+ self._check_value_error(lambda: client.create_producer(topic, max_pending_messages='test'))
+ self._check_value_error(lambda: client.create_producer(topic, block_if_queue_full='test'))
+ self._check_value_error(lambda: client.create_producer(topic, batching_enabled='test'))
+ self._check_value_error(lambda: client.create_producer(topic, batching_max_messages='test'))  # was a duplicate batching_enabled check
+ self._check_value_error(lambda: client.create_producer(topic, batching_max_allowed_size_in_bytes='test'))
+ self._check_value_error(lambda: client.create_producer(topic, batching_max_publish_delay_ms='test'))
+ client.close()
+
+ def test_consumer_argument_errors(self):  # each client.subscribe() call below passes one wrongly-typed argument and must raise ValueError
+ client = Client(self.serviceUrl)
+
+ topic = 'persistent://sample/standalone/ns1/my-python-test-producer'
+ sub_name = 'my-sub-name'
+
+ self._check_value_error(lambda: client.subscribe(None, sub_name))
+ self._check_value_error(lambda: client.subscribe(topic, None))
+ self._check_value_error(lambda: client.subscribe(topic, sub_name, consumer_type=None))
+ self._check_value_error(lambda: client.subscribe(topic, sub_name, receiver_queue_size='test'))
+ self._check_value_error(lambda: client.subscribe(topic, sub_name, consumer_name=5))
+ self._check_value_error(lambda: client.subscribe(topic, sub_name, unacked_messages_timeout_ms='test'))
+ self._check_value_error(lambda: client.subscribe(topic, sub_name, broker_consumer_stats_cache_time_ms='test'))
+ client.close()
+
+ def test_reader_argument_errors(self):  # one valid create_reader() call, then calls with one wrongly-typed argument each that must raise ValueError
+ client = Client(self.serviceUrl)
+ topic = 'persistent://sample/standalone/ns1/my-python-test-producer'
+
+ # This should not raise exception
+ client.create_reader(topic, MessageId.earliest)
+
+ self._check_value_error(lambda: client.create_reader(None, MessageId.earliest))
+ self._check_value_error(lambda: client.create_reader(topic, None))
+ self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, receiver_queue_size='test'))
+ self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, reader_name=5))
+ client.close()
+
+ def _check_value_error(self, fun):  # helper: call `fun()` and fail the test unless it raises ValueError
+ try:
+ fun()
+ # Reached only if fun() did not raise: the AssertionError below is not a ValueError, so it propagates and fails the test
+ self.assertTrue(False)
+ except ValueError:
+ pass # Expected
+
if __name__ == '__main__':
main()
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.