You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/02/24 10:46:45 UTC
[rocketmq-client-cpp] branch master updated: feat(producer): add regionId support in the send result (#258)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new e0f05d6 feat(producer): add regionId support in the send result (#258)
e0f05d6 is described below
commit e0f05d65e5926e8d120ae8e7b4c3780de78948ed
Author: dinglei <li...@163.com>
AuthorDate: Mon Feb 24 18:46:38 2020 +0800
feat(producer): add regionId support in the send result (#258)
* feat(producer): add regionId support in the send result, in preparation for message trace support.
---
build.sh | 11 ++++++-----
include/SendResult.h | 10 ++++++++++
src/MQClientAPIImpl.cpp | 3 ++-
src/producer/SendResult.cpp | 21 +++++++++++++++++++++
src/protocol/CommandHeader.cpp | 12 ++++++++++++
src/protocol/CommandHeader.h | 7 ++++++-
test/src/producer/DefaultMQProducerImplTest.cpp | 4 +++-
test/src/protocol/CommandHeaderTest.cpp | 8 ++++++++
8 files changed, 68 insertions(+), 8 deletions(-)
diff --git a/build.sh b/build.sh
index aa9b746..37fe0ea 100755
--- a/build.sh
+++ b/build.sh
@@ -23,6 +23,7 @@ declare down_dir="${basepath}/tmp_down_dir"
declare build_dir="${basepath}/tmp_build_dir"
declare packet_dir="${basepath}/tmp_packet_dir"
declare install_lib_dir="${basepath}/bin"
+declare static_package_dir="${basepath}/tmp_static_package_dir"
declare fname_libevent="libevent*.zip"
declare fname_jsoncpp="jsoncpp*.zip"
declare fname_boost="boost*.tar.gz"
@@ -407,21 +408,21 @@ ExecutionTesting() {
}
PackageRocketMQStatic() {
- echo ">>>>>>>>>Start package static rocketmq library."
+ echo "############# Start package static rocketmq library. #############"
if test "$(uname)" = "Linux"; then
#packet libevent,jsoncpp,boost,rocketmq,Signature to one librocketmq.a
cp -f ${basepath}/libs/signature/lib/libSignature.a ${install_lib_dir}/lib
ar -M <${basepath}/package_rocketmq.mri
cp -f librocketmq.a ${install_lib_dir}
elif test "$(uname)" = "Darwin" ; then
- mkdir ${static_package_dir}
+ mkdir -p ${static_package_dir}
cd ${static_package_dir}
cp -f ${basepath}/libs/signature/lib/libSignature.a .
cp -f ${install_lib_dir}/lib/lib*.a .
cp -f ${install_lib_dir}/librocketmq.a .
echo "Md5 Hash RocketMQ Before:"
md5sum librocketmq.a
- dir=`ls *.a | grep -v gtest | grep -v gmock `
+ local dir=`ls *.a | grep -v gtest | grep -v gmock `
for i in $dir
do
echo $i
@@ -433,14 +434,14 @@ PackageRocketMQStatic() {
ranlib librocketmq.a
echo "Md5 Hash RocketMQ After:"
md5sum librocketmq.a
- echo "Try to copy $(pwd)/librocketmq to ${install_lib_dir}/"
+ echo "Try to copy $(pwd)/librocketmq.a to ${install_lib_dir}/"
cp -f librocketmq.a ${install_lib_dir}/
rm -rf *.o
rm -rf __.*
cd ${basepath}
rm -rf ${static_package_dir}
fi
- echo "<<<<<<<<Success package static rocketmq library."
+ echo "############# Package static rocketmq library success.#############"
}
PrintParams
diff --git a/include/SendResult.h b/include/SendResult.h
index 94fd43b..cfe83ce 100644
--- a/include/SendResult.h
+++ b/include/SendResult.h
@@ -32,6 +32,12 @@ class ROCKETMQCLIENT_API SendResult {
const std::string& offsetMsgId,
const MQMessageQueue& messageQueue,
int64 queueOffset);
+ SendResult(const SendStatus& sendStatus,
+ const std::string& msgId,
+ const std::string& offsetMsgId,
+ const MQMessageQueue& messageQueue,
+ int64 queueOffset,
+ const std::string& regionId);
virtual ~SendResult();
SendResult(const SendResult& other);
@@ -43,6 +49,9 @@ class ROCKETMQCLIENT_API SendResult {
const std::string& getMsgId() const;
const std::string& getOffsetMsgId() const;
+
+ const std::string& getRegionId() const;
+ void setRegionId(const std::string& regionId);
SendStatus getSendStatus() const;
MQMessageQueue getMessageQueue() const;
int64 getQueueOffset() const;
@@ -55,6 +64,7 @@ class ROCKETMQCLIENT_API SendResult {
MQMessageQueue m_messageQueue;
int64 m_queueOffset;
std::string m_transactionId;
+ std::string m_regionId;
};
} // namespace rocketmq
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index df1effa..7d16d67 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -537,7 +537,8 @@ SendResult MQClientAPIImpl::processSendResponse(const string& brokerName,
SendMessageResponseHeader* responseHeader = (SendMessageResponseHeader*)pResponse->getCommandHeader();
MQMessageQueue messageQueue(msg.getTopic(), brokerName, responseHeader->queueId);
string unique_msgId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
- return SendResult(sendStatus, unique_msgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset);
+ return SendResult(sendStatus, unique_msgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset,
+ responseHeader->regionId);
}
LOG_ERROR("processSendResponse error remark:%s, error code:%d", (pResponse->getRemark()).c_str(),
pResponse->getCode());
diff --git a/src/producer/SendResult.cpp b/src/producer/SendResult.cpp
index 6c55769..3b56ff3 100644
--- a/src/producer/SendResult.cpp
+++ b/src/producer/SendResult.cpp
@@ -34,12 +34,26 @@ SendResult::SendResult(const SendStatus& sendStatus,
m_messageQueue(messageQueue),
m_queueOffset(queueOffset) {}
+SendResult::SendResult(const SendStatus& sendStatus,
+ const std::string& msgId,
+ const std::string& offsetMsgId,
+ const MQMessageQueue& messageQueue,
+ int64 queueOffset,
+ const string& regionId)
+ : m_sendStatus(sendStatus),
+ m_msgId(msgId),
+ m_offsetMsgId(offsetMsgId),
+ m_messageQueue(messageQueue),
+ m_queueOffset(queueOffset),
+ m_regionId(regionId) {}
+
SendResult::SendResult(const SendResult& other) {
m_sendStatus = other.m_sendStatus;
m_msgId = other.m_msgId;
m_offsetMsgId = other.m_offsetMsgId;
m_messageQueue = other.m_messageQueue;
m_queueOffset = other.m_queueOffset;
+ m_regionId = other.m_regionId;
}
SendResult& SendResult::operator=(const SendResult& other) {
@@ -49,6 +63,7 @@ SendResult& SendResult::operator=(const SendResult& other) {
m_offsetMsgId = other.m_offsetMsgId;
m_messageQueue = other.m_messageQueue;
m_queueOffset = other.m_queueOffset;
+ m_regionId = other.m_regionId;
}
return *this;
}
@@ -63,6 +78,12 @@ const string& SendResult::getOffsetMsgId() const {
return m_offsetMsgId;
}
+const string& SendResult::getRegionId() const {
+ return m_regionId;
+}
+void SendResult::setRegionId(const string& regionId) {
+ m_regionId = regionId;
+}
SendStatus SendResult::getSendStatus() const {
return m_sendStatus;
}
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index bc02028..20477a3 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -261,6 +261,16 @@ CommandHeader* SendMessageResponseHeader::Decode(Json::Value& ext) {
if (tempValue.isString()) {
h->queueOffset = UtilAll::str2ll(tempValue.asCString());
}
+
+ tempValue = ext["transactionId"];
+ if (tempValue.isString()) {
+ h->transactionId = tempValue.asCString();
+ }
+
+ tempValue = ext["MSG_REGION"];
+ if (tempValue.isString()) {
+ h->regionId = tempValue.asCString();
+ }
return h;
}
@@ -268,6 +278,8 @@ void SendMessageResponseHeader::SetDeclaredFieldOfCommandHeader(map<string, stri
requestMap.insert(pair<string, string>("msgId", msgId));
requestMap.insert(pair<string, string>("queueId", UtilAll::to_string(queueId)));
requestMap.insert(pair<string, string>("queueOffset", UtilAll::to_string(queueOffset)));
+ requestMap.insert(pair<string, string>("transactionId", transactionId));
+ requestMap.insert(pair<string, string>("MSG_REGION", regionId));
}
//<!************************************************************************
void PullMessageRequestHeader::Encode(Json::Value& outData) {
diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h
index bf74e07..7ebf54b 100644
--- a/src/protocol/CommandHeader.h
+++ b/src/protocol/CommandHeader.h
@@ -215,7 +215,10 @@ class SendMessageRequestHeaderV2 : public CommandHeader {
//<!************************************************************************
class SendMessageResponseHeader : public CommandHeader {
public:
- SendMessageResponseHeader() : queueId(0), queueOffset(0) { msgId.clear(); }
+ SendMessageResponseHeader() : queueId(0), queueOffset(0) {
+ msgId.clear();
+ regionId.clear();
+ }
virtual ~SendMessageResponseHeader() {}
static CommandHeader* Decode(Json::Value& ext);
virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
@@ -224,6 +227,8 @@ class SendMessageResponseHeader : public CommandHeader {
string msgId;
int queueId;
int64 queueOffset;
+ string regionId;
+ string transactionId;
};
//<!************************************************************************
diff --git a/test/src/producer/DefaultMQProducerImplTest.cpp b/test/src/producer/DefaultMQProducerImplTest.cpp
index 033e606..b9a042b 100644
--- a/test/src/producer/DefaultMQProducerImplTest.cpp
+++ b/test/src/producer/DefaultMQProducerImplTest.cpp
@@ -122,8 +122,10 @@ TEST(DefaultMQProducerImplTest, Sends) {
topicPublishInfo->updateMessageQueueList(mqA);
topicPublishInfo->updateMessageQueueList(mqB);
- SendResult okMQAResult(SEND_OK, "MSSAGEID", "OFFSETID", mqA, 1024);
+ SendResult okMQAResult(SEND_OK, "MSSAGEID", "OFFSETID", mqA, 1024, "DEFAULT_REGION");
SendResult okMQBResult(SEND_OK, "MSSAGEID", "OFFSETID", mqB, 2048);
+ okMQBResult.setRegionId("DEFAULT_REGION");
+ okMQBResult.toString();
SendResult errorMQBResult(SEND_SLAVE_NOT_AVAILABLE, "MSSAGEID", "OFFSETID", mqB, 2048);
EXPECT_CALL(*mockFactory, start()).Times(1).WillOnce(Return());
diff --git a/test/src/protocol/CommandHeaderTest.cpp b/test/src/protocol/CommandHeaderTest.cpp
index 6874adf..9e043cc 100644
--- a/test/src/protocol/CommandHeaderTest.cpp
+++ b/test/src/protocol/CommandHeaderTest.cpp
@@ -345,21 +345,29 @@ TEST(commandHeader, SendMessageResponseHeader) {
header.msgId = "ABCDEFG";
header.queueId = 1;
header.queueOffset = 2;
+ header.transactionId = "ID";
+ header.regionId = "public";
map<string, string> requestMap;
header.SetDeclaredFieldOfCommandHeader(requestMap);
EXPECT_EQ(requestMap["msgId"], "ABCDEFG");
EXPECT_EQ(requestMap["queueId"], "1");
EXPECT_EQ(requestMap["queueOffset"], "2");
+ EXPECT_EQ(requestMap["transactionId"], "ID");
+ EXPECT_EQ(requestMap["MSG_REGION"], "public");
Value value;
value["msgId"] = "EFGHIJK";
value["queueId"] = "3";
value["queueOffset"] = "4";
+ value["transactionId"] = "transactionId";
+ value["MSG_REGION"] = "MSG_REGION";
shared_ptr<SendMessageResponseHeader> headerDecode(
static_cast<SendMessageResponseHeader*>(SendMessageResponseHeader::Decode(value)));
EXPECT_EQ(headerDecode->msgId, "EFGHIJK");
EXPECT_EQ(headerDecode->queueId, 3);
EXPECT_EQ(headerDecode->queueOffset, 4);
+ EXPECT_EQ(headerDecode->transactionId, "transactionId");
+ EXPECT_EQ(headerDecode->regionId, "MSG_REGION");
}
TEST(commandHeader, PullMessageRequestHeader) {