You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/15 02:16:22 UTC

[pulsar] branch master updated: [Issue 3127][python-client] Replace Exceptions with PulsarExceptions (#7600)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f36544  [Issue 3127][python-client] Replace Exceptions with PulsarExceptions (#7600)
3f36544 is described below

commit 3f36544c76620a7c6cd91c8047ecc28dbc3f85db
Author: Livio Benčik <lb...@gmail.com>
AuthorDate: Sat May 15 04:15:48 2021 +0200

    [Issue 3127][python-client] Replace Exceptions with PulsarExceptions (#7600)
    
    Fixes #3127
    
    ### Motivation
    
    As the issue describes, the Python client raises plain `Exception` objects instead of a dedicated subclass (`PulsarException`), so callers are forced to catch the blanket `Exception` and cannot distinguish Pulsar errors from unrelated ones.
    
    ### Modifications
    
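    Every C++ `PulsarException` is now raised in Python as an exception class named after its `Result` code (e.g. `Timeout`, `ConnectError`); all of these classes derive from a common `PulsarException` base class, which itself derives from `Exception`.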
---
 pulsar-client-cpp/python/CMakeLists.txt       |   3 +-
 pulsar-client-cpp/python/pulsar/__init__.py   |   2 +
 pulsar-client-cpp/python/pulsar/exceptions.py |  28 +++++
 pulsar-client-cpp/python/pulsar_test.py       | 171 +++++++++-----------------
 pulsar-client-cpp/python/src/enums.cc         |  13 ++
 pulsar-client-cpp/python/src/exceptions.cc    |  92 ++++++++++++++
 pulsar-client-cpp/python/src/pulsar.cc        |   6 +-
 7 files changed, 201 insertions(+), 114 deletions(-)

diff --git a/pulsar-client-cpp/python/CMakeLists.txt b/pulsar-client-cpp/python/CMakeLists.txt
index e78d80a..c110d01 100644
--- a/pulsar-client-cpp/python/CMakeLists.txt
+++ b/pulsar-client-cpp/python/CMakeLists.txt
@@ -29,7 +29,8 @@ ADD_LIBRARY(_pulsar SHARED src/pulsar.cc
                            src/authentication.cc
                            src/reader.cc
                            src/schema.cc
-                           src/cryptoKeyReader.cc)
+                           src/cryptoKeyReader.cc
+                           src/exceptions.cc)
 
 SET(CMAKE_SHARED_LIBRARY_PREFIX )
 SET(CMAKE_SHARED_LIBRARY_SUFFIX .so)
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index b47c87d..3f00554 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -103,6 +103,8 @@ import _pulsar
 
 from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType  # noqa: F401
 
+from pulsar.exceptions import *
+
 from pulsar.functions.function import Function
 from pulsar.functions.context import Context
 from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
diff --git a/pulsar-client-cpp/python/pulsar/exceptions.py b/pulsar-client-cpp/python/pulsar/exceptions.py
new file mode 100644
index 0000000..d151564
--- /dev/null
+++ b/pulsar-client-cpp/python/pulsar/exceptions.py
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from _pulsar import PulsarException, UnknownError, InvalidConfiguration, Timeout, LookupError, ConnectError, \
+    ReadError, AuthenticationError, AuthorizationError, ErrorGettingAuthenticationData, BrokerMetadataError, \
+    BrokerPersistenceError, ChecksumError, ConsumerBusy, NotConnected, AlreadyClosed, InvalidMessage, \
+    ConsumerNotInitialized, ProducerNotInitialized, ProducerBusy, TooManyLookupRequestException, InvalidTopicName, \
+    InvalidUrl, ServiceUnitNotReady, OperationNotSupported, ProducerBlockedQuotaExceededError, \
+    ProducerBlockedQuotaExceededException, ProducerQueueIsFull, MessageTooBig, TopicNotFound, SubscriptionNotFound, \
+    ConsumerNotFound, UnsupportedVersionError, TopicTerminated, CryptoError, IncompatibleSchema, ConsumerAssignError, \
+    CumulativeAcknowledgementNotAllowedError, TransactionCoordinatorNotFoundError, InvalidTxnStatusError, \
+    NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index e7d05f3..71b67d7 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -42,6 +42,7 @@ except ImportError:
 
 TM = 10000  # Do not wait forever in tests
 
+
 def doHttpPost(url, data):
     req = Request(url, data.encode())
     req.add_header('Content-Type', 'application/json')
@@ -67,6 +68,7 @@ def doHttpGet(url):
     req.add_header('Accept', 'application/json')
     return urlopen(req).read()
 
+
 class PulsarTest(TestCase):
 
     serviceUrl = 'pulsar://localhost:6650'
@@ -96,6 +98,16 @@ class PulsarTest(TestCase):
         conf.consumer_name("my-name")
         self.assertEqual(conf.consumer_name(), "my-name")
 
+    def test_connect_error(self):
+        with self.assertRaises(pulsar.ConnectError):
+            client = Client('fakeServiceUrl')
+            client.create_producer('connect-error-topic')
+            client.close()
+
+    def test_exception_inheritance(self):
+        assert issubclass(pulsar.ConnectError, pulsar.PulsarException)
+        assert issubclass(pulsar.PulsarException, Exception)
+
     def test_simple_producer(self):
         client = Client(self.serviceUrl)
         producer = client.create_producer('my-python-topic')
@@ -147,11 +159,8 @@ class PulsarTest(TestCase):
         self.assertTrue(msg)
         self.assertEqual(msg.data(), b'hello')
 
-        try:
-            msg = consumer.receive(100)
-            self.assertTrue(False)  # Should not reach this point
-        except:
-            pass  # Exception is expected
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
 
         consumer.unsubscribe()
         client.close()
@@ -189,11 +198,8 @@ class PulsarTest(TestCase):
         producer.send(b'hello', deliver_at=int(round(time.time() * 1000)) + 1100)
 
         # Message should not be available in the next second
-        try:
-            msg = consumer.receive(1000)
-            self.assertTrue(False)  # Should not reach this point
-        except:
-            pass  # Exception is expected
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(1000)
 
         # Message should be published now
         msg = consumer.receive(TM)
@@ -213,11 +219,8 @@ class PulsarTest(TestCase):
         producer.send(b'hello', deliver_after=timedelta(milliseconds=1100))
 
         # Message should not be available in the next second
-        try:
-            msg = consumer.receive(1000)
-            self.assertTrue(False)  # Should not reach this point
-        except:
-            pass  # Exception is expected
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(1000)
 
         # Message should be published in the next 500ms
         msg = consumer.receive(TM)
@@ -229,14 +232,14 @@ class PulsarTest(TestCase):
 
     def test_consumer_initial_position(self):
         client = Client(self.serviceUrl)
-        producer = client.create_producer('my-python-topic-producer-consumer')
+        producer = client.create_producer('consumer-initial-position')
 
         # Sending 5 messages before consumer creation.
         # These should be received with initial_position set to Earliest but not with Latest.
         for i in range(5):
             producer.send(b'hello-%d' % i)
 
-        consumer = client.subscribe('my-python-topic-producer-consumer',
+        consumer = client.subscribe('consumer-initial-position',
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared,
                                     initial_position=InitialPosition.Earliest)
@@ -250,11 +253,8 @@ class PulsarTest(TestCase):
             self.assertTrue(msg)
             self.assertEqual(msg.data(), b'hello-%d' % i)
 
-        try:
-            msg = consumer.receive(100)
-            self.assertTrue(False)  # Should not reach this point
-        except:
-            pass  # Exception is expected
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
 
         consumer.unsubscribe()
         client.close()
@@ -310,21 +310,18 @@ class PulsarTest(TestCase):
                         tls_allow_insecure_connection=False,
                         authentication=AuthenticationTLS(certs_dir + 'client-cert.pem', certs_dir + 'client-key.pem'))
 
-        consumer = client.subscribe('my-python-topic-producer-consumer',
+        consumer = client.subscribe('my-python-topic-tls-auth',
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-producer-consumer')
+        producer = client.create_producer('my-python-topic-tls-auth')
         producer.send(b'hello')
 
         msg = consumer.receive(TM)
         self.assertTrue(msg)
         self.assertEqual(msg.data(), b'hello')
 
-        try:
-            msg = consumer.receive(100)
-            self.assertTrue(False)  # Should not reach this point
-        except:
-            pass  # Exception is expected
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
 
         client.close()
 
@@ -340,21 +337,18 @@ class PulsarTest(TestCase):
                         tls_allow_insecure_connection=False,
                         authentication=Authentication(authPlugin, authParams))
 
-        consumer = client.subscribe('my-python-topic-producer-consumer',
+        consumer = client.subscribe('my-python-topic-tls-auth-2',
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-producer-consumer')
+        producer = client.create_producer('my-python-topic-tls-auth-2')
         producer.send(b'hello')
 
         msg = consumer.receive(TM)
         self.assertTrue(msg)
         self.assertEqual(msg.data(), b'hello')
 
-        try:
-            msg = consumer.receive(100)
-            self.assertTrue(False)  # Should not reach this point
-        except:
-            pass  # Exception is expected
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
 
         client.close()
 
@@ -389,21 +383,18 @@ class PulsarTest(TestCase):
                         tls_allow_insecure_connection=False,
                         authentication=Authentication(authPlugin, authParams))
 
-        consumer = client.subscribe('my-python-topic-producer-consumer',
+        consumer = client.subscribe('my-python-topic-tls-auth-3',
                                     'my-sub',
                                     consumer_type=ConsumerType.Shared)
-        producer = client.create_producer('my-python-topic-producer-consumer')
+        producer = client.create_producer('my-python-topic-tls-auth-3')
         producer.send(b'hello')
 
         msg = consumer.receive(TM)
         self.assertTrue(msg)
         self.assertEqual(msg.data(), b'hello')
 
-        try:
-            msg = consumer.receive(100)
-            self.assertTrue(False)  # Should not reach this point
-        except:
-            pass  # Exception is expected
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
 
         client.close()
 
@@ -417,12 +408,11 @@ class PulsarTest(TestCase):
                         tls_trust_certs_file_path=certs_dir + 'cacert.pem',
                         tls_allow_insecure_connection=False,
                         authentication=Authentication(authPlugin, authParams))
-        try:
-            client.subscribe('my-python-topic-producer-consumer',
+
+        with self.assertRaises(pulsar.ConnectError):
+            client.subscribe('my-python-topic-auth-junk-params',
                              'my-sub',
                              consumer_type=ConsumerType.Shared)
-        except:
-            pass  # Exception is expected
 
     def test_message_listener(self):
         client = Client(self.serviceUrl)
@@ -462,11 +452,8 @@ class PulsarTest(TestCase):
         self.assertTrue(msg)
         self.assertEqual(msg.data(), b'hello')
 
-        try:
-            msg = reader.read_next(100)
-            self.assertTrue(False)  # Should not reach this point
-        except:
-            pass  # Exception is expected
+        with self.assertRaises(pulsar.Timeout):
+            reader.read_next(100)
 
         reader.close()
         client.close()
@@ -618,13 +605,8 @@ class PulsarTest(TestCase):
             self.assertEqual(msg.data(), b'hello-%d' % i)
             consumer.acknowledge(msg)
 
-        try:
-            # No other messages should be received
-            consumer.receive(timeout_millis=1000)
-            self.assertTrue(False)
-        except:
-            # Exception is expected
-            pass
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
 
         producer.close()
 
@@ -636,13 +618,8 @@ class PulsarTest(TestCase):
         producer.send(b'hello-2', sequence_id=2)
         self.assertEqual(producer.last_sequence_id(), 2)
 
-        try:
-            # No other messages should be received
-            consumer.receive(timeout_millis=1000)
-            self.assertTrue(False)
-        except:
-            # Exception is expected
-            pass
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
 
         doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
                    'false')
@@ -873,11 +850,8 @@ class PulsarTest(TestCase):
 
         # repeat with reader
         reader = client.create_reader('my-python-topic-seek', MessageId.latest)
-        try:
-            msg = reader.read_next(100)
-            self.assertTrue(False)  # Should not reach this point
-        except:
-            pass  # Exception is expected
+        with self.assertRaises(pulsar.Timeout):
+            reader.read_next(100)
 
         # earliest
         reader.seek(MessageId.earliest)
@@ -925,11 +899,8 @@ class PulsarTest(TestCase):
         self.assertEqual(msg.data(), b'hello')
         consumer.acknowledge(msg)
 
-        try:
-            msg = consumer.receive(100)
-            self.assertTrue(False)  # Should not reach this point
-        except:
-            pass  # Exception is expected
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
 
         client.close()
 
@@ -972,13 +943,8 @@ class PulsarTest(TestCase):
             msg = consumer.receive(TM)
             consumer.acknowledge(msg)
 
-        try:
-        # No other messages should be received
-            consumer.receive(timeout_millis=500)
-            self.assertTrue(False)
-        except:
-            # Exception is expected
-            pass
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
         client.close()
 
     def test_topics_pattern_consumer(self):
@@ -1027,13 +993,8 @@ class PulsarTest(TestCase):
             msg = consumer.receive(TM)
             consumer.acknowledge(msg)
 
-        try:
-            # No other messages should be received
-            consumer.receive(timeout_millis=500)
-            self.assertTrue(False)
-        except:
-            # Exception is expected
-            pass
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
         client.close()
 
     def test_message_id(self):
@@ -1110,11 +1071,8 @@ class PulsarTest(TestCase):
         self.assertTrue(msg)
         self.assertEqual(msg.data(), b'hello')
 
-        try:
-            msg = consumer.receive(100)
-            self.assertTrue(False)  # Should not reach this point
-        except:
-            pass  # Exception is expected
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
 
         consumer.unsubscribe()
         client.close()
@@ -1158,7 +1116,8 @@ class PulsarTest(TestCase):
         client = Client(self.serviceUrl)
         consumer = client.subscribe('test_negative_acks',
                                     'test',
-                                    schema=pulsar.schema.StringSchema())
+                                    schema=pulsar.schema.StringSchema(),
+                                    negative_ack_redelivery_delay_ms=1000)
         producer = client.create_producer('test_negative_acks',
                                           schema=pulsar.schema.StringSchema())
         for i in range(10):
@@ -1176,29 +1135,17 @@ class PulsarTest(TestCase):
             self.assertEqual(msg.value(), "hello-%d" % i)
             consumer.acknowledge(msg)
 
-        try:
-            # No more messages expected
-            msg = consumer.receive(100)
-            self.assertTrue(False)
-        except:
-            pass  # Exception is expected
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(100)
         client.close()
 
     def _check_value_error(self, fun):
-        try:
+        with self.assertRaises(ValueError):
             fun()
-            # Should throw exception
-            self.assertTrue(False)
-        except ValueError:
-            pass  # Expected
 
     def _check_type_error(self, fun):
-        try:
+        with self.assertRaises(TypeError):
             fun()
-            # Should throw exception
-            self.assertTrue(False)
-        except TypeError:
-            pass  # Expected
 
 
 if __name__ == '__main__':
diff --git a/pulsar-client-cpp/python/src/enums.cc b/pulsar-client-cpp/python/src/enums.cc
index f57e08d..c23b211 100644
--- a/pulsar-client-cpp/python/src/enums.cc
+++ b/pulsar-client-cpp/python/src/enums.cc
@@ -63,6 +63,7 @@ void export_enums() {
             .value("InvalidMessage", ResultInvalidMessage)
             .value("ConsumerNotInitialized", ResultConsumerNotInitialized)
             .value("ProducerNotInitialized", ResultProducerNotInitialized)
+            .value("ProducerBusy", ResultProducerBusy)
             .value("TooManyLookupRequestException", ResultTooManyLookupRequestException)
             .value("InvalidTopicName", ResultInvalidTopicName)
             .value("InvalidUrl", ResultInvalidUrl)
@@ -76,6 +77,18 @@ void export_enums() {
             .value("SubscriptionNotFound", ResultSubscriptionNotFound)
             .value("ConsumerNotFound", ResultConsumerNotFound)
             .value("UnsupportedVersionError", ResultUnsupportedVersionError)
+            .value("TopicTerminated", ResultTopicTerminated)
+            .value("CryptoError", ResultCryptoError)
+            .value("IncompatibleSchema", ResultIncompatibleSchema)
+            .value("ConsumerAssignError", ResultConsumerAssignError)
+            .value("CumulativeAcknowledgementNotAllowedError", ResultCumulativeAcknowledgementNotAllowedError)
+            .value("TransactionCoordinatorNotFoundError", ResultTransactionCoordinatorNotFoundError)
+            .value("InvalidTxnStatusError", ResultInvalidTxnStatusError)
+            .value("NotAllowedError", ResultNotAllowedError)
+            .value("TransactionConflict", ResultTransactionConflict)
+            .value("TransactionNotFound", ResultTransactionNotFound)
+            .value("ProducerFenced", ResultProducerFenced)
+            .value("MemoryBufferIsFull", ResultMemoryBufferIsFull)
             ;
 
     enum_<SchemaType>("SchemaType", "Supported schema types")
diff --git a/pulsar-client-cpp/python/src/exceptions.cc b/pulsar-client-cpp/python/src/exceptions.cc
new file mode 100644
index 0000000..c39b52d
--- /dev/null
+++ b/pulsar-client-cpp/python/src/exceptions.cc
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <map>
+
+#include "utils.h"
+
+static PyObject* basePulsarException = nullptr;
+std::map<Result, PyObject*> exceptions;
+
+PyObject* createExceptionClass(const char* name, PyObject* baseTypeObj = PyExc_Exception) {
+    using namespace boost::python;
+
+    std::string fullName = "_pulsar.";
+    fullName += name;
+
+    PyObject* typeObj = PyErr_NewException(const_cast<char*>(fullName.c_str()),
+                                           baseTypeObj, nullptr);
+    if (!typeObj) throw_error_already_set();
+    scope().attr(name) = handle<>(borrowed(typeObj));
+    return typeObj;
+}
+
+PyObject* get_exception_class(Result result) {
+    return exceptions[result];
+}
+
+void export_exceptions() {
+    using namespace boost::python;
+
+    basePulsarException = createExceptionClass("PulsarException");
+
+    exceptions[ResultUnknownError] = createExceptionClass("UnknownError", basePulsarException);
+    exceptions[ResultInvalidConfiguration] = createExceptionClass("InvalidConfiguration", basePulsarException);
+    exceptions[ResultTimeout] = createExceptionClass("Timeout", basePulsarException);
+    exceptions[ResultLookupError] = createExceptionClass("LookupError", basePulsarException);
+    exceptions[ResultConnectError] = createExceptionClass("ConnectError", basePulsarException);
+    exceptions[ResultReadError] = createExceptionClass("ReadError", basePulsarException);
+    exceptions[ResultAuthenticationError] = createExceptionClass("AuthenticationError", basePulsarException);
+    exceptions[ResultAuthorizationError] = createExceptionClass("AuthorizationError", basePulsarException);
+    exceptions[ResultErrorGettingAuthenticationData] = createExceptionClass("ErrorGettingAuthenticationData", basePulsarException);
+    exceptions[ResultBrokerMetadataError] = createExceptionClass("BrokerMetadataError", basePulsarException);
+    exceptions[ResultBrokerPersistenceError] = createExceptionClass("BrokerPersistenceError", basePulsarException);
+    exceptions[ResultChecksumError] = createExceptionClass("ChecksumError", basePulsarException);
+    exceptions[ResultConsumerBusy] = createExceptionClass("ConsumerBusy", basePulsarException);
+    exceptions[ResultNotConnected] = createExceptionClass("NotConnected", basePulsarException);
+    exceptions[ResultAlreadyClosed] = createExceptionClass("AlreadyClosed", basePulsarException);
+    exceptions[ResultInvalidMessage] = createExceptionClass("InvalidMessage", basePulsarException);
+    exceptions[ResultConsumerNotInitialized] = createExceptionClass("ConsumerNotInitialized", basePulsarException);
+    exceptions[ResultProducerNotInitialized] = createExceptionClass("ProducerNotInitialized", basePulsarException);
+    exceptions[ResultProducerBusy] = createExceptionClass("ProducerBusy", basePulsarException);
+    exceptions[ResultTooManyLookupRequestException] = createExceptionClass("TooManyLookupRequestException", basePulsarException);
+    exceptions[ResultInvalidTopicName] = createExceptionClass("InvalidTopicName", basePulsarException);
+    exceptions[ResultInvalidUrl] = createExceptionClass("InvalidUrl", basePulsarException);
+    exceptions[ResultServiceUnitNotReady] = createExceptionClass("ServiceUnitNotReady", basePulsarException);
+    exceptions[ResultOperationNotSupported] = createExceptionClass("OperationNotSupported", basePulsarException);
+    exceptions[ResultProducerBlockedQuotaExceededError] = createExceptionClass("ProducerBlockedQuotaExceededError", basePulsarException);
+    exceptions[ResultProducerBlockedQuotaExceededException] = createExceptionClass("ProducerBlockedQuotaExceededException", basePulsarException);
+    exceptions[ResultProducerQueueIsFull] = createExceptionClass("ProducerQueueIsFull", basePulsarException);
+    exceptions[ResultMessageTooBig] = createExceptionClass("MessageTooBig", basePulsarException);
+    exceptions[ResultTopicNotFound] = createExceptionClass("TopicNotFound", basePulsarException);
+    exceptions[ResultSubscriptionNotFound] = createExceptionClass("SubscriptionNotFound", basePulsarException);
+    exceptions[ResultConsumerNotFound] = createExceptionClass("ConsumerNotFound", basePulsarException);
+    exceptions[ResultUnsupportedVersionError] = createExceptionClass("UnsupportedVersionError", basePulsarException);
+    exceptions[ResultTopicTerminated] = createExceptionClass("TopicTerminated", basePulsarException);
+    exceptions[ResultCryptoError] = createExceptionClass("CryptoError", basePulsarException);
+    exceptions[ResultIncompatibleSchema] = createExceptionClass("IncompatibleSchema", basePulsarException);
+    exceptions[ResultConsumerAssignError] = createExceptionClass("ConsumerAssignError", basePulsarException);
+    exceptions[ResultCumulativeAcknowledgementNotAllowedError] = createExceptionClass("CumulativeAcknowledgementNotAllowedError", basePulsarException);
+    exceptions[ResultTransactionCoordinatorNotFoundError] = createExceptionClass("TransactionCoordinatorNotFoundError", basePulsarException);
+    exceptions[ResultInvalidTxnStatusError] = createExceptionClass("InvalidTxnStatusError", basePulsarException);
+    exceptions[ResultNotAllowedError] = createExceptionClass("NotAllowedError", basePulsarException);
+    exceptions[ResultTransactionConflict] = createExceptionClass("TransactionConflict", basePulsarException);
+    exceptions[ResultTransactionNotFound] = createExceptionClass("TransactionNotFound", basePulsarException);
+    exceptions[ResultProducerFenced] = createExceptionClass("ProducerFenced", basePulsarException);
+    exceptions[ResultMemoryBufferIsFull] = createExceptionClass("MemoryBufferIsFull", basePulsarException);
+}
diff --git a/pulsar-client-cpp/python/src/pulsar.cc b/pulsar-client-cpp/python/src/pulsar.cc
index f80c9a4..a46ce53 100644
--- a/pulsar-client-cpp/python/src/pulsar.cc
+++ b/pulsar-client-cpp/python/src/pulsar.cc
@@ -28,13 +28,16 @@ void export_enums();
 void export_authentication();
 void export_schema();
 void export_cryptoKeyReader();
+void export_exceptions();
+
+PyObject* get_exception_class(Result result);
 
 
 static void translateException(const PulsarException& ex) {
     std::string err = "Pulsar error: ";
     err += strResult(ex._result);
 
-    PyErr_SetString(PyExc_Exception, err.c_str());
+    PyErr_SetString(get_exception_class(ex._result), err.c_str());
 }
 
 BOOST_PYTHON_MODULE(_pulsar)
@@ -55,4 +58,5 @@ BOOST_PYTHON_MODULE(_pulsar)
     export_authentication();
     export_schema();
     export_cryptoKeyReader();
+    export_exceptions();
 }