You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/03/02 20:56:36 UTC
[rocketmq-client-cpp] branch re_dev updated: refactor: auto delete callback after invokes it
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/re_dev by this push:
new 5d66df7 refactor: auto delete callback after invokes it
5d66df7 is described below
commit 5d66df7d840e5675f3e4151e554adbe924320e78
Author: James Yin <yw...@hotmail.com>
AuthorDate: Wed Mar 3 04:55:15 2021 +0800
refactor: auto delete callback after invokes it
---
include/PullCallback.h | 17 +++++++++++++++++
include/RequestCallback.h | 17 +++++++++++++++++
include/SendCallback.h | 17 +++++++++++++++++
src/common/PullCallbackWrap.cpp | 11 +++--------
src/common/SendCallbackWrap.cpp | 30 +++++-------------------------
src/producer/DefaultMQProducerImpl.cpp | 15 +++------------
src/producer/RequestResponseFuture.cpp | 9 ++-------
7 files changed, 64 insertions(+), 52 deletions(-)
diff --git a/include/PullCallback.h b/include/PullCallback.h
index 7eb63ef..08deaca 100755
--- a/include/PullCallback.h
+++ b/include/PullCallback.h
@@ -35,6 +35,23 @@ class ROCKETMQCLIENT_API PullCallback {
virtual void onException(MQException& e) noexcept = 0;
virtual PullCallbackType getPullCallbackType() const { return PULL_CALLBACK_TYPE_SIMPLE; }
+
+ public:
+ inline void invokeOnSuccess(std::unique_ptr<PullResult> pull_result) {
+ auto type = getPullCallbackType();
+ onSuccess(std::move(pull_result));
+ if (type == PULL_CALLBACK_TYPE_AUTO_DELETE && getPullCallbackType() == PULL_CALLBACK_TYPE_AUTO_DELETE) {
+ delete this;
+ }
+ }
+
+ inline void invokeOnException(MQException& exception) noexcept {
+ auto type = getPullCallbackType();
+ onException(exception);
+ if (type == PULL_CALLBACK_TYPE_AUTO_DELETE && getPullCallbackType() == PULL_CALLBACK_TYPE_AUTO_DELETE) {
+ delete this;
+ }
+ }
};
/**
diff --git a/include/RequestCallback.h b/include/RequestCallback.h
index a6dfec4..c666ccc 100644
--- a/include/RequestCallback.h
+++ b/include/RequestCallback.h
@@ -35,6 +35,23 @@ class ROCKETMQCLIENT_API RequestCallback {
virtual void onException(MQException& e) noexcept = 0;
virtual RequestCallbackType getRequestCallbackType() const { return REQUEST_CALLBACK_TYPE_SIMPLE; }
+
+ public:
+ inline void invokeOnSuccess(MQMessage message) {
+ auto type = getRequestCallbackType();
+ onSuccess(std::move(message));
+ if (type == REQUEST_CALLBACK_TYPE_AUTO_DELETE && getRequestCallbackType() == REQUEST_CALLBACK_TYPE_AUTO_DELETE) {
+ delete this;
+ }
+ }
+
+ inline void invokeOnException(MQException& exception) noexcept {
+ auto type = getRequestCallbackType();
+ onException(exception);
+ if (type == REQUEST_CALLBACK_TYPE_AUTO_DELETE && getRequestCallbackType() == REQUEST_CALLBACK_TYPE_AUTO_DELETE) {
+ delete this;
+ }
+ }
};
/**
diff --git a/include/SendCallback.h b/include/SendCallback.h
index 4801891..9191c25 100755
--- a/include/SendCallback.h
+++ b/include/SendCallback.h
@@ -35,6 +35,23 @@ class ROCKETMQCLIENT_API SendCallback {
virtual void onException(MQException& e) noexcept = 0;
virtual SendCallbackType getSendCallbackType() const { return SEND_CALLBACK_TYPE_SIMPLE; }
+
+ public:
+ inline void invokeOnSuccess(SendResult& send_result) {
+ auto type = getSendCallbackType();
+ onSuccess(send_result);
+ if (type == SEND_CALLBACK_TYPE_AUTO_DELETE && getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
+ delete this;
+ }
+ }
+
+ inline void invokeOnException(MQException& exception) noexcept {
+ auto type = getSendCallbackType();
+ onException(exception);
+ if (type == SEND_CALLBACK_TYPE_AUTO_DELETE && getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
+ delete this;
+ }
+ }
};
/**
diff --git a/src/common/PullCallbackWrap.cpp b/src/common/PullCallbackWrap.cpp
index b23970b..bc915a4 100644
--- a/src/common/PullCallbackWrap.cpp
+++ b/src/common/PullCallbackWrap.cpp
@@ -33,9 +33,9 @@ void PullCallbackWrap::operationComplete(ResponseFuture* responseFuture) noexcep
try {
std::unique_ptr<PullResult> pull_result(client_api_impl_->processPullResponse(response.get()));
assert(pull_result != nullptr);
- pull_callback_->onSuccess(std::move(pull_result));
+ pull_callback_->invokeOnSuccess(std::move(pull_result));
} catch (MQException& e) {
- pull_callback_->onException(e);
+ pull_callback_->invokeOnException(e);
}
} else {
std::string err;
@@ -47,12 +47,7 @@ void PullCallbackWrap::operationComplete(ResponseFuture* responseFuture) noexcep
err = "unknown reason";
}
MQException exception(err, -1, __FILE__, __LINE__);
- pull_callback_->onException(exception);
- }
-
- // auto delete callback
- if (pull_callback_->getPullCallbackType() == PULL_CALLBACK_TYPE_AUTO_DELETE) {
- deleteAndZero(pull_callback_);
+ pull_callback_->invokeOnException(exception);
}
}
diff --git a/src/common/SendCallbackWrap.cpp b/src/common/SendCallbackWrap.cpp
index 8763be1..92fd6e1 100644
--- a/src/common/SendCallbackWrap.cpp
+++ b/src/common/SendCallbackWrap.cpp
@@ -22,9 +22,9 @@
#include "Logging.h"
#include "MQClientAPIImpl.h"
#include "MQClientInstance.h"
-#include "MessageDecoder.h"
#include "MQMessageQueue.h"
#include "MQProtos.h"
+#include "MessageDecoder.h"
#include "PullAPIWrapper.h"
#include "PullResultExt.hpp"
#include "TopicPublishInfo.hpp"
@@ -57,12 +57,7 @@ void SendCallbackWrap::operationComplete(ResponseFuture* responseFuture) noexcep
auto producer = producer_.lock();
if (nullptr == producer) {
MQException exception("DefaultMQProducer is released.", -1, __FILE__, __LINE__);
- send_callback_->onException(exception);
-
- // auto delete callback
- if (send_callback_->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
- deleteAndZero(send_callback_);
- }
+ send_callback_->invokeOnException(exception);
return;
}
@@ -99,16 +94,11 @@ void SendCallbackWrap::operationComplete(ResponseFuture* responseFuture) noexcep
// }
try {
- send_callback_->onSuccess(*sendResult);
+ send_callback_->invokeOnSuccess(*sendResult);
} catch (...) {
}
producer->updateFaultItem(broker_name_, UtilAll::currentTimeMillis() - responseFuture->begin_timestamp(), false);
-
- // auto delete callback
- if (send_callback_->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
- deleteAndZero(send_callback_);
- }
} catch (MQException& e) {
producer->updateFaultItem(broker_name_, UtilAll::currentTimeMillis() - responseFuture->begin_timestamp(), true);
LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
@@ -136,12 +126,7 @@ void SendCallbackWrap::onExceptionImpl(ResponseFuture* responseFuture,
auto producer = producer_.lock();
if (nullptr == producer) {
MQException exception("DefaultMQProducer is released.", -1, __FILE__, __LINE__);
- send_callback_->onException(exception);
-
- // auto delete callback
- if (send_callback_->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
- deleteAndZero(send_callback_);
- }
+ send_callback_->invokeOnException(exception);
return;
}
@@ -180,12 +165,7 @@ void SendCallbackWrap::onExceptionImpl(ResponseFuture* responseFuture,
return onExceptionImpl(responseFuture, responseFuture->leftTime(), e1, true);
}
} else {
- send_callback_->onException(e);
-
- // auto delete callback
- if (send_callback_->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
- deleteAndZero(send_callback_);
- }
+ send_callback_->invokeOnException(e);
}
}
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index d3ab73c..20704f8 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -213,10 +213,7 @@ void DefaultMQProducerImpl::send(MQMessage& msg, SendCallback* sendCallback, lon
(void)sendDefaultImpl(msg.getMessageImpl(), ASYNC, sendCallback, timeout);
} catch (MQException& e) {
LOG_ERROR_NEW("send failed, exception:{}", e.what());
- sendCallback->onException(e);
- if (sendCallback->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
- deleteAndZero(sendCallback);
- }
+ sendCallback->invokeOnException(e);
} catch (std::exception& e) {
LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
exit(-1);
@@ -246,10 +243,7 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
}
} catch (MQException& e) {
LOG_ERROR_NEW("send failed, exception:{}", e.what());
- sendCallback->onException(e);
- if (sendCallback->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
- deleteAndZero(sendCallback);
- }
+ sendCallback->invokeOnException(e);
} catch (std::exception& e) {
LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
exit(-1);
@@ -318,10 +312,7 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
}
} catch (MQException& e) {
LOG_ERROR_NEW("send failed, exception:{}", e.what());
- sendCallback->onException(e);
- if (sendCallback->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
- deleteAndZero(sendCallback);
- }
+ sendCallback->invokeOnException(e);
} catch (std::exception& e) {
LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
exit(-1);
diff --git a/src/producer/RequestResponseFuture.cpp b/src/producer/RequestResponseFuture.cpp
index 5bdcd61..615d27b 100644
--- a/src/producer/RequestResponseFuture.cpp
+++ b/src/producer/RequestResponseFuture.cpp
@@ -42,7 +42,7 @@ void RequestResponseFuture::executeRequestCallback() noexcept {
if (request_callback_ != nullptr) {
if (send_request_ok_ && cause_ == nullptr) {
try {
- request_callback_->onSuccess(std::move(response_msg_));
+ request_callback_->invokeOnSuccess(std::move(response_msg_));
} catch (const std::exception& e) {
LOG_WARN_NEW("RequestCallback throw an exception: {}", e.what());
}
@@ -50,16 +50,11 @@ void RequestResponseFuture::executeRequestCallback() noexcept {
try {
std::rethrow_exception(cause_);
} catch (MQException& e) {
- request_callback_->onException(e);
+ request_callback_->invokeOnException(e);
} catch (const std::exception& e) {
LOG_WARN_NEW("unexpected exception in RequestResponseFuture: {}", e.what());
}
}
-
- // auto delete callback
- if (request_callback_->getRequestCallbackType() == REQUEST_CALLBACK_TYPE_AUTO_DELETE) {
- deleteAndZero(request_callback_);
- }
}
}