You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/02/24 10:46:45 UTC

[rocketmq-client-cpp] branch master updated: feat(producer): add regionId support in the send result (#258)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new e0f05d6  feat(producer): add regionId support in the send result  (#258)
e0f05d6 is described below

commit e0f05d65e5926e8d120ae8e7b4c3780de78948ed
Author: dinglei <li...@163.com>
AuthorDate: Mon Feb 24 18:46:38 2020 +0800

    feat(producer): add regionId support in the send result  (#258)
    
    * feat(producer): add regionId support in the send result to prepare to support message trace.
---
 build.sh                                        | 11 ++++++-----
 include/SendResult.h                            | 10 ++++++++++
 src/MQClientAPIImpl.cpp                         |  3 ++-
 src/producer/SendResult.cpp                     | 21 +++++++++++++++++++++
 src/protocol/CommandHeader.cpp                  | 12 ++++++++++++
 src/protocol/CommandHeader.h                    |  7 ++++++-
 test/src/producer/DefaultMQProducerImplTest.cpp |  4 +++-
 test/src/protocol/CommandHeaderTest.cpp         |  8 ++++++++
 8 files changed, 68 insertions(+), 8 deletions(-)

diff --git a/build.sh b/build.sh
index aa9b746..37fe0ea 100755
--- a/build.sh
+++ b/build.sh
@@ -23,6 +23,7 @@ declare down_dir="${basepath}/tmp_down_dir"
 declare build_dir="${basepath}/tmp_build_dir"
 declare packet_dir="${basepath}/tmp_packet_dir"
 declare install_lib_dir="${basepath}/bin"
+declare static_package_dir="${basepath}/tmp_static_package_dir"
 declare fname_libevent="libevent*.zip"
 declare fname_jsoncpp="jsoncpp*.zip"
 declare fname_boost="boost*.tar.gz"
@@ -407,21 +408,21 @@ ExecutionTesting() {
 }
 
 PackageRocketMQStatic() {
-  echo ">>>>>>>>>Start package static rocketmq library."
+  echo "############# Start package static rocketmq library. #############"
   if test "$(uname)" = "Linux"; then
     #packet libevent,jsoncpp,boost,rocketmq,Signature to one librocketmq.a
     cp -f ${basepath}/libs/signature/lib/libSignature.a ${install_lib_dir}/lib
     ar -M <${basepath}/package_rocketmq.mri
     cp -f librocketmq.a ${install_lib_dir}
   elif test "$(uname)" = "Darwin" ; then
-    mkdir ${static_package_dir}
+    mkdir -p ${static_package_dir}
     cd ${static_package_dir}
     cp -f ${basepath}/libs/signature/lib/libSignature.a .
     cp -f ${install_lib_dir}/lib/lib*.a .
     cp -f ${install_lib_dir}/librocketmq.a .
     echo "Md5 Hash RocketMQ Before:"
     md5sum librocketmq.a
-    dir=`ls *.a | grep -v  gtest | grep -v gmock `
+    local dir=`ls *.a | grep -v  gtest | grep -v gmock `
     for i in $dir
     do
       echo $i
@@ -433,14 +434,14 @@ PackageRocketMQStatic() {
     ranlib librocketmq.a
     echo "Md5 Hash RocketMQ After:"
     md5sum librocketmq.a
-    echo "Try to copy $(pwd)/librocketmq to ${install_lib_dir}/"
+    echo "Try to copy $(pwd)/librocketmq.a to ${install_lib_dir}/"
     cp -f librocketmq.a  ${install_lib_dir}/
     rm -rf *.o
     rm -rf __.*
     cd ${basepath}
     rm -rf ${static_package_dir}
   fi
-  echo "<<<<<<<<Success package static rocketmq library."
+  echo "############# Package static rocketmq library success.#############"
 }
 
 PrintParams
diff --git a/include/SendResult.h b/include/SendResult.h
index 94fd43b..cfe83ce 100644
--- a/include/SendResult.h
+++ b/include/SendResult.h
@@ -32,6 +32,12 @@ class ROCKETMQCLIENT_API SendResult {
              const std::string& offsetMsgId,
              const MQMessageQueue& messageQueue,
              int64 queueOffset);
+  SendResult(const SendStatus& sendStatus,
+             const std::string& msgId,
+             const std::string& offsetMsgId,
+             const MQMessageQueue& messageQueue,
+             int64 queueOffset,
+             const std::string& regionId);
 
   virtual ~SendResult();
   SendResult(const SendResult& other);
@@ -43,6 +49,9 @@ class ROCKETMQCLIENT_API SendResult {
 
   const std::string& getMsgId() const;
   const std::string& getOffsetMsgId() const;
+
+  const std::string& getRegionId() const;
+  void setRegionId(const std::string& regionId);
   SendStatus getSendStatus() const;
   MQMessageQueue getMessageQueue() const;
   int64 getQueueOffset() const;
@@ -55,6 +64,7 @@ class ROCKETMQCLIENT_API SendResult {
   MQMessageQueue m_messageQueue;
   int64 m_queueOffset;
   std::string m_transactionId;
+  std::string m_regionId;
 };
 
 }  // namespace rocketmq
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index df1effa..7d16d67 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -537,7 +537,8 @@ SendResult MQClientAPIImpl::processSendResponse(const string& brokerName,
     SendMessageResponseHeader* responseHeader = (SendMessageResponseHeader*)pResponse->getCommandHeader();
     MQMessageQueue messageQueue(msg.getTopic(), brokerName, responseHeader->queueId);
     string unique_msgId = msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
-    return SendResult(sendStatus, unique_msgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset);
+    return SendResult(sendStatus, unique_msgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset,
+                      responseHeader->regionId);
   }
   LOG_ERROR("processSendResponse error remark:%s, error code:%d", (pResponse->getRemark()).c_str(),
             pResponse->getCode());
diff --git a/src/producer/SendResult.cpp b/src/producer/SendResult.cpp
index 6c55769..3b56ff3 100644
--- a/src/producer/SendResult.cpp
+++ b/src/producer/SendResult.cpp
@@ -34,12 +34,26 @@ SendResult::SendResult(const SendStatus& sendStatus,
       m_messageQueue(messageQueue),
       m_queueOffset(queueOffset) {}
 
+SendResult::SendResult(const SendStatus& sendStatus,
+                       const std::string& msgId,
+                       const std::string& offsetMsgId,
+                       const MQMessageQueue& messageQueue,
+                       int64 queueOffset,
+                       const string& regionId)
+    : m_sendStatus(sendStatus),
+      m_msgId(msgId),
+      m_offsetMsgId(offsetMsgId),
+      m_messageQueue(messageQueue),
+      m_queueOffset(queueOffset),
+      m_regionId(regionId) {}
+
 SendResult::SendResult(const SendResult& other) {
   m_sendStatus = other.m_sendStatus;
   m_msgId = other.m_msgId;
   m_offsetMsgId = other.m_offsetMsgId;
   m_messageQueue = other.m_messageQueue;
   m_queueOffset = other.m_queueOffset;
+  m_regionId = other.m_regionId;
 }
 
 SendResult& SendResult::operator=(const SendResult& other) {
@@ -49,6 +63,7 @@ SendResult& SendResult::operator=(const SendResult& other) {
     m_offsetMsgId = other.m_offsetMsgId;
     m_messageQueue = other.m_messageQueue;
     m_queueOffset = other.m_queueOffset;
+    m_regionId = other.m_regionId;
   }
   return *this;
 }
@@ -63,6 +78,12 @@ const string& SendResult::getOffsetMsgId() const {
   return m_offsetMsgId;
 }
 
+const string& SendResult::getRegionId() const {
+  return m_regionId;
+}
+void SendResult::setRegionId(const string& regionId) {
+  m_regionId = regionId;
+}
 SendStatus SendResult::getSendStatus() const {
   return m_sendStatus;
 }
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index bc02028..20477a3 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -261,6 +261,16 @@ CommandHeader* SendMessageResponseHeader::Decode(Json::Value& ext) {
   if (tempValue.isString()) {
     h->queueOffset = UtilAll::str2ll(tempValue.asCString());
   }
+
+  tempValue = ext["transactionId"];
+  if (tempValue.isString()) {
+    h->transactionId = tempValue.asCString();
+  }
+
+  tempValue = ext["MSG_REGION"];
+  if (tempValue.isString()) {
+    h->regionId = tempValue.asCString();
+  }
   return h;
 }
 
@@ -268,6 +278,8 @@ void SendMessageResponseHeader::SetDeclaredFieldOfCommandHeader(map<string, stri
   requestMap.insert(pair<string, string>("msgId", msgId));
   requestMap.insert(pair<string, string>("queueId", UtilAll::to_string(queueId)));
   requestMap.insert(pair<string, string>("queueOffset", UtilAll::to_string(queueOffset)));
+  requestMap.insert(pair<string, string>("transactionId", transactionId));
+  requestMap.insert(pair<string, string>("MSG_REGION", regionId));
 }
 //<!************************************************************************
 void PullMessageRequestHeader::Encode(Json::Value& outData) {
diff --git a/src/protocol/CommandHeader.h b/src/protocol/CommandHeader.h
index bf74e07..7ebf54b 100644
--- a/src/protocol/CommandHeader.h
+++ b/src/protocol/CommandHeader.h
@@ -215,7 +215,10 @@ class SendMessageRequestHeaderV2 : public CommandHeader {
 //<!************************************************************************
 class SendMessageResponseHeader : public CommandHeader {
  public:
-  SendMessageResponseHeader() : queueId(0), queueOffset(0) { msgId.clear(); }
+  SendMessageResponseHeader() : queueId(0), queueOffset(0) {
+    msgId.clear();
+    regionId.clear();
+  }
   virtual ~SendMessageResponseHeader() {}
   static CommandHeader* Decode(Json::Value& ext);
   virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
@@ -224,6 +227,8 @@ class SendMessageResponseHeader : public CommandHeader {
   string msgId;
   int queueId;
   int64 queueOffset;
+  string regionId;
+  string transactionId;
 };
 
 //<!************************************************************************
diff --git a/test/src/producer/DefaultMQProducerImplTest.cpp b/test/src/producer/DefaultMQProducerImplTest.cpp
index 033e606..b9a042b 100644
--- a/test/src/producer/DefaultMQProducerImplTest.cpp
+++ b/test/src/producer/DefaultMQProducerImplTest.cpp
@@ -122,8 +122,10 @@ TEST(DefaultMQProducerImplTest, Sends) {
   topicPublishInfo->updateMessageQueueList(mqA);
   topicPublishInfo->updateMessageQueueList(mqB);
 
-  SendResult okMQAResult(SEND_OK, "MSSAGEID", "OFFSETID", mqA, 1024);
+  SendResult okMQAResult(SEND_OK, "MSSAGEID", "OFFSETID", mqA, 1024, "DEFAULT_REGION");
   SendResult okMQBResult(SEND_OK, "MSSAGEID", "OFFSETID", mqB, 2048);
+  okMQBResult.setRegionId("DEFAULT_REGION");
+  okMQBResult.toString();
   SendResult errorMQBResult(SEND_SLAVE_NOT_AVAILABLE, "MSSAGEID", "OFFSETID", mqB, 2048);
 
   EXPECT_CALL(*mockFactory, start()).Times(1).WillOnce(Return());
diff --git a/test/src/protocol/CommandHeaderTest.cpp b/test/src/protocol/CommandHeaderTest.cpp
index 6874adf..9e043cc 100644
--- a/test/src/protocol/CommandHeaderTest.cpp
+++ b/test/src/protocol/CommandHeaderTest.cpp
@@ -345,21 +345,29 @@ TEST(commandHeader, SendMessageResponseHeader) {
   header.msgId = "ABCDEFG";
   header.queueId = 1;
   header.queueOffset = 2;
+  header.transactionId = "ID";
+  header.regionId = "public";
   map<string, string> requestMap;
   header.SetDeclaredFieldOfCommandHeader(requestMap);
   EXPECT_EQ(requestMap["msgId"], "ABCDEFG");
   EXPECT_EQ(requestMap["queueId"], "1");
   EXPECT_EQ(requestMap["queueOffset"], "2");
+  EXPECT_EQ(requestMap["transactionId"], "ID");
+  EXPECT_EQ(requestMap["MSG_REGION"], "public");
 
   Value value;
   value["msgId"] = "EFGHIJK";
   value["queueId"] = "3";
   value["queueOffset"] = "4";
+  value["transactionId"] = "transactionId";
+  value["MSG_REGION"] = "MSG_REGION";
   shared_ptr<SendMessageResponseHeader> headerDecode(
       static_cast<SendMessageResponseHeader*>(SendMessageResponseHeader::Decode(value)));
   EXPECT_EQ(headerDecode->msgId, "EFGHIJK");
   EXPECT_EQ(headerDecode->queueId, 3);
   EXPECT_EQ(headerDecode->queueOffset, 4);
+  EXPECT_EQ(headerDecode->transactionId, "transactionId");
+  EXPECT_EQ(headerDecode->regionId, "MSG_REGION");
 }
 
 TEST(commandHeader, PullMessageRequestHeader) {