You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2020/02/24 07:32:14 UTC

[rocketmq-client-cpp] branch master updated: refactor(transaction): use userdata to cache the local checker callback (#252)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 915d827  refactor(transaction): use userdata to cache the local checker callback (#252)
915d827 is described below

commit 915d8279cc50a758a504d3002cc00a188b08c627
Author: dinglei <li...@163.com>
AuthorDate: Mon Feb 24 15:32:08 2020 +0800

    refactor(transaction): use userdata to cache the local checker callback (#252)
---
 src/extern/CProducer.cpp | 29 +++++++++++++++++------------
 1 file changed, 17 insertions(+), 12 deletions(-)

diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index d025a21..a0c3698 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -39,7 +39,15 @@ extern "C" {
 #endif
 using namespace rocketmq;
 using namespace std;
-
+class MyLocalTransactionExecuterInner {
+ public:
+  MyLocalTransactionExecuterInner(CLocalTransactionExecutorCallback executor, CMessage* msg, void* userData)
+      : m_ExcutorCallback(executor), message(msg), data(userData) {}
+  ~MyLocalTransactionExecuterInner() {}
+  CLocalTransactionExecutorCallback m_ExcutorCallback;
+  CMessage* message;
+  void* data;
+};
 class LocalTransactionListenerInner : public TransactionListener {
  public:
   LocalTransactionListenerInner() {}
@@ -56,8 +64,10 @@ class LocalTransactionListenerInner : public TransactionListener {
     if (m_CheckerCallback == NULL) {
       return LocalTransactionState::UNKNOWN;
     }
-    CMessage* msg = (CMessage*)(&message);
-    CTransactionStatus status = m_ExcutorCallback(m_producer, msg, arg);
+    (void)(message);
+    MyLocalTransactionExecuterInner* executerInner = (MyLocalTransactionExecuterInner*)arg;
+    CTransactionStatus status =
+        executerInner->m_ExcutorCallback(m_producer, executerInner->message, executerInner->data);
     switch (status) {
       case E_COMMIT_TRANSACTION:
         return LocalTransactionState::COMMIT_MESSAGE;
@@ -91,15 +101,8 @@ class LocalTransactionListenerInner : public TransactionListener {
 
  private:
   CLocalTransactionCheckerCallback m_CheckerCallback;
-  CLocalTransactionExecutorCallback m_ExcutorCallback;
-
   CProducer* m_producer;
   void* m_data;
-
- public:
-  void setM_m_ExcutorCallback(CLocalTransactionExecutorCallback excutorcallback) {
-    m_ExcutorCallback = excutorcallback;
-  }
 };
 
 class SelectMessageQueueInner : public MessageQueueSelector {
@@ -584,8 +587,10 @@ int SendMessageTransaction(CProducer* producer,
   try {
     DefaultProducer* defaultMQProducer = (DefaultProducer*)producer;
     MQMessage* message = (MQMessage*)msg;
-    defaultMQProducer->listenerInner->setM_m_ExcutorCallback(callback);
-    SendResult sendResult = defaultMQProducer->innerTransactionProducer->sendMessageInTransaction(*message, userData);
+    MyLocalTransactionExecuterInner executerInner(callback, msg, userData);
+    // defaultMQProducer->listenerInner->setM_m_ExcutorCallback(callback);
+    SendResult sendResult =
+        defaultMQProducer->innerTransactionProducer->sendMessageInTransaction(*message, &executerInner);
     result->sendStatus = CSendStatus((int)sendResult.getSendStatus());
     result->offset = sendResult.getQueueOffset();
     strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);