You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/15 02:16:22 UTC
[pulsar] branch master updated: [Issue 3127][python-client] Replace
Exceptions with PulsarExceptions (#7600)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3f36544 [Issue 3127][python-client] Replace Exceptions with PulsarExceptions (#7600)
3f36544 is described below
commit 3f36544c76620a7c6cd91c8047ecc28dbc3f85db
Author: Livio Benčik <lb...@gmail.com>
AuthorDate: Sat May 15 04:15:48 2021 +0200
[Issue 3127][python-client] Replace Exceptions with PulsarExceptions (#7600)
Fixes #3127
### Motivation
As the issue says, the Python client raises plain `Exception`s instead of a dedicated subclass (`PulsarException`), so callers are forced to catch the blanket `Exception`.
### Modifications
Every C++ `PulsarException` is now raised in Python as an exception class of the matching type.
---
pulsar-client-cpp/python/CMakeLists.txt | 3 +-
pulsar-client-cpp/python/pulsar/__init__.py | 2 +
pulsar-client-cpp/python/pulsar/exceptions.py | 28 +++++
pulsar-client-cpp/python/pulsar_test.py | 171 +++++++++-----------------
pulsar-client-cpp/python/src/enums.cc | 13 ++
pulsar-client-cpp/python/src/exceptions.cc | 92 ++++++++++++++
pulsar-client-cpp/python/src/pulsar.cc | 6 +-
7 files changed, 201 insertions(+), 114 deletions(-)
diff --git a/pulsar-client-cpp/python/CMakeLists.txt b/pulsar-client-cpp/python/CMakeLists.txt
index e78d80a..c110d01 100644
--- a/pulsar-client-cpp/python/CMakeLists.txt
+++ b/pulsar-client-cpp/python/CMakeLists.txt
@@ -29,7 +29,8 @@ ADD_LIBRARY(_pulsar SHARED src/pulsar.cc
src/authentication.cc
src/reader.cc
src/schema.cc
- src/cryptoKeyReader.cc)
+ src/cryptoKeyReader.cc
+ src/exceptions.cc)
SET(CMAKE_SHARED_LIBRARY_PREFIX )
SET(CMAKE_SHARED_LIBRARY_SUFFIX .so)
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index b47c87d..3f00554 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -103,6 +103,8 @@ import _pulsar
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType # noqa: F401
+from pulsar.exceptions import *
+
from pulsar.functions.function import Function
from pulsar.functions.context import Context
from pulsar.functions.serde import SerDe, IdentitySerDe, PickleSerDe
diff --git a/pulsar-client-cpp/python/pulsar/exceptions.py b/pulsar-client-cpp/python/pulsar/exceptions.py
new file mode 100644
index 0000000..d151564
--- /dev/null
+++ b/pulsar-client-cpp/python/pulsar/exceptions.py
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from _pulsar import PulsarException, UnknownError, InvalidConfiguration, Timeout, LookupError, ConnectError, \
+ ReadError, AuthenticationError, AuthorizationError, ErrorGettingAuthenticationData, BrokerMetadataError, \
+ BrokerPersistenceError, ChecksumError, ConsumerBusy, NotConnected, AlreadyClosed, InvalidMessage, \
+ ConsumerNotInitialized, ProducerNotInitialized, ProducerBusy, TooManyLookupRequestException, InvalidTopicName, \
+ InvalidUrl, ServiceUnitNotReady, OperationNotSupported, ProducerBlockedQuotaExceededError, \
+ ProducerBlockedQuotaExceededException, ProducerQueueIsFull, MessageTooBig, TopicNotFound, SubscriptionNotFound, \
+ ConsumerNotFound, UnsupportedVersionError, TopicTerminated, CryptoError, IncompatibleSchema, ConsumerAssignError, \
+ CumulativeAcknowledgementNotAllowedError, TransactionCoordinatorNotFoundError, InvalidTxnStatusError, \
+ NotAllowedError, TransactionConflict, TransactionNotFound, ProducerFenced, MemoryBufferIsFull
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index e7d05f3..71b67d7 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -42,6 +42,7 @@ except ImportError:
TM = 10000 # Do not wait forever in tests
+
def doHttpPost(url, data):
req = Request(url, data.encode())
req.add_header('Content-Type', 'application/json')
@@ -67,6 +68,7 @@ def doHttpGet(url):
req.add_header('Accept', 'application/json')
return urlopen(req).read()
+
class PulsarTest(TestCase):
serviceUrl = 'pulsar://localhost:6650'
@@ -96,6 +98,16 @@ class PulsarTest(TestCase):
conf.consumer_name("my-name")
self.assertEqual(conf.consumer_name(), "my-name")
+ def test_connect_error(self):
+ with self.assertRaises(pulsar.ConnectError):
+ client = Client('fakeServiceUrl')
+ client.create_producer('connect-error-topic')
+ client.close()
+
+ def test_exception_inheritance(self):
+ assert issubclass(pulsar.ConnectError, pulsar.PulsarException)
+ assert issubclass(pulsar.PulsarException, Exception)
+
def test_simple_producer(self):
client = Client(self.serviceUrl)
producer = client.create_producer('my-python-topic')
@@ -147,11 +159,8 @@ class PulsarTest(TestCase):
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')
- try:
- msg = consumer.receive(100)
- self.assertTrue(False) # Should not reach this point
- except:
- pass # Exception is expected
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
consumer.unsubscribe()
client.close()
@@ -189,11 +198,8 @@ class PulsarTest(TestCase):
producer.send(b'hello', deliver_at=int(round(time.time() * 1000)) + 1100)
# Message should not be available in the next second
- try:
- msg = consumer.receive(1000)
- self.assertTrue(False) # Should not reach this point
- except:
- pass # Exception is expected
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(1000)
# Message should be published now
msg = consumer.receive(TM)
@@ -213,11 +219,8 @@ class PulsarTest(TestCase):
producer.send(b'hello', deliver_after=timedelta(milliseconds=1100))
# Message should not be available in the next second
- try:
- msg = consumer.receive(1000)
- self.assertTrue(False) # Should not reach this point
- except:
- pass # Exception is expected
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(1000)
# Message should be published in the next 500ms
msg = consumer.receive(TM)
@@ -229,14 +232,14 @@ class PulsarTest(TestCase):
def test_consumer_initial_position(self):
client = Client(self.serviceUrl)
- producer = client.create_producer('my-python-topic-producer-consumer')
+ producer = client.create_producer('consumer-initial-position')
# Sending 5 messages before consumer creation.
# These should be received with initial_position set to Earliest but not with Latest.
for i in range(5):
producer.send(b'hello-%d' % i)
- consumer = client.subscribe('my-python-topic-producer-consumer',
+ consumer = client.subscribe('consumer-initial-position',
'my-sub',
consumer_type=ConsumerType.Shared,
initial_position=InitialPosition.Earliest)
@@ -250,11 +253,8 @@ class PulsarTest(TestCase):
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello-%d' % i)
- try:
- msg = consumer.receive(100)
- self.assertTrue(False) # Should not reach this point
- except:
- pass # Exception is expected
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
consumer.unsubscribe()
client.close()
@@ -310,21 +310,18 @@ class PulsarTest(TestCase):
tls_allow_insecure_connection=False,
authentication=AuthenticationTLS(certs_dir + 'client-cert.pem', certs_dir + 'client-key.pem'))
- consumer = client.subscribe('my-python-topic-producer-consumer',
+ consumer = client.subscribe('my-python-topic-tls-auth',
'my-sub',
consumer_type=ConsumerType.Shared)
- producer = client.create_producer('my-python-topic-producer-consumer')
+ producer = client.create_producer('my-python-topic-tls-auth')
producer.send(b'hello')
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')
- try:
- msg = consumer.receive(100)
- self.assertTrue(False) # Should not reach this point
- except:
- pass # Exception is expected
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
client.close()
@@ -340,21 +337,18 @@ class PulsarTest(TestCase):
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams))
- consumer = client.subscribe('my-python-topic-producer-consumer',
+ consumer = client.subscribe('my-python-topic-tls-auth-2',
'my-sub',
consumer_type=ConsumerType.Shared)
- producer = client.create_producer('my-python-topic-producer-consumer')
+ producer = client.create_producer('my-python-topic-tls-auth-2')
producer.send(b'hello')
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')
- try:
- msg = consumer.receive(100)
- self.assertTrue(False) # Should not reach this point
- except:
- pass # Exception is expected
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
client.close()
@@ -389,21 +383,18 @@ class PulsarTest(TestCase):
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams))
- consumer = client.subscribe('my-python-topic-producer-consumer',
+ consumer = client.subscribe('my-python-topic-tls-auth-3',
'my-sub',
consumer_type=ConsumerType.Shared)
- producer = client.create_producer('my-python-topic-producer-consumer')
+ producer = client.create_producer('my-python-topic-tls-auth-3')
producer.send(b'hello')
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')
- try:
- msg = consumer.receive(100)
- self.assertTrue(False) # Should not reach this point
- except:
- pass # Exception is expected
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
client.close()
@@ -417,12 +408,11 @@ class PulsarTest(TestCase):
tls_trust_certs_file_path=certs_dir + 'cacert.pem',
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams))
- try:
- client.subscribe('my-python-topic-producer-consumer',
+
+ with self.assertRaises(pulsar.ConnectError):
+ client.subscribe('my-python-topic-auth-junk-params',
'my-sub',
consumer_type=ConsumerType.Shared)
- except:
- pass # Exception is expected
def test_message_listener(self):
client = Client(self.serviceUrl)
@@ -462,11 +452,8 @@ class PulsarTest(TestCase):
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')
- try:
- msg = reader.read_next(100)
- self.assertTrue(False) # Should not reach this point
- except:
- pass # Exception is expected
+ with self.assertRaises(pulsar.Timeout):
+ reader.read_next(100)
reader.close()
client.close()
@@ -618,13 +605,8 @@ class PulsarTest(TestCase):
self.assertEqual(msg.data(), b'hello-%d' % i)
consumer.acknowledge(msg)
- try:
- # No other messages should be received
- consumer.receive(timeout_millis=1000)
- self.assertTrue(False)
- except:
- # Exception is expected
- pass
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
producer.close()
@@ -636,13 +618,8 @@ class PulsarTest(TestCase):
producer.send(b'hello-2', sequence_id=2)
self.assertEqual(producer.last_sequence_id(), 2)
- try:
- # No other messages should be received
- consumer.receive(timeout_millis=1000)
- self.assertTrue(False)
- except:
- # Exception is expected
- pass
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
'false')
@@ -873,11 +850,8 @@ class PulsarTest(TestCase):
# repeat with reader
reader = client.create_reader('my-python-topic-seek', MessageId.latest)
- try:
- msg = reader.read_next(100)
- self.assertTrue(False) # Should not reach this point
- except:
- pass # Exception is expected
+ with self.assertRaises(pulsar.Timeout):
+ reader.read_next(100)
# earliest
reader.seek(MessageId.earliest)
@@ -925,11 +899,8 @@ class PulsarTest(TestCase):
self.assertEqual(msg.data(), b'hello')
consumer.acknowledge(msg)
- try:
- msg = consumer.receive(100)
- self.assertTrue(False) # Should not reach this point
- except:
- pass # Exception is expected
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
client.close()
@@ -972,13 +943,8 @@ class PulsarTest(TestCase):
msg = consumer.receive(TM)
consumer.acknowledge(msg)
- try:
- # No other messages should be received
- consumer.receive(timeout_millis=500)
- self.assertTrue(False)
- except:
- # Exception is expected
- pass
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
client.close()
def test_topics_pattern_consumer(self):
@@ -1027,13 +993,8 @@ class PulsarTest(TestCase):
msg = consumer.receive(TM)
consumer.acknowledge(msg)
- try:
- # No other messages should be received
- consumer.receive(timeout_millis=500)
- self.assertTrue(False)
- except:
- # Exception is expected
- pass
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
client.close()
def test_message_id(self):
@@ -1110,11 +1071,8 @@ class PulsarTest(TestCase):
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')
- try:
- msg = consumer.receive(100)
- self.assertTrue(False) # Should not reach this point
- except:
- pass # Exception is expected
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
consumer.unsubscribe()
client.close()
@@ -1158,7 +1116,8 @@ class PulsarTest(TestCase):
client = Client(self.serviceUrl)
consumer = client.subscribe('test_negative_acks',
'test',
- schema=pulsar.schema.StringSchema())
+ schema=pulsar.schema.StringSchema(),
+ negative_ack_redelivery_delay_ms=1000)
producer = client.create_producer('test_negative_acks',
schema=pulsar.schema.StringSchema())
for i in range(10):
@@ -1176,29 +1135,17 @@ class PulsarTest(TestCase):
self.assertEqual(msg.value(), "hello-%d" % i)
consumer.acknowledge(msg)
- try:
- # No more messages expected
- msg = consumer.receive(100)
- self.assertTrue(False)
- except:
- pass # Exception is expected
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(100)
client.close()
def _check_value_error(self, fun):
- try:
+ with self.assertRaises(ValueError):
fun()
- # Should throw exception
- self.assertTrue(False)
- except ValueError:
- pass # Expected
def _check_type_error(self, fun):
- try:
+ with self.assertRaises(TypeError):
fun()
- # Should throw exception
- self.assertTrue(False)
- except TypeError:
- pass # Expected
if __name__ == '__main__':
diff --git a/pulsar-client-cpp/python/src/enums.cc b/pulsar-client-cpp/python/src/enums.cc
index f57e08d..c23b211 100644
--- a/pulsar-client-cpp/python/src/enums.cc
+++ b/pulsar-client-cpp/python/src/enums.cc
@@ -63,6 +63,7 @@ void export_enums() {
.value("InvalidMessage", ResultInvalidMessage)
.value("ConsumerNotInitialized", ResultConsumerNotInitialized)
.value("ProducerNotInitialized", ResultProducerNotInitialized)
+ .value("ProducerBusy", ResultProducerBusy)
.value("TooManyLookupRequestException", ResultTooManyLookupRequestException)
.value("InvalidTopicName", ResultInvalidTopicName)
.value("InvalidUrl", ResultInvalidUrl)
@@ -76,6 +77,18 @@ void export_enums() {
.value("SubscriptionNotFound", ResultSubscriptionNotFound)
.value("ConsumerNotFound", ResultConsumerNotFound)
.value("UnsupportedVersionError", ResultUnsupportedVersionError)
+ .value("TopicTerminated", ResultTopicTerminated)
+ .value("CryptoError", ResultCryptoError)
+ .value("IncompatibleSchema", ResultIncompatibleSchema)
+ .value("ConsumerAssignError", ResultConsumerAssignError)
+ .value("CumulativeAcknowledgementNotAllowedError", ResultCumulativeAcknowledgementNotAllowedError)
+ .value("TransactionCoordinatorNotFoundError", ResultTransactionCoordinatorNotFoundError)
+ .value("InvalidTxnStatusError", ResultInvalidTxnStatusError)
+ .value("NotAllowedError", ResultNotAllowedError)
+ .value("TransactionConflict", ResultTransactionConflict)
+ .value("TransactionNotFound", ResultTransactionNotFound)
+ .value("ProducerFenced", ResultProducerFenced)
+ .value("MemoryBufferIsFull", ResultMemoryBufferIsFull)
;
enum_<SchemaType>("SchemaType", "Supported schema types")
diff --git a/pulsar-client-cpp/python/src/exceptions.cc b/pulsar-client-cpp/python/src/exceptions.cc
new file mode 100644
index 0000000..c39b52d
--- /dev/null
+++ b/pulsar-client-cpp/python/src/exceptions.cc
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <map>
+
+#include "utils.h"
+
+static PyObject* basePulsarException = nullptr;
+std::map<Result, PyObject*> exceptions;
+
+PyObject* createExceptionClass(const char* name, PyObject* baseTypeObj = PyExc_Exception) {
+ using namespace boost::python;
+
+ std::string fullName = "_pulsar.";
+ fullName += name;
+
+ PyObject* typeObj = PyErr_NewException(const_cast<char*>(fullName.c_str()),
+ baseTypeObj, nullptr);
+ if (!typeObj) throw_error_already_set();
+ scope().attr(name) = handle<>(borrowed(typeObj));
+ return typeObj;
+}
+
+PyObject* get_exception_class(Result result) {
+ return exceptions[result];
+}
+
+void export_exceptions() {
+ using namespace boost::python;
+
+ basePulsarException = createExceptionClass("PulsarException");
+
+ exceptions[ResultUnknownError] = createExceptionClass("UnknownError", basePulsarException);
+ exceptions[ResultInvalidConfiguration] = createExceptionClass("InvalidConfiguration", basePulsarException);
+ exceptions[ResultTimeout] = createExceptionClass("Timeout", basePulsarException);
+ exceptions[ResultLookupError] = createExceptionClass("LookupError", basePulsarException);
+ exceptions[ResultConnectError] = createExceptionClass("ConnectError", basePulsarException);
+ exceptions[ResultReadError] = createExceptionClass("ReadError", basePulsarException);
+ exceptions[ResultAuthenticationError] = createExceptionClass("AuthenticationError", basePulsarException);
+ exceptions[ResultAuthorizationError] = createExceptionClass("AuthorizationError", basePulsarException);
+ exceptions[ResultErrorGettingAuthenticationData] = createExceptionClass("ErrorGettingAuthenticationData", basePulsarException);
+ exceptions[ResultBrokerMetadataError] = createExceptionClass("BrokerMetadataError", basePulsarException);
+ exceptions[ResultBrokerPersistenceError] = createExceptionClass("BrokerPersistenceError", basePulsarException);
+ exceptions[ResultChecksumError] = createExceptionClass("ChecksumError", basePulsarException);
+ exceptions[ResultConsumerBusy] = createExceptionClass("ConsumerBusy", basePulsarException);
+ exceptions[ResultNotConnected] = createExceptionClass("NotConnected", basePulsarException);
+ exceptions[ResultAlreadyClosed] = createExceptionClass("AlreadyClosed", basePulsarException);
+ exceptions[ResultInvalidMessage] = createExceptionClass("InvalidMessage", basePulsarException);
+ exceptions[ResultConsumerNotInitialized] = createExceptionClass("ConsumerNotInitialized", basePulsarException);
+ exceptions[ResultProducerNotInitialized] = createExceptionClass("ProducerNotInitialized", basePulsarException);
+ exceptions[ResultProducerBusy] = createExceptionClass("ProducerBusy", basePulsarException);
+ exceptions[ResultTooManyLookupRequestException] = createExceptionClass("TooManyLookupRequestException", basePulsarException);
+ exceptions[ResultInvalidTopicName] = createExceptionClass("InvalidTopicName", basePulsarException);
+ exceptions[ResultInvalidUrl] = createExceptionClass("InvalidUrl", basePulsarException);
+ exceptions[ResultServiceUnitNotReady] = createExceptionClass("ServiceUnitNotReady", basePulsarException);
+ exceptions[ResultOperationNotSupported] = createExceptionClass("OperationNotSupported", basePulsarException);
+ exceptions[ResultProducerBlockedQuotaExceededError] = createExceptionClass("ProducerBlockedQuotaExceededError", basePulsarException);
+ exceptions[ResultProducerBlockedQuotaExceededException] = createExceptionClass("ProducerBlockedQuotaExceededException", basePulsarException);
+ exceptions[ResultProducerQueueIsFull] = createExceptionClass("ProducerQueueIsFull", basePulsarException);
+ exceptions[ResultMessageTooBig] = createExceptionClass("MessageTooBig", basePulsarException);
+ exceptions[ResultTopicNotFound] = createExceptionClass("TopicNotFound", basePulsarException);
+ exceptions[ResultSubscriptionNotFound] = createExceptionClass("SubscriptionNotFound", basePulsarException);
+ exceptions[ResultConsumerNotFound] = createExceptionClass("ConsumerNotFound", basePulsarException);
+ exceptions[ResultUnsupportedVersionError] = createExceptionClass("UnsupportedVersionError", basePulsarException);
+ exceptions[ResultTopicTerminated] = createExceptionClass("TopicTerminated", basePulsarException);
+ exceptions[ResultCryptoError] = createExceptionClass("CryptoError", basePulsarException);
+ exceptions[ResultIncompatibleSchema] = createExceptionClass("IncompatibleSchema", basePulsarException);
+ exceptions[ResultConsumerAssignError] = createExceptionClass("ConsumerAssignError", basePulsarException);
+ exceptions[ResultCumulativeAcknowledgementNotAllowedError] = createExceptionClass("CumulativeAcknowledgementNotAllowedError", basePulsarException);
+ exceptions[ResultTransactionCoordinatorNotFoundError] = createExceptionClass("TransactionCoordinatorNotFoundError", basePulsarException);
+ exceptions[ResultInvalidTxnStatusError] = createExceptionClass("InvalidTxnStatusError", basePulsarException);
+ exceptions[ResultNotAllowedError] = createExceptionClass("NotAllowedError", basePulsarException);
+ exceptions[ResultTransactionConflict] = createExceptionClass("TransactionConflict", basePulsarException);
+ exceptions[ResultTransactionNotFound] = createExceptionClass("TransactionNotFound", basePulsarException);
+ exceptions[ResultProducerFenced] = createExceptionClass("ProducerFenced", basePulsarException);
+ exceptions[ResultMemoryBufferIsFull] = createExceptionClass("MemoryBufferIsFull", basePulsarException);
+}
diff --git a/pulsar-client-cpp/python/src/pulsar.cc b/pulsar-client-cpp/python/src/pulsar.cc
index f80c9a4..a46ce53 100644
--- a/pulsar-client-cpp/python/src/pulsar.cc
+++ b/pulsar-client-cpp/python/src/pulsar.cc
@@ -28,13 +28,16 @@ void export_enums();
void export_authentication();
void export_schema();
void export_cryptoKeyReader();
+void export_exceptions();
+
+PyObject* get_exception_class(Result result);
static void translateException(const PulsarException& ex) {
std::string err = "Pulsar error: ";
err += strResult(ex._result);
- PyErr_SetString(PyExc_Exception, err.c_str());
+ PyErr_SetString(get_exception_class(ex._result), err.c_str());
}
BOOST_PYTHON_MODULE(_pulsar)
@@ -55,4 +58,5 @@ BOOST_PYTHON_MODULE(_pulsar)
export_authentication();
export_schema();
export_cryptoKeyReader();
+ export_exceptions();
}