You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/03/26 10:35:25 UTC
[rocketmq-client-cpp] branch re_dev updated: fix: auto delete
callback
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/re_dev by this push:
new 1fd6742 fix: auto delete callback
1fd6742 is described below
commit 1fd67428a30842188f157e507b1c90aaa3bfa8da
Author: James Yin <yw...@hotmail.com>
AuthorDate: Fri Mar 26 18:29:30 2021 +0800
fix: auto delete callback
---
include/PullCallback.h | 23 +++++------------------
include/RequestCallback.h | 23 +++++------------------
include/SendCallback.h | 23 +++++------------------
src/common/PullCallbackWrap.cpp | 24 ++++++++++++++++++++++++
src/common/SendCallbackWrap.cpp | 24 ++++++++++++++++++++++++
src/producer/RequestResponseFuture.cpp | 24 ++++++++++++++++++++++++
6 files changed, 87 insertions(+), 54 deletions(-)
diff --git a/include/PullCallback.h b/include/PullCallback.h
index 08deaca..841b598 100755
--- a/include/PullCallback.h
+++ b/include/PullCallback.h
@@ -22,7 +22,7 @@
namespace rocketmq {
-enum PullCallbackType { PULL_CALLBACK_TYPE_SIMPLE = 0, PULL_CALLBACK_TYPE_AUTO_DELETE = 1 };
+enum class PullCallbackType { kSimple, kAutoDelete };
/**
* PullCallback - callback interface for async pull
@@ -34,24 +34,11 @@ class ROCKETMQCLIENT_API PullCallback {
virtual void onSuccess(std::unique_ptr<PullResult> pull_result) = 0;
virtual void onException(MQException& e) noexcept = 0;
- virtual PullCallbackType getPullCallbackType() const { return PULL_CALLBACK_TYPE_SIMPLE; }
+ virtual PullCallbackType getPullCallbackType() const { return PullCallbackType::kSimple; }
public:
- inline void invokeOnSuccess(std::unique_ptr<PullResult> pull_result) {
- auto type = getPullCallbackType();
- onSuccess(std::move(pull_result));
- if (type == PULL_CALLBACK_TYPE_AUTO_DELETE && getPullCallbackType() == PULL_CALLBACK_TYPE_AUTO_DELETE) {
- delete this;
- }
- }
-
- inline void invokeOnException(MQException& exception) noexcept {
- auto type = getPullCallbackType();
- onException(exception);
- if (type == PULL_CALLBACK_TYPE_AUTO_DELETE && getPullCallbackType() == PULL_CALLBACK_TYPE_AUTO_DELETE) {
- delete this;
- }
- }
+ void invokeOnSuccess(std::unique_ptr<PullResult> pull_result) noexcept;
+ void invokeOnException(MQException& exception) noexcept;
};
/**
@@ -61,7 +48,7 @@ class ROCKETMQCLIENT_API PullCallback {
*/
class ROCKETMQCLIENT_API AutoDeletePullCallback : public PullCallback {
public:
- PullCallbackType getPullCallbackType() const override final { return PULL_CALLBACK_TYPE_AUTO_DELETE; }
+ PullCallbackType getPullCallbackType() const override final { return PullCallbackType::kAutoDelete; }
};
} // namespace rocketmq
diff --git a/include/RequestCallback.h b/include/RequestCallback.h
index c666ccc..493e681 100644
--- a/include/RequestCallback.h
+++ b/include/RequestCallback.h
@@ -22,7 +22,7 @@
namespace rocketmq {
-enum RequestCallbackType { REQUEST_CALLBACK_TYPE_SIMPLE = 0, REQUEST_CALLBACK_TYPE_AUTO_DELETE = 1 };
+enum class RequestCallbackType { kSimple, kAutoDelete };
/**
* RequestCallback - callback interface for async request
@@ -34,24 +34,11 @@ class ROCKETMQCLIENT_API RequestCallback {
virtual void onSuccess(MQMessage message) = 0;
virtual void onException(MQException& e) noexcept = 0;
- virtual RequestCallbackType getRequestCallbackType() const { return REQUEST_CALLBACK_TYPE_SIMPLE; }
+ virtual RequestCallbackType getRequestCallbackType() const { return RequestCallbackType::kSimple; }
public:
- inline void invokeOnSuccess(MQMessage message) {
- auto type = getRequestCallbackType();
- onSuccess(std::move(message));
- if (type == REQUEST_CALLBACK_TYPE_AUTO_DELETE && getRequestCallbackType() == REQUEST_CALLBACK_TYPE_AUTO_DELETE) {
- delete this;
- }
- }
-
- inline void invokeOnException(MQException& exception) noexcept {
- auto type = getRequestCallbackType();
- onException(exception);
- if (type == REQUEST_CALLBACK_TYPE_AUTO_DELETE && getRequestCallbackType() == REQUEST_CALLBACK_TYPE_AUTO_DELETE) {
- delete this;
- }
- }
+ void invokeOnSuccess(MQMessage message) noexcept;
+ void invokeOnException(MQException& exception) noexcept;
};
/**
@@ -61,7 +48,7 @@ class ROCKETMQCLIENT_API RequestCallback {
*/
class ROCKETMQCLIENT_API AutoDeleteRequestCallback : public RequestCallback {
public:
- RequestCallbackType getRequestCallbackType() const override final { return REQUEST_CALLBACK_TYPE_AUTO_DELETE; }
+ RequestCallbackType getRequestCallbackType() const override final { return RequestCallbackType::kAutoDelete; }
};
} // namespace rocketmq
diff --git a/include/SendCallback.h b/include/SendCallback.h
index 9191c25..a301d94 100755
--- a/include/SendCallback.h
+++ b/include/SendCallback.h
@@ -22,7 +22,7 @@
namespace rocketmq {
-enum SendCallbackType { SEND_CALLBACK_TYPE_SIMPLE = 0, SEND_CALLBACK_TYPE_AUTO_DELETE = 1 };
+enum class SendCallbackType { kSimple, kAutoDelete };
/**
* SendCallback - callback interface for async send
@@ -34,24 +34,11 @@ class ROCKETMQCLIENT_API SendCallback {
virtual void onSuccess(SendResult& sendResult) = 0;
virtual void onException(MQException& e) noexcept = 0;
- virtual SendCallbackType getSendCallbackType() const { return SEND_CALLBACK_TYPE_SIMPLE; }
+ virtual SendCallbackType getSendCallbackType() const { return SendCallbackType::kSimple; }
public:
- inline void invokeOnSuccess(SendResult& send_result) {
- auto type = getSendCallbackType();
- onSuccess(send_result);
- if (type == SEND_CALLBACK_TYPE_AUTO_DELETE && getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
- delete this;
- }
- }
-
- inline void invokeOnException(MQException& exception) noexcept {
- auto type = getSendCallbackType();
- onException(exception);
- if (type == SEND_CALLBACK_TYPE_AUTO_DELETE && getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
- delete this;
- }
- }
+ void invokeOnSuccess(SendResult& send_result) noexcept;
+ void invokeOnException(MQException& exception) noexcept;
};
/**
@@ -62,7 +49,7 @@ class ROCKETMQCLIENT_API SendCallback {
class ROCKETMQCLIENT_API AutoDeleteSendCallback : public SendCallback // base interface
{
public:
- SendCallbackType getSendCallbackType() const override final { return SEND_CALLBACK_TYPE_AUTO_DELETE; }
+ SendCallbackType getSendCallbackType() const override final { return SendCallbackType::kAutoDelete; }
};
} // namespace rocketmq
diff --git a/src/common/PullCallbackWrap.cpp b/src/common/PullCallbackWrap.cpp
index bc915a4..0dc9ef2 100644
--- a/src/common/PullCallbackWrap.cpp
+++ b/src/common/PullCallbackWrap.cpp
@@ -18,6 +18,30 @@
namespace rocketmq {
+inline void PullCallback::invokeOnSuccess(std::unique_ptr<PullResult> pull_result) noexcept {
+ auto type = getPullCallbackType();
+ try {
+ onSuccess(std::move(pull_result));
+ } catch (const std::exception& e) {
+ LOG_WARN_NEW("encounter exception when invoke PullCallback::onSuccess(), {}", e.what());
+ }
+ if (type == PullCallbackType::kAutoDelete) {
+ delete this;
+ }
+}
+
+inline void PullCallback::invokeOnException(MQException& exception) noexcept {
+ auto type = getPullCallbackType();
+ try {
+ onException(exception);
+ } catch (const std::exception& e) {
+ LOG_WARN_NEW("encounter exception when invoke PullCallback::onException(), {}", e.what());
+ }
+ if (type == PullCallbackType::kAutoDelete) {
+ delete this;
+ }
+}
+
PullCallbackWrap::PullCallbackWrap(PullCallback* pullCallback, MQClientAPIImpl* pClientAPI)
: pull_callback_(pullCallback), client_api_impl_(pClientAPI) {}
diff --git a/src/common/SendCallbackWrap.cpp b/src/common/SendCallbackWrap.cpp
index 562b30f..2732cc9 100644
--- a/src/common/SendCallbackWrap.cpp
+++ b/src/common/SendCallbackWrap.cpp
@@ -32,6 +32,30 @@
namespace rocketmq {
+inline void SendCallback::invokeOnSuccess(SendResult& send_result) noexcept {
+ auto type = getSendCallbackType();
+ try {
+ onSuccess(send_result);
+ } catch (const std::exception& e) {
+ LOG_WARN_NEW("encounter exception when invoke SendCallback::onSuccess(), {}", e.what());
+ }
+ if (type == SendCallbackType::kAutoDelete) {
+ delete this;
+ }
+}
+
+inline void SendCallback::invokeOnException(MQException& exception) noexcept {
+ auto type = getSendCallbackType();
+ try {
+ onException(exception);
+ } catch (const std::exception& e) {
+ LOG_WARN_NEW("encounter exception when invoke SendCallback::onException(), {}", e.what());
+ }
+ if (type == SendCallbackType::kAutoDelete) {
+ delete this;
+ }
+}
+
SendCallbackWrap::SendCallbackWrap(const std::string& addr,
const std::string& brokerName,
const MessagePtr msg,
diff --git a/src/producer/RequestResponseFuture.cpp b/src/producer/RequestResponseFuture.cpp
index 615d27b..a88cc2a 100644
--- a/src/producer/RequestResponseFuture.cpp
+++ b/src/producer/RequestResponseFuture.cpp
@@ -21,6 +21,30 @@
namespace rocketmq {
+inline void RequestCallback::invokeOnSuccess(MQMessage message) noexcept {
+ auto type = getRequestCallbackType();
+ try {
+ onSuccess(std::move(message));
+ } catch (const std::exception& e) {
+ LOG_WARN_NEW("encounter exception when invoke RequestCallback::onSuccess(), {}", e.what());
+ }
+ if (type == RequestCallbackType::kAutoDelete) {
+ delete this;
+ }
+}
+
+inline void RequestCallback::invokeOnException(MQException& exception) noexcept {
+ auto type = getRequestCallbackType();
+ try {
+ onException(exception);
+ } catch (const std::exception& e) {
+ LOG_WARN_NEW("encounter exception when invoke RequestCallback::onException(), {}", e.what());
+ }
+ if (type == RequestCallbackType::kAutoDelete) {
+ delete this;
+ }
+}
+
RequestResponseFuture::RequestResponseFuture(const std::string& correlationId,
long timeoutMillis,
RequestCallback* requestCallback)