You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/03/02 20:56:36 UTC

[rocketmq-client-cpp] branch re_dev updated: refactor: auto delete callback after invokes it

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/re_dev by this push:
     new 5d66df7  refactor: auto delete callback after invokes it
5d66df7 is described below

commit 5d66df7d840e5675f3e4151e554adbe924320e78
Author: James Yin <yw...@hotmail.com>
AuthorDate: Wed Mar 3 04:55:15 2021 +0800

    refactor: auto delete callback after invokes it
---
 include/PullCallback.h                 | 17 +++++++++++++++++
 include/RequestCallback.h              | 17 +++++++++++++++++
 include/SendCallback.h                 | 17 +++++++++++++++++
 src/common/PullCallbackWrap.cpp        | 11 +++--------
 src/common/SendCallbackWrap.cpp        | 30 +++++-------------------------
 src/producer/DefaultMQProducerImpl.cpp | 15 +++------------
 src/producer/RequestResponseFuture.cpp |  9 ++-------
 7 files changed, 64 insertions(+), 52 deletions(-)

diff --git a/include/PullCallback.h b/include/PullCallback.h
index 7eb63ef..08deaca 100755
--- a/include/PullCallback.h
+++ b/include/PullCallback.h
@@ -35,6 +35,23 @@ class ROCKETMQCLIENT_API PullCallback {
   virtual void onException(MQException& e) noexcept = 0;
 
   virtual PullCallbackType getPullCallbackType() const { return PULL_CALLBACK_TYPE_SIMPLE; }
+
+ public:
+  inline void invokeOnSuccess(std::unique_ptr<PullResult> pull_result) {
+    auto type = getPullCallbackType();
+    onSuccess(std::move(pull_result));
+    if (type == PULL_CALLBACK_TYPE_AUTO_DELETE && getPullCallbackType() == PULL_CALLBACK_TYPE_AUTO_DELETE) {
+      delete this;
+    }
+  }
+
+  inline void invokeOnException(MQException& exception) noexcept {
+    auto type = getPullCallbackType();
+    onException(exception);
+    if (type == PULL_CALLBACK_TYPE_AUTO_DELETE && getPullCallbackType() == PULL_CALLBACK_TYPE_AUTO_DELETE) {
+      delete this;
+    }
+  }
 };
 
 /**
diff --git a/include/RequestCallback.h b/include/RequestCallback.h
index a6dfec4..c666ccc 100644
--- a/include/RequestCallback.h
+++ b/include/RequestCallback.h
@@ -35,6 +35,23 @@ class ROCKETMQCLIENT_API RequestCallback {
   virtual void onException(MQException& e) noexcept = 0;
 
   virtual RequestCallbackType getRequestCallbackType() const { return REQUEST_CALLBACK_TYPE_SIMPLE; }
+
+ public:
+  inline void invokeOnSuccess(MQMessage message) {
+    auto type = getRequestCallbackType();
+    onSuccess(std::move(message));
+    if (type == REQUEST_CALLBACK_TYPE_AUTO_DELETE && getRequestCallbackType() == REQUEST_CALLBACK_TYPE_AUTO_DELETE) {
+      delete this;
+    }
+  }
+
+  inline void invokeOnException(MQException& exception) noexcept {
+    auto type = getRequestCallbackType();
+    onException(exception);
+    if (type == REQUEST_CALLBACK_TYPE_AUTO_DELETE && getRequestCallbackType() == REQUEST_CALLBACK_TYPE_AUTO_DELETE) {
+      delete this;
+    }
+  }
 };
 
 /**
diff --git a/include/SendCallback.h b/include/SendCallback.h
index 4801891..9191c25 100755
--- a/include/SendCallback.h
+++ b/include/SendCallback.h
@@ -35,6 +35,23 @@ class ROCKETMQCLIENT_API SendCallback {
   virtual void onException(MQException& e) noexcept = 0;
 
   virtual SendCallbackType getSendCallbackType() const { return SEND_CALLBACK_TYPE_SIMPLE; }
+
+ public:
+  inline void invokeOnSuccess(SendResult& send_result) {
+    auto type = getSendCallbackType();
+    onSuccess(send_result);
+    if (type == SEND_CALLBACK_TYPE_AUTO_DELETE && getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
+      delete this;
+    }
+  }
+
+  inline void invokeOnException(MQException& exception) noexcept {
+    auto type = getSendCallbackType();
+    onException(exception);
+    if (type == SEND_CALLBACK_TYPE_AUTO_DELETE && getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
+      delete this;
+    }
+  }
 };
 
 /**
diff --git a/src/common/PullCallbackWrap.cpp b/src/common/PullCallbackWrap.cpp
index b23970b..bc915a4 100644
--- a/src/common/PullCallbackWrap.cpp
+++ b/src/common/PullCallbackWrap.cpp
@@ -33,9 +33,9 @@ void PullCallbackWrap::operationComplete(ResponseFuture* responseFuture) noexcep
     try {
       std::unique_ptr<PullResult> pull_result(client_api_impl_->processPullResponse(response.get()));
       assert(pull_result != nullptr);
-      pull_callback_->onSuccess(std::move(pull_result));
+      pull_callback_->invokeOnSuccess(std::move(pull_result));
     } catch (MQException& e) {
-      pull_callback_->onException(e);
+      pull_callback_->invokeOnException(e);
     }
   } else {
     std::string err;
@@ -47,12 +47,7 @@ void PullCallbackWrap::operationComplete(ResponseFuture* responseFuture) noexcep
       err = "unknown reason";
     }
     MQException exception(err, -1, __FILE__, __LINE__);
-    pull_callback_->onException(exception);
-  }
-
-  // auto delete callback
-  if (pull_callback_->getPullCallbackType() == PULL_CALLBACK_TYPE_AUTO_DELETE) {
-    deleteAndZero(pull_callback_);
+    pull_callback_->invokeOnException(exception);
   }
 }
 
diff --git a/src/common/SendCallbackWrap.cpp b/src/common/SendCallbackWrap.cpp
index 8763be1..92fd6e1 100644
--- a/src/common/SendCallbackWrap.cpp
+++ b/src/common/SendCallbackWrap.cpp
@@ -22,9 +22,9 @@
 #include "Logging.h"
 #include "MQClientAPIImpl.h"
 #include "MQClientInstance.h"
-#include "MessageDecoder.h"
 #include "MQMessageQueue.h"
 #include "MQProtos.h"
+#include "MessageDecoder.h"
 #include "PullAPIWrapper.h"
 #include "PullResultExt.hpp"
 #include "TopicPublishInfo.hpp"
@@ -57,12 +57,7 @@ void SendCallbackWrap::operationComplete(ResponseFuture* responseFuture) noexcep
   auto producer = producer_.lock();
   if (nullptr == producer) {
     MQException exception("DefaultMQProducer is released.", -1, __FILE__, __LINE__);
-    send_callback_->onException(exception);
-
-    // auto delete callback
-    if (send_callback_->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
-      deleteAndZero(send_callback_);
-    }
+    send_callback_->invokeOnException(exception);
     return;
   }
 
@@ -99,16 +94,11 @@ void SendCallbackWrap::operationComplete(ResponseFuture* responseFuture) noexcep
       // }
 
       try {
-        send_callback_->onSuccess(*sendResult);
+        send_callback_->invokeOnSuccess(*sendResult);
       } catch (...) {
       }
 
       producer->updateFaultItem(broker_name_, UtilAll::currentTimeMillis() - responseFuture->begin_timestamp(), false);
-
-      // auto delete callback
-      if (send_callback_->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
-        deleteAndZero(send_callback_);
-      }
     } catch (MQException& e) {
       producer->updateFaultItem(broker_name_, UtilAll::currentTimeMillis() - responseFuture->begin_timestamp(), true);
       LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
@@ -136,12 +126,7 @@ void SendCallbackWrap::onExceptionImpl(ResponseFuture* responseFuture,
   auto producer = producer_.lock();
   if (nullptr == producer) {
     MQException exception("DefaultMQProducer is released.", -1, __FILE__, __LINE__);
-    send_callback_->onException(exception);
-
-    // auto delete callback
-    if (send_callback_->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
-      deleteAndZero(send_callback_);
-    }
+    send_callback_->invokeOnException(exception);
     return;
   }
 
@@ -180,12 +165,7 @@ void SendCallbackWrap::onExceptionImpl(ResponseFuture* responseFuture,
       return onExceptionImpl(responseFuture, responseFuture->leftTime(), e1, true);
     }
   } else {
-    send_callback_->onException(e);
-
-    // auto delete callback
-    if (send_callback_->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
-      deleteAndZero(send_callback_);
-    }
+    send_callback_->invokeOnException(e);
   }
 }
 
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index d3ab73c..20704f8 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -213,10 +213,7 @@ void DefaultMQProducerImpl::send(MQMessage& msg, SendCallback* sendCallback, lon
     (void)sendDefaultImpl(msg.getMessageImpl(), ASYNC, sendCallback, timeout);
   } catch (MQException& e) {
     LOG_ERROR_NEW("send failed, exception:{}", e.what());
-    sendCallback->onException(e);
-    if (sendCallback->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
-      deleteAndZero(sendCallback);
-    }
+    sendCallback->invokeOnException(e);
   } catch (std::exception& e) {
     LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
     exit(-1);
@@ -246,10 +243,7 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
     }
   } catch (MQException& e) {
     LOG_ERROR_NEW("send failed, exception:{}", e.what());
-    sendCallback->onException(e);
-    if (sendCallback->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
-      deleteAndZero(sendCallback);
-    }
+    sendCallback->invokeOnException(e);
   } catch (std::exception& e) {
     LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
     exit(-1);
@@ -318,10 +312,7 @@ void DefaultMQProducerImpl::send(MQMessage& msg,
     }
   } catch (MQException& e) {
     LOG_ERROR_NEW("send failed, exception:{}", e.what());
-    sendCallback->onException(e);
-    if (sendCallback->getSendCallbackType() == SEND_CALLBACK_TYPE_AUTO_DELETE) {
-      deleteAndZero(sendCallback);
-    }
+    sendCallback->invokeOnException(e);
   } catch (std::exception& e) {
     LOG_FATAL_NEW("[BUG] encounter unexcepted exception: {}", e.what());
     exit(-1);
diff --git a/src/producer/RequestResponseFuture.cpp b/src/producer/RequestResponseFuture.cpp
index 5bdcd61..615d27b 100644
--- a/src/producer/RequestResponseFuture.cpp
+++ b/src/producer/RequestResponseFuture.cpp
@@ -42,7 +42,7 @@ void RequestResponseFuture::executeRequestCallback() noexcept {
   if (request_callback_ != nullptr) {
     if (send_request_ok_ && cause_ == nullptr) {
       try {
-        request_callback_->onSuccess(std::move(response_msg_));
+        request_callback_->invokeOnSuccess(std::move(response_msg_));
       } catch (const std::exception& e) {
         LOG_WARN_NEW("RequestCallback throw an exception: {}", e.what());
       }
@@ -50,16 +50,11 @@ void RequestResponseFuture::executeRequestCallback() noexcept {
       try {
         std::rethrow_exception(cause_);
       } catch (MQException& e) {
-        request_callback_->onException(e);
+        request_callback_->invokeOnException(e);
       } catch (const std::exception& e) {
         LOG_WARN_NEW("unexpected exception in RequestResponseFuture: {}", e.what());
       }
     }
-
-    // auto delete callback
-    if (request_callback_->getRequestCallbackType() == REQUEST_CALLBACK_TYPE_AUTO_DELETE) {
-      deleteAndZero(request_callback_);
-    }
   }
 }