You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/07/24 02:57:59 UTC
[rocketmq-client-cpp] branch master updated: [Feature] Support
producer transaction message (#154)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 5e4b657 [Feature] Support producer transaction message (#154)
5e4b657 is described below
commit 5e4b6577b84bd6481a56d38748d7e57e423c2415
Author: jonnxu <jo...@163.com>
AuthorDate: Wed Jul 24 10:57:54 2019 +0800
[Feature] Support producer transaction message (#154)
* Send transaction message feature
---
example/TransactionProducer.cpp | 135 +++++++++++++++++++
include/MQMessage.h | 4 +
include/SendResult.h | 8 +-
include/TransactionListener.h | 48 +++++++
include/TransactionMQProducer.h | 74 ++++++++++
include/TransactionSendResult.h | 48 +++++++
src/MQClientAPIImpl.cpp | 16 ++-
src/MQClientAPIImpl.h | 7 +-
src/MQClientFactory.cpp | 43 ++++++
src/MQClientFactory.h | 9 +-
src/message/MQMessageId.h | 11 +-
src/producer/SendResult.cpp | 15 ++-
src/producer/TransactionMQProducer.cpp | 216 ++++++++++++++++++++++++++++++
src/protocol/CommandHeader.cpp | 86 +++++++++++-
src/protocol/CommandHeader.h | 62 ++++++++-
src/protocol/RemotingCommand.cpp | 4 +
src/transport/ClientRemotingProcessor.cpp | 50 ++++++-
src/transport/ClientRemotingProcessor.h | 13 ++
18 files changed, 836 insertions(+), 13 deletions(-)
diff --git a/example/TransactionProducer.cpp b/example/TransactionProducer.cpp
new file mode 100644
index 0000000..1aabb08
--- /dev/null
+++ b/example/TransactionProducer.cpp
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <atomic>
+#include <condition_variable>
+#include <iomanip>
+#include <iostream>
+#include <mutex>
+#include <thread>
+#include "TransactionListener.h"
+#include "TransactionMQProducer.h"
+#include "TransactionSendResult.h"
+#include "common.h"
+
+using namespace rocketmq;
+
+std::atomic<bool> g_quit;
+std::mutex g_mtx;
+std::condition_variable g_finished;
+TpsReportService g_tps;
+
+class MyTransactionListener : public TransactionListener {
+ virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) {
+ if (!arg) {
+ std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId()
+ << ", return state: COMMIT_MESAGE " << endl;
+ return LocalTransactionState::COMMIT_MESSAGE;
+ }
+
+ LocalTransactionState state = (LocalTransactionState)(*(int*)arg % 3);
+ std::cout << "executeLocalTransaction transactionId:" << msg.getTransactionId() << ", return state: " << state
+ << endl;
+ return state;
+ }
+
+ virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) {
+ std::cout << "checkLocalTransaction enter msg:" << msg.toString() << endl;
+ return LocalTransactionState::COMMIT_MESSAGE;
+ }
+};
+
+void SyncProducerWorker(RocketmqSendAndConsumerArgs* info, TransactionMQProducer* producer) {
+ while (!g_quit.load()) {
+ if (g_msgCount.load() <= 0) {
+ std::this_thread::sleep_for(std::chrono::seconds(60));
+ std::unique_lock<std::mutex> lck(g_mtx);
+ g_finished.notify_one();
+ break;
+ }
+
+ MQMessage msg(info->topic, // topic
+ "*", // tag
+ info->body); // body
+ try {
+ auto start = std::chrono::system_clock::now();
+ std::cout << "before sendMessageInTransaction" << endl;
+ LocalTransactionState state = LocalTransactionState::UNKNOWN;
+ TransactionSendResult sendResult = producer->sendMessageInTransaction(msg, &state);
+ std::cout << "after sendMessageInTransaction msgId: " << sendResult.getMsgId() << endl;
+ g_tps.Increment();
+ --g_msgCount;
+ auto end = std::chrono::system_clock::now();
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+ if (duration.count() >= 500) {
+ std::cout << "send RT more than: " << duration.count() << " ms with msgid: " << sendResult.getMsgId() << endl;
+ }
+ } catch (const MQException& e) {
+ std::cout << "send failed: " << e.what() << std::endl;
+ }
+ }
+}
+
+int main(int argc, char* argv[]) {
+ RocketmqSendAndConsumerArgs info;
+ if (!ParseArgs(argc, argv, &info)) {
+ exit(-1);
+ }
+ PrintRocketmqSendAndConsumerArgs(info);
+ TransactionMQProducer producer("please_rename_unique_group_name");
+ producer.setNamesrvAddr(info.namesrv);
+ producer.setNamesrvDomain(info.namesrv_domain);
+ producer.setGroupName(info.groupname);
+ producer.setInstanceName(info.groupname);
+ producer.setSessionCredentials("mq acesskey", "mq secretkey", "ALIYUN");
+ producer.setSendMsgTimeout(500);
+ producer.setTcpTransportTryLockTimeout(1000);
+ producer.setTcpTransportConnectTimeout(400);
+ producer.setLogLevel(eLOG_LEVEL_DEBUG);
+ producer.setTransactionListener(new MyTransactionListener());
+ producer.start();
+ std::vector<std::shared_ptr<std::thread>> work_pool;
+ auto start = std::chrono::system_clock::now();
+ int msgcount = g_msgCount.load();
+ g_tps.start();
+
+ int threadCount = info.thread_count;
+ for (int j = 0; j < threadCount; j++) {
+ std::shared_ptr<std::thread> th = std::make_shared<std::thread>(SyncProducerWorker, &info, &producer);
+ work_pool.push_back(th);
+ }
+
+ {
+ std::unique_lock<std::mutex> lck(g_mtx);
+ g_finished.wait(lck);
+ g_quit.store(true);
+ }
+
+ auto end = std::chrono::system_clock::now();
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
+
+ std::cout << "per msg time: " << duration.count() / (double)msgcount << "ms \n"
+ << "========================finished==============================\n";
+
+ for (size_t th = 0; th != work_pool.size(); ++th) {
+ work_pool[th]->join();
+ }
+
+ producer.shutdown();
+
+ return 0;
+}
diff --git a/include/MQMessage.h b/include/MQMessage.h
index e4a6b5e..70fab36 100644
--- a/include/MQMessage.h
+++ b/include/MQMessage.h
@@ -73,6 +73,9 @@ class ROCKETMQCLIENT_API MQMessage {
void setBody(const char* body, int len);
void setBody(const std::string& body);
+ void setTransactionId(const std::string& id) { m_transactionId = id; }
+ std::string getTransactionId() const { return m_transactionId; }
+
std::map<std::string, std::string> getProperties() const;
void setProperties(std::map<std::string, std::string>& properties);
@@ -132,6 +135,7 @@ class ROCKETMQCLIENT_API MQMessage {
std::string m_topic;
int m_flag;
std::string m_body;
+ std::string m_transactionId;
std::map<std::string, std::string> m_properties;
};
//<!***************************************************************************
diff --git a/include/SendResult.h b/include/SendResult.h
index 2f8883a..870d03b 100644
--- a/include/SendResult.h
+++ b/include/SendResult.h
@@ -39,11 +39,16 @@ class ROCKETMQCLIENT_API SendResult {
SendResult(const SendResult& other);
SendResult& operator=(const SendResult& other);
+ void setTransactionId(const std::string& id) { m_transactionId = id; }
+
+ std::string getTransactionId() { return m_transactionId; }
+
const std::string& getMsgId() const;
const std::string& getOffsetMsgId() const;
SendStatus getSendStatus() const;
MQMessageQueue getMessageQueue() const;
int64 getQueueOffset() const;
+ std::string toString() const;
private:
SendStatus m_sendStatus;
@@ -51,8 +56,9 @@ class ROCKETMQCLIENT_API SendResult {
std::string m_offsetMsgId;
MQMessageQueue m_messageQueue;
int64 m_queueOffset;
+ std::string m_transactionId;
};
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
#endif
diff --git a/include/TransactionListener.h b/include/TransactionListener.h
new file mode 100644
index 0000000..6756e96
--- /dev/null
+++ b/include/TransactionListener.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __TRANSACTIONLISTENER_H__
+#define __TRANSACTIONLISTENER_H__
+
+#include "MQMessage.h"
+#include "MQMessageExt.h"
+#include "TransactionSendResult.h"
+
+namespace rocketmq {
+class ROCKETMQCLIENT_API TransactionListener {
+ public:
+ virtual ~TransactionListener() {}
+ /**
+ * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
+ *
+ * @param msg Half(prepare) message
+ * @param arg Custom business parameter
+ * @return Transaction state
+ */
+ virtual LocalTransactionState executeLocalTransaction(const MQMessage& msg, void* arg) = 0;
+
+ /**
+ * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
+ * method will be invoked to get local transaction status.
+ *
+ * @param msg Check message
+ * @return Transaction state
+ */
+ virtual LocalTransactionState checkLocalTransaction(const MQMessageExt& msg) = 0;
+};
+} // namespace rocketmq
+#endif
diff --git a/include/TransactionMQProducer.h b/include/TransactionMQProducer.h
new file mode 100644
index 0000000..fcd9a7c
--- /dev/null
+++ b/include/TransactionMQProducer.h
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __TRANSACTIONMQPRODUCER_H__
+#define __TRANSACTIONMQPRODUCER_H__
+
+#include <boost/asio.hpp>
+#include <boost/asio/io_service.hpp>
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/weak_ptr.hpp>
+#include <memory>
+#include <string>
+#include "DefaultMQProducer.h"
+#include "MQMessageExt.h"
+#include "TransactionListener.h"
+#include "TransactionSendResult.h"
+
+namespace rocketmq {
+
+class ROCKETMQCLIENT_API TransactionMQProducer : public DefaultMQProducer {
+ public:
+ TransactionMQProducer(const std::string& producerGroup)
+ : DefaultMQProducer(producerGroup), m_thread_num(1), m_ioServiceWork(m_ioService) {}
+ virtual ~TransactionMQProducer() {}
+ void start();
+ void shutdown();
+ std::shared_ptr<TransactionListener> getTransactionListener() { return m_transactionListener; }
+ void setTransactionListener(TransactionListener* listener) { m_transactionListener.reset(listener); }
+ TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg);
+ void checkTransactionState(const std::string& addr,
+ const MQMessageExt& message,
+ long tranStateTableOffset,
+ long commitLogOffset,
+ const std::string& msgId,
+ const std::string& transactionId,
+ const std::string& offsetMsgId);
+
+ private:
+ void initTransactionEnv();
+ void destroyTransactionEnv();
+ void endTransaction(SendResult& sendResult, LocalTransactionState& localTransactionState);
+ void checkTransactionStateImpl(const std::string& addr,
+ const MQMessageExt& message,
+ long tranStateTableOffset,
+ long commitLogOffset,
+ const std::string& msgId,
+ const std::string& transactionId,
+ const std::string& offsetMsgId);
+
+ private:
+ std::shared_ptr<TransactionListener> m_transactionListener;
+ int m_thread_num;
+ boost::thread_group m_threadpool;
+ boost::asio::io_service m_ioService;
+ boost::asio::io_service::work m_ioServiceWork;
+};
+} // namespace rocketmq
+
+#endif
diff --git a/include/TransactionSendResult.h b/include/TransactionSendResult.h
new file mode 100644
index 0000000..0bb1e48
--- /dev/null
+++ b/include/TransactionSendResult.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __TRANSACTIONSENDRESULT_H__
+#define __TRANSACTIONSENDRESULT_H__
+
+#include "SendResult.h"
+
+namespace rocketmq {
+
+enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOWN };
+
+class ROCKETMQCLIENT_API TransactionSendResult : public SendResult {
+ public:
+ TransactionSendResult() {}
+
+ TransactionSendResult(const SendStatus& sendStatus,
+ const std::string& msgId,
+ const std::string& offsetMsgId,
+ const MQMessageQueue& messageQueue,
+ int64 queueOffset)
+ : SendResult(sendStatus, msgId, offsetMsgId, messageQueue, queueOffset) {}
+
+ LocalTransactionState getLocalTransactionState() { return m_localTransactionState; }
+
+ void setLocalTransactionState(LocalTransactionState localTransactionState) {
+ m_localTransactionState = localTransactionState;
+ }
+
+ private:
+ LocalTransactionState m_localTransactionState;
+};
+} // namespace rocketmq
+#endif
\ No newline at end of file
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 2038f6c..f890968 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -209,6 +209,18 @@ void MQClientAPIImpl::createTopic(const string& addr,
THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
}
+void MQClientAPIImpl::endTransactionOneway(std::string addr,
+ EndTransactionRequestHeader* requestHeader,
+ std::string remark,
+ const SessionCredentials& sessionCredentials) {
+ RemotingCommand request(END_TRANSACTION, requestHeader);
+ request.setRemark(remark);
+ callSignatureBeforeRequest(addr, request, sessionCredentials);
+ request.Encode();
+ m_pRemotingClient->invokeOneway(addr, request);
+ return;
+}
+
SendResult MQClientAPIImpl::sendMessage(const string& addr,
const string& brokerName,
const MQMessage& msg,
@@ -373,9 +385,9 @@ SendResult MQClientAPIImpl::sendMessageSync(const string& addr,
unique_ptr<RemotingCommand> pResponse(m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
if (pResponse != NULL) {
try {
- LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s", msg.toString().c_str(), addr.c_str(),
- brokerName.c_str());
SendResult result = processSendResponse(brokerName, msg, pResponse.get());
+ LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s, send status:%d", msg.toString().c_str(),
+ addr.c_str(), brokerName.c_str(), (int)result.getSendStatus());
return result;
} catch (...) {
LOG_ERROR("send error");
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index a36038d..763e45d 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
#ifndef __MQCLIENTAPIIMPL_H__
#define __MQCLIENTAPIIMPL_H__
#include "AsyncCallback.h"
@@ -60,6 +61,10 @@ class MQClientAPIImpl {
const string& defaultTopic,
TopicConfig topicConfig,
const SessionCredentials& sessionCredentials);
+ void endTransactionOneway(std::string addr,
+ EndTransactionRequestHeader* requestHeader,
+ std::string remark,
+ const SessionCredentials& sessionCredentials);
SendResult sendMessage(const string& addr,
const string& brokerName,
@@ -214,6 +219,6 @@ class MQClientAPIImpl {
bool m_firstFetchNameSrv;
string m_mqClientId;
};
-} //<!end namespace;
+} // namespace rocketmq
//<!***************************************************************************
#endif
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 38ce229..99c4ffd 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -21,6 +21,7 @@
#include "PullRequest.h"
#include "Rebalance.h"
#include "TopicPublishInfo.h"
+#include "TransactionMQProducer.h"
#define MAX_BUFF_SIZE 8192
#define SAFE_BUFF_SIZE 7936 // 8192 - 256 = 7936
@@ -667,6 +668,31 @@ FindBrokerResult* MQClientFactory::findBrokerAddressInAdmin(const string& broker
return NULL;
}
+void MQClientFactory::checkTransactionState(const std::string& addr,
+ const MQMessageExt& messageExt,
+ const CheckTransactionStateRequestHeader& checkRequestHeader) {
+ string group = messageExt.getProperty(MQMessage::PROPERTY_PRODUCER_GROUP);
+ if (!group.empty()) {
+ MQProducer* producer = selectProducer(group);
+ if (producer != nullptr) {
+ TransactionMQProducer* transProducer = dynamic_cast<TransactionMQProducer*>(producer);
+ if (transProducer != nullptr) {
+ transProducer->checkTransactionState(addr, messageExt, checkRequestHeader.m_tranStateTableOffset,
+ checkRequestHeader.m_commitLogOffset, checkRequestHeader.m_msgId,
+ checkRequestHeader.m_transactionId, checkRequestHeader.m_offsetMsgId);
+ } else {
+ LOG_ERROR("checkTransactionState, producer not TransactionMQProducer failed, msg:%s",
+ messageExt.toString().data());
+ }
+ } else {
+ LOG_ERROR("checkTransactionState, pick producer by group[%s] failed, msg:%s", group.data(),
+ messageExt.toString().data());
+ }
+ } else {
+ LOG_ERROR("checkTransactionState, pick producer group failed, msg:%s", messageExt.toString().data());
+ }
+}
+
MQClientAPIImpl* MQClientFactory::getMQClientAPIImpl() const {
return m_pClientAPIImpl.get();
}
@@ -836,6 +862,23 @@ void MQClientFactory::doRebalanceByConsumerGroup(const string& consumerGroup) {
}
}
+void MQClientFactory::endTransactionOneway(const MQMessageQueue& mq,
+ EndTransactionRequestHeader* requestHeader,
+ const SessionCredentials& sessionCredentials) {
+ string brokerAddr = findBrokerAddressInPublish(mq.getBrokerName());
+ string remark = "";
+ if (!brokerAddr.empty()) {
+ try {
+ getMQClientAPIImpl()->endTransactionOneway(brokerAddr, requestHeader, remark, sessionCredentials);
+ } catch (MQException& e) {
+ LOG_ERROR("endTransactionOneway exception:%s", e.what());
+ throw e;
+ }
+ } else {
+ THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+ }
+}
+
void MQClientFactory::unregisterClient(const string& producerGroup,
const string& consumerGroup,
const SessionCredentials& sessionCredentials) {
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index eeb3637..e0d0efd 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -69,7 +69,12 @@ class MQClientFactory {
int64 begin,
int64 end,
const SessionCredentials& session_credentials);
-
+ void endTransactionOneway(const MQMessageQueue& mq,
+ EndTransactionRequestHeader* requestHeader,
+ const SessionCredentials& sessionCredentials);
+ void checkTransactionState(const std::string& addr,
+ const MQMessageExt& message,
+ const CheckTransactionStateRequestHeader& checkRequestHeader);
MQClientAPIImpl* getMQClientAPIImpl() const;
MQProducer* selectProducer(const string& group);
MQConsumer* selectConsumer(const string& group);
@@ -198,6 +203,6 @@ class MQClientFactory {
unique_ptr<boost::thread> m_consumer_async_service_thread;
};
-} //<!end namespace;
+} // namespace rocketmq
#endif
diff --git a/src/message/MQMessageId.h b/src/message/MQMessageId.h
index 38d11ee..fbe937b 100644
--- a/src/message/MQMessageId.h
+++ b/src/message/MQMessageId.h
@@ -24,7 +24,16 @@ namespace rocketmq {
//<!***************************************************************************
class MQMessageId {
public:
+ MQMessageId() {}
MQMessageId(sockaddr address, int64 offset) : m_address(address), m_offset(offset) {}
+ MQMessageId& operator=(const MQMessageId& id) {
+ if (&id == this) {
+ return *this;
+ }
+ this->m_address = id.m_address;
+ this->m_offset = id.m_offset;
+ return *this;
+ }
sockaddr getAddress() const { return m_address; }
@@ -39,6 +48,6 @@ class MQMessageId {
int64 m_offset;
};
-} //<!end namespace;
+} // namespace rocketmq
#endif
diff --git a/src/producer/SendResult.cpp b/src/producer/SendResult.cpp
index 81ddf76..6c55769 100644
--- a/src/producer/SendResult.cpp
+++ b/src/producer/SendResult.cpp
@@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "SendResult.h"
+#include <sstream>
#include "UtilAll.h"
#include "VirtualEnvUtil.h"
@@ -74,5 +75,17 @@ int64 SendResult::getQueueOffset() const {
return m_queueOffset;
}
+std::string SendResult::toString() const {
+ stringstream ss;
+ ss << "SendResult: ";
+ ss << "sendStatus:" << m_sendStatus;
+ ss << ",msgId:" << m_msgId;
+ ss << ",offsetMsgId:" << m_offsetMsgId;
+ ss << ",queueOffset:" << m_queueOffset;
+ ss << ",transactionId:" << m_transactionId;
+ ss << ",messageQueue:" << m_messageQueue.toString();
+ return ss.str();
+}
+
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/producer/TransactionMQProducer.cpp b/src/producer/TransactionMQProducer.cpp
new file mode 100644
index 0000000..fbd78c5
--- /dev/null
+++ b/src/producer/TransactionMQProducer.cpp
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TransactionMQProducer.h"
+#include <string>
+#include "CommandHeader.h"
+#include "Logging.h"
+#include "MQClientFactory.h"
+#include "MQDecoder.h"
+#include "MessageSysFlag.h"
+#include "TransactionListener.h"
+#include "TransactionSendResult.h"
+
+using namespace std;
+namespace rocketmq {
+
+void TransactionMQProducer::initTransactionEnv() {
+ for (int i = 0; i < m_thread_num; ++i) {
+ m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_ioService));
+ }
+}
+
+void TransactionMQProducer::destroyTransactionEnv() {
+ m_ioService.stop();
+ m_threadpool.join_all();
+}
+
+TransactionSendResult TransactionMQProducer::sendMessageInTransaction(MQMessage& msg, void* arg) {
+ if (!m_transactionListener) {
+ THROW_MQEXCEPTION(MQClientException, "transactionListener is null", -1);
+ }
+
+ SendResult sendResult;
+ msg.setProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED, "true");
+ msg.setProperty(MQMessage::PROPERTY_PRODUCER_GROUP, getGroupName());
+ try {
+ sendResult = send(msg);
+ } catch (MQException& e) {
+ THROW_MQEXCEPTION(MQClientException, e.what(), -1);
+ }
+
+ LocalTransactionState localTransactionState = LocalTransactionState::UNKNOWN;
+ switch (sendResult.getSendStatus()) {
+ case SendStatus::SEND_OK:
+ try {
+ if (sendResult.getTransactionId() != "") {
+ msg.setProperty("__transactionId__", sendResult.getTransactionId());
+ }
+ string transactionId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+ if (transactionId != "") {
+ msg.setTransactionId(transactionId);
+ }
+ LOG_DEBUG("sendMessageInTransaction, msgId:%s, transactionId:%s", sendResult.getMsgId().data(),
+ transactionId.data());
+ localTransactionState = m_transactionListener->executeLocalTransaction(msg, arg);
+ if (localTransactionState != LocalTransactionState::COMMIT_MESSAGE) {
+ LOG_WARN("executeLocalTransaction ret not LocalTransactionState::commit, msg:%s", msg.toString().data());
+ }
+ } catch (MQException& e) {
+ THROW_MQEXCEPTION(MQClientException, e.what(), -1);
+ }
+ break;
+ case SendStatus::SEND_FLUSH_DISK_TIMEOUT:
+ case SendStatus::SEND_FLUSH_SLAVE_TIMEOUT:
+ case SendStatus::SEND_SLAVE_NOT_AVAILABLE:
+ localTransactionState = LocalTransactionState::ROLLBACK_MESSAGE;
+ LOG_WARN("sendMessageInTransaction, send not ok, rollback, result:%s", sendResult.toString().data());
+ break;
+ default:
+ break;
+ }
+
+ try {
+ endTransaction(sendResult, localTransactionState);
+ } catch (MQException& e) {
+ LOG_WARN("endTransaction exception:%s", e.what());
+ }
+
+ TransactionSendResult transactionSendResult(sendResult.getSendStatus(), sendResult.getMsgId(),
+ sendResult.getOffsetMsgId(), sendResult.getMessageQueue(),
+ sendResult.getQueueOffset());
+ transactionSendResult.setTransactionId(msg.getTransactionId());
+ transactionSendResult.setLocalTransactionState(localTransactionState);
+ return transactionSendResult;
+}
+
+void TransactionMQProducer::endTransaction(SendResult& sendResult, LocalTransactionState& localTransactionState) {
+ MQMessageId id;
+ if (sendResult.getOffsetMsgId() != "") {
+ id = MQDecoder::decodeMessageId(sendResult.getOffsetMsgId());
+ } else {
+ id = MQDecoder::decodeMessageId(sendResult.getMsgId());
+ }
+ string transId = sendResult.getTransactionId();
+
+ int commitOrRollback = MessageSysFlag::TransactionNotType;
+ switch (localTransactionState) {
+ case COMMIT_MESSAGE:
+ commitOrRollback = MessageSysFlag::TransactionCommitType;
+ break;
+ case ROLLBACK_MESSAGE:
+ commitOrRollback = MessageSysFlag::TransactionRollbackType;
+ break;
+ case UNKNOWN:
+ commitOrRollback = MessageSysFlag::TransactionNotType;
+ break;
+ default:
+ break;
+ }
+
+ bool fromTransCheck = false;
+ EndTransactionRequestHeader* requestHeader =
+ new EndTransactionRequestHeader(getGroupName(), sendResult.getQueueOffset(), id.getOffset(), commitOrRollback,
+ fromTransCheck, sendResult.getMsgId(), transId);
+ LOG_DEBUG("endTransaction: msg:%s", requestHeader->toString().data());
+ getFactory()->endTransactionOneway(sendResult.getMessageQueue(), requestHeader, getSessionCredentials());
+}
+
+void TransactionMQProducer::checkTransactionState(const std::string& addr,
+ const MQMessageExt& message,
+ long tranStateTableOffset,
+ long commitLogOffset,
+ const std::string& msgId,
+ const std::string& transactionId,
+ const std::string& offsetMsgId) {
+ LOG_DEBUG("checkTransactionState: msgId:%s, transactionId:%s", msgId.data(), transactionId.data());
+ if (!m_transactionListener) {
+ LOG_WARN("checkTransactionState, transactionListener null");
+ THROW_MQEXCEPTION(MQClientException, "checkTransactionState, transactionListener null", -1);
+ }
+
+ m_ioService.post(boost::bind(&TransactionMQProducer::checkTransactionStateImpl, this, addr, message,
+ tranStateTableOffset, commitLogOffset, msgId, transactionId, offsetMsgId));
+}
+
+void TransactionMQProducer::checkTransactionStateImpl(const std::string& addr,
+ const MQMessageExt& message,
+ long tranStateTableOffset,
+ long commitLogOffset,
+ const std::string& msgId,
+ const std::string& transactionId,
+ const std::string& offsetMsgId) {
+ LOG_DEBUG("checkTransactionStateImpl: msgId:%s, transactionId:%s", msgId.data(), transactionId.data());
+ LocalTransactionState localTransactionState = UNKNOWN;
+ try {
+ localTransactionState = m_transactionListener->checkLocalTransaction(message);
+ } catch (MQException& e) {
+ LOG_INFO("checkTransactionState, checkLocalTransaction exception: %s", e.what());
+ }
+
+ EndTransactionRequestHeader* endHeader = new EndTransactionRequestHeader();
+ endHeader->m_commitLogOffset = commitLogOffset;
+ endHeader->m_producerGroup = getGroupName();
+ endHeader->m_tranStateTableOffset = tranStateTableOffset;
+ endHeader->m_fromTransactionCheck = true;
+
+ string uniqueKey = transactionId;
+ if (transactionId.empty()) {
+ uniqueKey = message.getMsgId();
+ }
+
+ endHeader->m_msgId = uniqueKey;
+ endHeader->m_transactionId = transactionId;
+ switch (localTransactionState) {
+ case COMMIT_MESSAGE:
+ endHeader->m_commitOrRollback = MessageSysFlag::TransactionCommitType;
+ break;
+ case ROLLBACK_MESSAGE:
+ endHeader->m_commitOrRollback = MessageSysFlag::TransactionRollbackType;
+ LOG_WARN("when broker check, client rollback this transaction, %s", endHeader->toString().data());
+ break;
+ case UNKNOWN:
+ endHeader->m_commitOrRollback = MessageSysFlag::TransactionNotType;
+ LOG_WARN("when broker check, client does not know this transaction state, %s", endHeader->toString().data());
+ break;
+ default:
+ break;
+ }
+
+ LOG_INFO("checkTransactionState, endTransactionOneway: uniqueKey:%s, client state:%d, end header: %s",
+ uniqueKey.data(), localTransactionState, endHeader->toString().data());
+
+ string remark;
+ try {
+ getFactory()->getMQClientAPIImpl()->endTransactionOneway(addr, endHeader, remark, getSessionCredentials());
+ } catch (MQException& e) {
+ LOG_ERROR("endTransactionOneway exception:%s", e.what());
+ throw e;
+ }
+}
+
+void TransactionMQProducer::start() {
+ initTransactionEnv();
+ DefaultMQProducer::start();
+}
+
+void TransactionMQProducer::shutdown() {
+ DefaultMQProducer::shutdown();
+ destroyTransactionEnv();
+}
+
+} // namespace rocketmq
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index 95ef166..2f19236 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -60,6 +60,90 @@ void CreateTopicRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, strin
requestMap.insert(pair<string, string>("topicFilterType", topicFilterType));
}
+void CheckTransactionStateRequestHeader::Encode(Json::Value& outData) {}
+
+CommandHeader* CheckTransactionStateRequestHeader::Decode(Json::Value& ext) {
+ CheckTransactionStateRequestHeader* h = new CheckTransactionStateRequestHeader();
+ Json::Value& tempValue = ext["msgId"];
+ if (tempValue.isString()) {
+ h->m_msgId = tempValue.asString();
+ }
+
+ tempValue = ext["transactionId"];
+ if (tempValue.isString()) {
+ h->m_transactionId = tempValue.asString();
+ }
+
+ tempValue = ext["offsetMsgId"];
+ if (tempValue.isString()) {
+ h->m_offsetMsgId = tempValue.asString();
+ }
+
+ tempValue = ext["tranStateTableOffset"];
+ if (tempValue.isString()) {
+ h->m_tranStateTableOffset = UtilAll::str2ll(tempValue.asCString());
+ }
+
+ tempValue = ext["commitLogOffset"];
+ if (tempValue.isString()) {
+ h->m_commitLogOffset = UtilAll::str2ll(tempValue.asCString());
+ }
+
+ return h;
+}
+
+void CheckTransactionStateRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("msgId", m_msgId));
+ requestMap.insert(pair<string, string>("transactionId", m_transactionId));
+ requestMap.insert(pair<string, string>("offsetMsgId", m_offsetMsgId));
+ requestMap.insert(pair<string, string>("commitLogOffset", UtilAll::to_string(m_commitLogOffset)));
+ requestMap.insert(pair<string, string>("tranStateTableOffset", UtilAll::to_string(m_tranStateTableOffset)));
+}
+
+std::string CheckTransactionStateRequestHeader::toString() {
+ stringstream ss;
+ ss << "CheckTransactionStateRequestHeader:";
+ ss << " msgId:" << m_msgId;
+ ss << " transactionId:" << m_transactionId;
+ ss << " offsetMsgId:" << m_offsetMsgId;
+ ss << " commitLogOffset:" << m_commitLogOffset;
+ ss << " tranStateTableOffset:" << m_tranStateTableOffset;
+ return ss.str();
+}
+
+void EndTransactionRequestHeader::Encode(Json::Value& outData) {
+ outData["msgId"] = m_msgId;
+ outData["transactionId"] = m_transactionId;
+ outData["producerGroup"] = m_producerGroup;
+ outData["tranStateTableOffset"] = UtilAll::to_string(m_tranStateTableOffset);
+ outData["commitLogOffset"] = UtilAll::to_string(m_commitLogOffset);
+ outData["commitOrRollback"] = UtilAll::to_string(m_commitOrRollback);
+ outData["fromTransactionCheck"] = UtilAll::to_string(m_fromTransactionCheck);
+}
+
+void EndTransactionRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("msgId", m_msgId));
+ requestMap.insert(pair<string, string>("transactionId", m_transactionId));
+ requestMap.insert(pair<string, string>("producerGroup", m_producerGroup));
+ requestMap.insert(pair<string, string>("tranStateTableOffset", UtilAll::to_string(m_tranStateTableOffset)));
+ requestMap.insert(pair<string, string>("commitLogOffset", UtilAll::to_string(m_commitLogOffset)));
+ requestMap.insert(pair<string, string>("commitOrRollback", UtilAll::to_string(m_commitOrRollback)));
+ requestMap.insert(pair<string, string>("fromTransactionCheck", UtilAll::to_string(m_fromTransactionCheck)));
+}
+
+std::string EndTransactionRequestHeader::toString() {
+ stringstream ss;
+ ss << "EndTransactionRequestHeader:";
+ ss << " m_msgId:" << m_msgId;
+ ss << " m_transactionId:" << m_transactionId;
+ ss << " m_producerGroup:" << m_producerGroup;
+ ss << " m_tranStateTableOffset:" << m_tranStateTableOffset;
+ ss << " m_commitLogOffset:" << m_commitLogOffset;
+ ss << " m_commitOrRollback:" << m_commitOrRollback;
+ ss << " m_fromTransactionCheck:" << m_fromTransactionCheck;
+ return ss.str();
+}
+
//<!************************************************************************
void SendMessageRequestHeader::Encode(Json::Value& outData) {
outData["producerGroup"] = producerGroup;
@@ -532,4 +616,4 @@ const string NotifyConsumerIdsChangedRequestHeader::getGroup() const {
}
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h
index 2ad3e47..4a80ecf 100644
--- a/src/protocol/CommandHeader.h
+++ b/src/protocol/CommandHeader.h
@@ -18,6 +18,7 @@
#ifndef __COMMANDCUSTOMHEADER_H__
#define __COMMANDCUSTOMHEADER_H__
+#include <map>
#include <string>
#include "MQClientException.h"
#include "MessageSysFlag.h"
@@ -35,6 +36,65 @@ class CommandHeader {
virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap) {}
};
+class CheckTransactionStateRequestHeader : public CommandHeader {
+ public:
+ CheckTransactionStateRequestHeader() {}
+ CheckTransactionStateRequestHeader(long tableOffset,
+ long commLogOffset,
+ const std::string& msgid,
+ const std::string& transactionId,
+ const std::string& offsetMsgId)
+ : m_tranStateTableOffset(tableOffset),
+ m_commitLogOffset(commLogOffset),
+ m_msgId(msgid),
+ m_transactionId(transactionId),
+ m_offsetMsgId(offsetMsgId) {}
+ virtual ~CheckTransactionStateRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ static CommandHeader* Decode(Json::Value& ext);
+ virtual void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap);
+ std::string toString();
+
+ public:
+ long m_tranStateTableOffset;
+ long m_commitLogOffset;
+ std::string m_msgId;
+ std::string m_transactionId;
+ std::string m_offsetMsgId;
+};
+
+class EndTransactionRequestHeader : public CommandHeader {
+ public:
+ EndTransactionRequestHeader() {}
+ EndTransactionRequestHeader(const std::string& groupName,
+ long tableOffset,
+ long commLogOffset,
+ int commitOrRoll,
+ bool fromTransCheck,
+ const std::string& msgid,
+ const std::string& transId)
+ : m_producerGroup(groupName),
+ m_tranStateTableOffset(tableOffset),
+ m_commitLogOffset(commLogOffset),
+ m_commitOrRollback(commitOrRoll),
+ m_fromTransactionCheck(fromTransCheck),
+ m_msgId(msgid),
+ m_transactionId(transId) {}
+ virtual ~EndTransactionRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(std::map<string, string>& requestMap);
+ std::string toString();
+
+ public:
+ std::string m_producerGroup;
+ long m_tranStateTableOffset;
+ long m_commitLogOffset;
+ int m_commitOrRollback;
+ bool m_fromTransactionCheck;
+ std::string m_msgId;
+ std::string m_transactionId;
+};
+
//<!************************************************************************
class GetRouteInfoRequestHeader : public CommandHeader {
public:
@@ -423,6 +483,6 @@ class NotifyConsumerIdsChangedRequestHeader : public CommandHeader {
};
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
#endif
diff --git a/src/protocol/RemotingCommand.cpp b/src/protocol/RemotingCommand.cpp
index f556a24..08765de 100644
--- a/src/protocol/RemotingCommand.cpp
+++ b/src/protocol/RemotingCommand.cpp
@@ -242,6 +242,10 @@ void RemotingCommand::SetExtHeader(int code) {
break;
case NOTIFY_CONSUMER_IDS_CHANGED:
m_pExtHeader.reset(NotifyConsumerIdsChangedRequestHeader::Decode(ext));
+ break;
+ case CHECK_TRANSACTION_STATE:
+ m_pExtHeader.reset(CheckTransactionStateRequestHeader::Decode(ext));
+ break;
default:
break;
}
diff --git a/src/transport/ClientRemotingProcessor.cpp b/src/transport/ClientRemotingProcessor.cpp
index b0be046..63736c5 100644
--- a/src/transport/ClientRemotingProcessor.cpp
+++ b/src/transport/ClientRemotingProcessor.cpp
@@ -28,10 +28,10 @@ ClientRemotingProcessor::ClientRemotingProcessor(MQClientFactory* mqClientFactor
ClientRemotingProcessor::~ClientRemotingProcessor() {}
RemotingCommand* ClientRemotingProcessor::processRequest(const string& addr, RemotingCommand* request) {
- LOG_DEBUG("request Command received:processRequest");
+ LOG_INFO("request Command received:processRequest, addr:%s, code:%d", addr.data(), request->getCode());
switch (request->getCode()) {
case CHECK_TRANSACTION_STATE:
- // return checkTransactionState( request);
+ return checkTransactionState(addr, request);
break;
case NOTIFY_CONSUMER_IDS_CHANGED:
return notifyConsumerIdsChanged(request);
@@ -142,8 +142,52 @@ RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged(RemotingComma
request->SetExtHeader(request->getCode());
NotifyConsumerIdsChangedRequestHeader* requestHeader =
(NotifyConsumerIdsChangedRequestHeader*)request->getCommandHeader();
- LOG_INFO("notifyConsumerIdsChanged:%s", requestHeader->getGroup().c_str());
+ if (requestHeader == nullptr) {
+ LOG_ERROR("notifyConsumerIdsChanged requestHeader null");
+ return NULL;
+ }
+ string group = requestHeader->getGroup();
+ LOG_INFO("notifyConsumerIdsChanged:%s", group.c_str());
m_mqClientFactory->doRebalanceByConsumerGroup(requestHeader->getGroup());
return NULL;
}
+
+RemotingCommand* ClientRemotingProcessor::checkTransactionState(const std::string& addr, RemotingCommand* request) {
+ if (!request) {
+ LOG_ERROR("checkTransactionState request null");
+ return nullptr;
+ }
+
+ LOG_INFO("checkTransactionState addr:%s, request: %s", addr.data(), request->ToString().data());
+
+ request->SetExtHeader(request->getCode());
+ CheckTransactionStateRequestHeader* requestHeader = (CheckTransactionStateRequestHeader*)request->getCommandHeader();
+ if (!requestHeader) {
+ LOG_ERROR("checkTransactionState CheckTransactionStateRequestHeader requestHeader null");
+ return nullptr;
+ }
+ LOG_INFO("checkTransactionState request: %s", requestHeader->toString().data());
+
+ const MemoryBlock* block = request->GetBody();
+ if (block && block->getSize() > 0) {
+ std::vector<MQMessageExt> mqvec;
+ MQDecoder::decodes(block, mqvec);
+ if (mqvec.size() == 0) {
+ LOG_ERROR("checkTransactionState decodes MQMessageExt fail, request:%s", requestHeader->toString().data());
+ return nullptr;
+ }
+
+ MQMessageExt& messageExt = mqvec[0];
+ string transactionId = messageExt.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+ if (transactionId != "") {
+ messageExt.setTransactionId(transactionId);
+ }
+
+ m_mqClientFactory->checkTransactionState(addr, messageExt, *requestHeader);
+ } else {
+ LOG_ERROR("checkTransactionState getbody null or size 0, request Header:%s", requestHeader->toString().data());
+ }
+ return nullptr;
}
+
+} // namespace rocketmq
diff --git a/src/transport/ClientRemotingProcessor.h b/src/transport/ClientRemotingProcessor.h
index 58a0417..c88b8bb 100644
--- a/src/transport/ClientRemotingProcessor.h
+++ b/src/transport/ClientRemotingProcessor.h
@@ -33,6 +33,7 @@ class ClientRemotingProcessor {
RemotingCommand* resetOffset(RemotingCommand* request);
RemotingCommand* getConsumerRunningInfo(const string& addr, RemotingCommand* request);
RemotingCommand* notifyConsumerIdsChanged(RemotingCommand* request);
+ RemotingCommand* checkTransactionState(const string& addr, RemotingCommand* request);
private:
MQClientFactory* m_mqClientFactory;
@@ -49,6 +50,18 @@ class ResetOffsetBody {
private:
std::map<MQMessageQueue, int64> m_offsetTable;
};
+
+class CheckTransactionStateBody {
+ public:
+ CheckTransactionStateBody() {}
+ virtual ~CheckTransactionStateBody() { m_offsetTable.clear(); }
+ void setOffsetTable(MQMessageQueue mq, int64 offset);
+ std::map<MQMessageQueue, int64> getOffsetTable();
+ static ResetOffsetBody* Decode(const MemoryBlock* mem);
+
+ private:
+ std::map<MQMessageQueue, int64> m_offsetTable;
+};
}
#endif