You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/01/30 19:06:13 UTC

[incubator-pulsar] branch master updated: Validate expected types in Python code wrapper (#1097)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 53619ff  Validate expected types in Python code wrapper (#1097)
53619ff is described below

commit 53619ff1923ff41aedb273dacb9312d973b5fb09
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jan 30 11:06:11 2018 -0800

    Validate expected types in Python code wrapper (#1097)
    
    * Validate expected types in Python code wrapper
    
    * Address comments
    
    * Left a couple of checks in old format
    
    * More compact unit tests
    
    * Fixed checks for producer_name which can be None
---
 pulsar-client-cpp/python/pulsar.py      | 58 ++++++++++++++++++++++
 pulsar-client-cpp/python/pulsar_test.py | 85 ++++++++++++++++++++++++++++++++-
 2 files changed, 142 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-cpp/python/pulsar.py b/pulsar-client-cpp/python/pulsar.py
index 8c1ff45..060e611 100644
--- a/pulsar-client-cpp/python/pulsar.py
+++ b/pulsar-client-cpp/python/pulsar.py
@@ -185,6 +185,8 @@ class Authentication:
         * `authParamsString`: Comma-separated list of provider-specific
           configuration params
         """
+        _check_type(str, dynamicLibPath, 'dynamicLibPath')
+        _check_type(str, authParamsString, 'authParamsString')
         self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString)
 
 
@@ -243,6 +245,17 @@ class Client:
           Configure whether the Pulsar client accepts untrusted TLS certificates
           from the broker.
         """
+        _check_type(str, service_url, 'service_url')
+        _check_type_or_none(Authentication, authentication, 'authentication')
+        _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
+        _check_type(int, io_threads, 'io_threads')
+        _check_type(int, message_listener_threads, 'message_listener_threads')
+        _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
+        _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
+        _check_type(bool, use_tls, 'use_tls')
+        _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
+        _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
+
         conf = _pulsar.ClientConfiguration()
         if authentication:
             conf.authentication(authentication.auth)
@@ -306,6 +319,18 @@ class Client:
         * `message_routing_mode`:
           Set the message routing mode for the partitioned producer.
         """
+        _check_type(str, topic, 'topic')
+        _check_type_or_none(str, producer_name, 'producer_name')
+        _check_type_or_none(int, initial_sequence_id, 'initial_sequence_id')
+        _check_type(int, send_timeout_millis, 'send_timeout_millis')
+        _check_type(CompressionType, compression_type, 'compression_type')
+        _check_type(int, max_pending_messages, 'max_pending_messages')
+        _check_type(bool, block_if_queue_full, 'block_if_queue_full')
+        _check_type(bool, batching_enabled, 'batching_enabled')
+        _check_type(int, batching_max_messages, 'batching_max_messages')
+        _check_type(int, batching_max_allowed_size_in_bytes, 'batching_max_allowed_size_in_bytes')
+        _check_type(int, batching_max_publish_delay_ms, 'batching_max_publish_delay_ms')
+
         conf = _pulsar.ProducerConfiguration()
         conf.send_timeout_millis(send_timeout_millis)
         conf.compression_type(compression_type)
@@ -380,6 +405,14 @@ class Client:
           Sets the time duration for which the broker-side consumer stats will
           be cached in the client.
         """
+        _check_type(str, topic, 'topic')
+        _check_type(str, subscription_name, 'subscription_name')
+        _check_type(ConsumerType, consumer_type, 'consumer_type')
+        _check_type(int, receiver_queue_size, 'receiver_queue_size')
+        _check_type_or_none(str, consumer_name, 'consumer_name')
+        _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms')
+        _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms')
+
         conf = _pulsar.ConsumerConfiguration()
         conf.consumer_type(consumer_type)
         if message_listener:
@@ -444,6 +477,11 @@ class Client:
         * `reader_name`:
           Sets the reader name.
         """
+        _check_type(str, topic, 'topic')
+        _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
+        _check_type(int, receiver_queue_size, 'receiver_queue_size')
+        _check_type_or_none(str, reader_name, 'reader_name')
+
         conf = _pulsar.ReaderConfiguration()
         if reader_listener:
             conf.reader_listener(reader_listener)
@@ -588,6 +626,13 @@ class Producer:
 
     def _build_msg(self, content, properties, partition_key, sequence_id,
                    replication_clusters, disable_replication):
+        _check_type(bytes, content, 'content')
+        _check_type_or_none(dict, properties, 'properties')
+        _check_type_or_none(str, partition_key, 'partition_key')
+        _check_type_or_none(int, sequence_id, 'sequence_id')
+        _check_type_or_none(list, replication_clusters, 'replication_clusters')
+        _check_type(bool, disable_replication, 'disable_replication')
+
         mb = _pulsar.MessageBuilder()
         mb.content(content)
         if properties:
@@ -649,6 +694,7 @@ class Consumer:
         if timeout_millis is None:
             return self._consumer.receive()
         else:
+            _check_type(int, timeout_millis, 'timeout_millis')
             return self._consumer.receive(timeout_millis)
 
     def acknowledge(self, message):
@@ -741,6 +787,7 @@ class Reader:
         if timeout_millis is None:
             return self._reader.read_next()
         else:
+            _check_type(int, timeout_millis, 'timeout_millis')
             return self._reader.read_next(timeout_millis)
 
     def close(self):
@@ -749,3 +796,14 @@ class Reader:
         """
         self._reader.close()
         self._client._consumers.remove(self)
+
+
+def _check_type(var_type, var, name):
+    if not isinstance(var, var_type):
+        raise ValueError("Argument %s is expected to be of type '%s'" % (name, var_type.__name__))
+
+
+def _check_type_or_none(var_type, var, name):
+    if var is not None and not isinstance(var, var_type):
+        raise ValueError("Argument %s is expected to be either None or of type '%s'"
+                         % (name, var_type.__name__))
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 01ac521..4d3e0ad 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -248,7 +248,8 @@ class PulsarTest(TestCase):
                    'true')
         client = Client(self.serviceUrl)
 
-        topic = 'persistent://sample/standalone/ns1/my-python-test-producer-sequence-after-reconnection-' + str(time.time())
+        topic = 'persistent://sample/standalone/ns1/my-python-test-producer-sequence-after-reconnection-' \
+            + str(time.time())
 
         producer = client.create_producer(topic, producer_name='my-producer-name')
         self.assertEqual(producer.last_sequence_id(), -1)
@@ -320,6 +321,88 @@ class PulsarTest(TestCase):
             # Exception is expected
             pass
 
+    def test_message_argument_errors(self):
+        client = Client(self.serviceUrl)
+        topic = 'persistent://sample/standalone/ns1/my-python-test-producer'
+        producer = client.create_producer(topic)
+
+        content = 'test'.encode('utf-8')
+
+        self._check_value_error(lambda: producer.send(5))
+        self._check_value_error(lambda: producer.send(content, properties='test'))
+        self._check_value_error(lambda: producer.send(content, partition_key=5))
+        self._check_value_error(lambda: producer.send(content, sequence_id='test'))
+        self._check_value_error(lambda: producer.send(content, replication_clusters=5))
+        self._check_value_error(lambda: producer.send(content, disable_replication='test'))
+        client.close()
+
+    def test_client_argument_errors(self):
+        self._check_value_error(lambda: Client(None))
+        self._check_value_error(lambda: Client(self.serviceUrl, authentication="test"))
+        self._check_value_error(lambda: Client(self.serviceUrl, operation_timeout_seconds="test"))
+        self._check_value_error(lambda: Client(self.serviceUrl, io_threads="test"))
+        self._check_value_error(lambda: Client(self.serviceUrl, message_listener_threads="test"))
+        self._check_value_error(lambda: Client(self.serviceUrl, concurrent_lookup_requests="test"))
+        self._check_value_error(lambda: Client(self.serviceUrl, log_conf_file_path=5))
+        self._check_value_error(lambda: Client(self.serviceUrl, use_tls="test"))
+        self._check_value_error(lambda: Client(self.serviceUrl, tls_trust_certs_file_path=5))
+        self._check_value_error(lambda: Client(self.serviceUrl, tls_allow_insecure_connection="test"))
+
+    def test_producer_argument_errors(self):
+        client = Client(self.serviceUrl)
+
+        self._check_value_error(lambda: client.create_producer(None))
+
+        topic = 'persistent://sample/standalone/ns1/my-python-test-producer'
+
+        self._check_value_error(lambda: client.create_producer(topic, producer_name=5))
+        self._check_value_error(lambda: client.create_producer(topic, initial_sequence_id='test'))
+        self._check_value_error(lambda: client.create_producer(topic, send_timeout_millis='test'))
+        self._check_value_error(lambda: client.create_producer(topic, compression_type=None))
+        self._check_value_error(lambda: client.create_producer(topic, max_pending_messages='test'))
+        self._check_value_error(lambda: client.create_producer(topic, block_if_queue_full='test'))
+        self._check_value_error(lambda: client.create_producer(topic, batching_enabled='test'))
+        self._check_value_error(lambda: client.create_producer(topic, batching_enabled='test'))
+        self._check_value_error(lambda: client.create_producer(topic, batching_max_allowed_size_in_bytes='test'))
+        self._check_value_error(lambda: client.create_producer(topic, batching_max_publish_delay_ms='test'))
+        client.close()
+
+    def test_consumer_argument_errors(self):
+        client = Client(self.serviceUrl)
+
+        topic = 'persistent://sample/standalone/ns1/my-python-test-producer'
+        sub_name = 'my-sub-name'
+
+        self._check_value_error(lambda: client.subscribe(None, sub_name))
+        self._check_value_error(lambda: client.subscribe(topic, None))
+        self._check_value_error(lambda: client.subscribe(topic, sub_name, consumer_type=None))
+        self._check_value_error(lambda: client.subscribe(topic, sub_name, receiver_queue_size='test'))
+        self._check_value_error(lambda: client.subscribe(topic, sub_name, consumer_name=5))
+        self._check_value_error(lambda: client.subscribe(topic, sub_name, unacked_messages_timeout_ms='test'))
+        self._check_value_error(lambda: client.subscribe(topic, sub_name, broker_consumer_stats_cache_time_ms='test'))
+        client.close()
+
+    def test_reader_argument_errors(self):
+        client = Client(self.serviceUrl)
+        topic = 'persistent://sample/standalone/ns1/my-python-test-producer'
+
+        # This should not raise exception
+        client.create_reader(topic, MessageId.earliest)
+
+        self._check_value_error(lambda: client.create_reader(None, MessageId.earliest))
+        self._check_value_error(lambda: client.create_reader(topic, None))
+        self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, receiver_queue_size='test'))
+        self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, reader_name=5))
+        client.close()
+
+    def _check_value_error(self, fun):
+        try:
+            fun()
+            # Should throw exception
+            self.assertTrue(False)
+        except ValueError:
+            pass  # Expected
+
 
 if __name__ == '__main__':
     main()

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.