You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2020/02/24 07:32:14 UTC
[rocketmq-client-cpp] branch master updated: refactor(transaction):
use userdata to cache the local checker callback (#252)
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 915d827 refactor(transaction): use userdata to cache the local checker callback (#252)
915d827 is described below
commit 915d8279cc50a758a504d3002cc00a188b08c627
Author: dinglei <li...@163.com>
AuthorDate: Mon Feb 24 15:32:08 2020 +0800
refactor(transaction): use userdata to cache the local checker callback (#252)
---
src/extern/CProducer.cpp | 29 +++++++++++++++++------------
1 file changed, 17 insertions(+), 12 deletions(-)
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index d025a21..a0c3698 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -39,7 +39,15 @@ extern "C" {
#endif
using namespace rocketmq;
using namespace std;
-
+class MyLocalTransactionExecuterInner {
+ public:
+ MyLocalTransactionExecuterInner(CLocalTransactionExecutorCallback executor, CMessage* msg, void* userData)
+ : m_ExcutorCallback(executor), message(msg), data(userData) {}
+ ~MyLocalTransactionExecuterInner() {}
+ CLocalTransactionExecutorCallback m_ExcutorCallback;
+ CMessage* message;
+ void* data;
+};
class LocalTransactionListenerInner : public TransactionListener {
public:
LocalTransactionListenerInner() {}
@@ -56,8 +64,10 @@ class LocalTransactionListenerInner : public TransactionListener {
if (m_CheckerCallback == NULL) {
return LocalTransactionState::UNKNOWN;
}
- CMessage* msg = (CMessage*)(&message);
- CTransactionStatus status = m_ExcutorCallback(m_producer, msg, arg);
+ (void)(message);
+ MyLocalTransactionExecuterInner* executerInner = (MyLocalTransactionExecuterInner*)arg;
+ CTransactionStatus status =
+ executerInner->m_ExcutorCallback(m_producer, executerInner->message, executerInner->data);
switch (status) {
case E_COMMIT_TRANSACTION:
return LocalTransactionState::COMMIT_MESSAGE;
@@ -91,15 +101,8 @@ class LocalTransactionListenerInner : public TransactionListener {
private:
CLocalTransactionCheckerCallback m_CheckerCallback;
- CLocalTransactionExecutorCallback m_ExcutorCallback;
-
CProducer* m_producer;
void* m_data;
-
- public:
- void setM_m_ExcutorCallback(CLocalTransactionExecutorCallback excutorcallback) {
- m_ExcutorCallback = excutorcallback;
- }
};
class SelectMessageQueueInner : public MessageQueueSelector {
@@ -584,8 +587,10 @@ int SendMessageTransaction(CProducer* producer,
try {
DefaultProducer* defaultMQProducer = (DefaultProducer*)producer;
MQMessage* message = (MQMessage*)msg;
- defaultMQProducer->listenerInner->setM_m_ExcutorCallback(callback);
- SendResult sendResult = defaultMQProducer->innerTransactionProducer->sendMessageInTransaction(*message, userData);
+ MyLocalTransactionExecuterInner executerInner(callback, msg, userData);
+ // defaultMQProducer->listenerInner->setM_m_ExcutorCallback(callback);
+ SendResult sendResult =
+ defaultMQProducer->innerTransactionProducer->sendMessageInTransaction(*message, &executerInner);
result->sendStatus = CSendStatus((int)sendResult.getSendStatus());
result->offset = sendResult.getQueueOffset();
strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);