You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/12/25 13:01:33 UTC

[GitHub] ShannonDing closed pull request #37: Bugs fixed

ShannonDing closed pull request #37: Bugs fixed
URL: https://github.com/apache/rocketmq-client-cpp/pull/37
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 547132d6..3a21ab4f 100755
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
- 
+
 cmake_minimum_required(VERSION 2.8)
 
 if (APPLE)
@@ -52,11 +52,11 @@ if(WIN32)
     find_package(Boost 1.56 REQUIRED COMPONENTS atomic thread system chrono date_time
         log log_setup regex serialization filesystem locale iostreams zlib)
     if(Boost_FOUND)
-        message(status "** Boost Include dir: ${Boost_INCLUDE_DIR}")
-        message(status "** Boost Libraries dir: ${Boost_LIBRARY_DIRS}")
-        message(status "** Boost Libraries: ${Boost_LIBRARIES}")
+        message(STATUS "** Boost Include dir: ${Boost_INCLUDE_DIR}")
+        message(STATUS "** Boost Libraries dir: ${Boost_LIBRARY_DIRS}")
+        message(STATUS "** Boost Libraries: ${Boost_LIBRARIES}")
         include_directories(${Boost_INCLUDE_DIRS})
-    endif()    
+    endif()
 else()
     #find_package(Boost 1.56 REQUIRED COMPONENTS atomic thread system chrono date_time log log_setup regex serialization filesystem locale iostreams) 
     set(Boost_INCLUDE_DIR    ${PROJECT_SOURCE_DIR}/bin/include)
@@ -68,16 +68,16 @@ else()
     include_directories(${Boost_INCLUDE_DIRS})
 endif()
 
-message(status "** Boost_INCLUDE_DIR: ${Boost_INCLUDE_DIR}")
-message(status "** Boost_LIBRARIES: ${Boost_LIBRARIES}")
+message(STATUS "** Boost_INCLUDE_DIR: ${Boost_INCLUDE_DIR}")
+message(STATUS "** Boost_LIBRARIES: ${Boost_LIBRARIES}")
 
 option(Libevent_USE_STATIC_LIBS "only find libevent static libs"   ON) # only find static libs
 if(WIN32)
     find_package(Libevent 2.0.22 REQUIRED COMPONENTS)
     if(LIBEVENT_FOUND)
         include_directories(${LIBEVENT_INCLUDE_DIRS})
-        message(status "** libevent Include dir: ${LIBEVENT_INCLUDE_DIR}")
-        message(status "** libevent Libraries: ${LIBEVENT_LIBRARIES}")
+        message(STATUS "** libevent Include dir: ${LIBEVENT_INCLUDE_DIR}")
+        message(STATUS "** libevent Libraries: ${LIBEVENT_LIBRARIES}")
     endif()
 else()
     #find_package(Libevent 2.0.22 REQUIRED COMPONENTS)
@@ -87,8 +87,8 @@ else()
     include_directories(${LIBEVENT_INCLUDE_DIRS})
 endif()
 
-message(status "** LIBEVENT_INCLUDE_DIR: ${LIBEVENT_INCLUDE_DIR}")
-message(status "** LIBEVENT_LIBRARIES: ${LIBEVENT_LIBRARIES}")
+message(STATUS "** LIBEVENT_INCLUDE_DIR: ${LIBEVENT_INCLUDE_DIR}")
+message(STATUS "** LIBEVENT_LIBRARIES: ${LIBEVENT_LIBRARIES}")
 
 option(JSONCPP_USE_STATIC_LIBS  "only find jsoncpp static libs"  ON) # only find static libs
 if(WIN32)
@@ -103,8 +103,8 @@ else()
     include_directories(${JSONCPP_INCLUDE_DIRS})
 endif()
 
-message(status "** JSONCPP_INCLUDE_DIRS: ${JSONCPP_INCLUDE_DIRS}")
-message(status "** JSONCPP_LIBRARIES: ${JSONCPP_LIBRARIES}")
+message(STATUS "** JSONCPP_INCLUDE_DIRS: ${JSONCPP_INCLUDE_DIRS}")
+message(STATUS "** JSONCPP_LIBRARIES: ${JSONCPP_LIBRARIES}")
 
 # put binaries in a different dir to make them easier to find.
 set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
@@ -170,7 +170,7 @@ endif()
 
 string(REPLACE ";" " " CMAKE_CXX_FLAGS "${CXX_FLAGS}")
 string(REPLACE ";" " " CMAKE_C_FLAGS "${C_FLAGS}")
- 
+
 set(CMAKE_CXX_FLAGS_DEBUG   "-O0 -DDEBUG")
 set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG")
 
@@ -223,6 +223,7 @@ add_subdirectory(example)
 option(RUN_UNIT_TEST "RUN_UNIT_TEST" OFF)
 
 if(RUN_UNIT_TEST)
-  message(status "** RUN_UNIT_TEST: ${RUN_UNIT_TEST} Do execution testing")
+  message(STATUS "** RUN_UNIT_TEST: ${RUN_UNIT_TEST} Do execution testing")
   add_subdirectory(test)
-endif()
\ No newline at end of file
+endif()
+
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index 9a394845..4051c772 100755
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -138,7 +138,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
   OffsetStore* m_pOffsetStore;
   Rebalance* m_pRebalance;
   PullAPIWrapper* m_pPullAPIWrapper;
-  ConsumeMsgService* m_consumerServeice;
+  ConsumeMsgService* m_consumerService;
   MQMessageListener* m_pMessageListener;
   int m_consumeMessageBatchMaxSize;
   int m_maxMsgCacheSize;
diff --git a/include/MQMessage.h b/include/MQMessage.h
index cc65fd12..e03c6645 100755
--- a/include/MQMessage.h
+++ b/include/MQMessage.h
@@ -1,121 +1,133 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __MESSAGE_H__
-#define __MESSAGE_H__
-
-#include <map>
-#include <sstream>
-#include <string>
-#include <vector>
-#include "RocketMQClient.h"
-
-
-namespace rocketmq {
-//<!***************************************************************************
-class ROCKETMQCLIENT_API MQMessage {
- public:
-  MQMessage();
-  MQMessage(const std::string& topic, const std::string& body);
-  MQMessage(const std::string& topic, const std::string& tags, const std::string& body);
-  MQMessage(const std::string& topic, const std::string& tags, const std::string& keys,
-            const std::string& body);
-  MQMessage(const std::string& topic, const std::string& tags, const std::string& keys,
-            const int flag, const std::string& body, bool waitStoreMsgOK);
-
-  virtual ~MQMessage();
-  MQMessage(const MQMessage& other);
-  MQMessage& operator=(const MQMessage& other);
-
-  void setProperty(const std::string& name, const std::string& value) ;
-  const std::string & getProperty(const std::string& name) const;
-
-  const std::string &getTopic() const;
-  void setTopic(const std::string& topic);
-  void setTopic(const char* body, int len);
-
-  const std::string &getTags() const;
-  void setTags(const std::string& tags);
-
-  const std::string &getKeys() const;
-  void setKeys(const std::string& keys);
-  void setKeys(const std::vector<std::string>& keys);
-
-  int getDelayTimeLevel() const;
-  void setDelayTimeLevel(int level);
-
-  bool isWaitStoreMsgOK();
-  void setWaitStoreMsgOK(bool waitStoreMsgOK);
-
-  int getFlag() const;
-  void setFlag(int flag);
-
-  const std::string &getBody() const;
-  void setBody(const char* body, int len);
-  void setBody(const std::string& body);
-
-  std::map<std::string, std::string> getProperties() const;
-  void setProperties(std::map<std::string, std::string>& properties);
-
-  const std::string toString() const {
-    std::stringstream ss;
-    ss << "Message [topic=" << m_topic << ", flag=" << m_flag
-       << ", tag=" << getTags() << "]";
-    return ss.str();
-  }
-
- protected:
-  void Init(const std::string& topic, const std::string& tags, const std::string& keys,
-            const int flag, const std::string& body, bool waitStoreMsgOK);
-
- public:
-  static const std::string PROPERTY_KEYS;
-  static const std::string PROPERTY_TAGS;
-  static const std::string PROPERTY_WAIT_STORE_MSG_OK;
-  static const std::string PROPERTY_DELAY_TIME_LEVEL;
-  static const std::string PROPERTY_RETRY_TOPIC;
-  static const std::string PROPERTY_REAL_TOPIC;
-  static const std::string PROPERTY_REAL_QUEUE_ID;
-  static const std::string PROPERTY_TRANSACTION_PREPARED;
-  static const std::string PROPERTY_PRODUCER_GROUP;
-  static const std::string PROPERTY_MIN_OFFSET;
-  static const std::string PROPERTY_MAX_OFFSET;
-  
-  static const std::string PROPERTY_BUYER_ID;
-  static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
-  static const std::string PROPERTY_TRANSFER_FLAG;
-  static const std::string PROPERTY_CORRECTION_FLAG;
-  static const std::string PROPERTY_MQ2_FLAG;
-  static const std::string PROPERTY_RECONSUME_TIME;
-  static const std::string PROPERTY_MSG_REGION;
-  static const std::string PROPERTY_TRACE_SWITCH;
-  static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
-  static const std::string PROPERTY_MAX_RECONSUME_TIMES;
-  static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
-  static const std::string PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET;
-  static const std::string PROPERTY_TRANSACTION_CHECK_TIMES;
-  static const std::string PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS;
-
-  static const std::string KEY_SEPARATOR;
- private:
-  std::string m_topic;
-  int m_flag;
-  std::string m_body;
-  std::map<std::string, std::string> m_properties;
-};
-//<!***************************************************************************
-}  //<!end namespace;
-#endif
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MESSAGE_H__
+#define __MESSAGE_H__
+
+#include <map>
+#include <sstream>
+#include <string>
+#include <vector>
+#include "RocketMQClient.h"
+
+
+namespace rocketmq {
+//<!***************************************************************************
+class ROCKETMQCLIENT_API MQMessage {
+ public:
+  MQMessage();
+  MQMessage(const std::string& topic, const std::string& body);
+  MQMessage(const std::string& topic, const std::string& tags, const std::string& body);
+  MQMessage(const std::string& topic, const std::string& tags, const std::string& keys,
+            const std::string& body);
+  MQMessage(const std::string& topic, const std::string& tags, const std::string& keys,
+            const int flag, const std::string& body, bool waitStoreMsgOK);
+
+  virtual ~MQMessage();
+  MQMessage(const MQMessage& other);
+  MQMessage& operator=(const MQMessage& other);
+
+  void setProperty(const std::string& name, const std::string& value) ;
+  const std::string & getProperty(const std::string& name) const;
+
+  const std::string &getTopic() const;
+  void setTopic(const std::string& topic);
+  void setTopic(const char* body, int len);
+
+  const std::string &getTags() const;
+  void setTags(const std::string& tags);
+
+  const std::string &getKeys() const;
+  void setKeys(const std::string& keys);
+  void setKeys(const std::vector<std::string>& keys);
+
+  int getDelayTimeLevel() const;
+  void setDelayTimeLevel(int level);
+
+  bool isWaitStoreMsgOK();
+  void setWaitStoreMsgOK(bool waitStoreMsgOK);
+
+  int getFlag() const;
+  void setFlag(int flag);
+
+  int getSysFlag() const;
+  void setSysFlag(int sysFlag);
+
+  const std::string &getBody() const;
+
+  void setBody(const char* body, int len);
+  void setBody(const std::string& body);
+
+  std::map<std::string, std::string> getProperties() const;
+  void setProperties(std::map<std::string, std::string>& properties);
+
+  const std::string toString() const {
+    std::stringstream ss;
+    ss << "Message [topic=" << m_topic << ", flag=" << m_flag
+       << ", tag=" << getTags() << "]";
+    return ss.str();
+  }
+
+ protected:
+  friend class MQDecoder;
+  void setPropertyInternal(const std::string& name, const std::string& value);
+  void setPropertiesInternal(std::map<std::string, std::string>& properties);
+
+  void Init(const std::string& topic, const std::string& tags, const std::string& keys,
+            const int flag, const std::string& body, bool waitStoreMsgOK);
+
+ public:
+  static const std::string PROPERTY_KEYS;
+  static const std::string PROPERTY_TAGS;
+  static const std::string PROPERTY_WAIT_STORE_MSG_OK;
+  static const std::string PROPERTY_DELAY_TIME_LEVEL;
+  static const std::string PROPERTY_RETRY_TOPIC;
+  static const std::string PROPERTY_REAL_TOPIC;
+  static const std::string PROPERTY_REAL_QUEUE_ID;
+  static const std::string PROPERTY_TRANSACTION_PREPARED;
+  static const std::string PROPERTY_PRODUCER_GROUP;
+  static const std::string PROPERTY_MIN_OFFSET;
+  static const std::string PROPERTY_MAX_OFFSET;
+  
+  static const std::string PROPERTY_BUYER_ID;
+  static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
+  static const std::string PROPERTY_TRANSFER_FLAG;
+  static const std::string PROPERTY_CORRECTION_FLAG;
+  static const std::string PROPERTY_MQ2_FLAG;
+  static const std::string PROPERTY_RECONSUME_TIME;
+  static const std::string PROPERTY_MSG_REGION;
+  static const std::string PROPERTY_TRACE_SWITCH;
+  static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
+  static const std::string PROPERTY_MAX_RECONSUME_TIMES;
+  static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
+  static const std::string PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET;
+  static const std::string PROPERTY_TRANSACTION_CHECK_TIMES;
+  static const std::string PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS;
+
+  static const std::string KEY_SEPARATOR;
+
+ protected:
+  int m_sysFlag;
+
+ private:
+  std::string m_topic;
+  int m_flag;
+  std::string m_body;
+  std::map<std::string, std::string> m_properties;
+};
+//<!***************************************************************************
+}  //<!end namespace;
+#endif
diff --git a/include/MQMessageExt.h b/include/MQMessageExt.h
index d2119c71..fb48daef 100755
--- a/include/MQMessageExt.h
+++ b/include/MQMessageExt.h
@@ -64,9 +64,6 @@ class ROCKETMQCLIENT_API MQMessageExt : public MQMessage {
   const std::string& getOffsetMsgId() const;
   void setOffsetMsgId(const std::string& offsetMsgId);
 
-  int getSysFlag() const;
-  void setSysFlag(int sysFlag);
-
   int getBodyCRC() const;
   void setBodyCRC(int bodyCRC);
 
@@ -108,7 +105,6 @@ class ROCKETMQCLIENT_API MQMessageExt : public MQMessage {
   int64 m_preparedTransactionOffset;
   int m_queueId;
   int m_storeSize;
-  int m_sysFlag;
   int m_bodyCRC;
   int m_reconsumeTimes;
   sockaddr m_bornHost;
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index afa44f0f..70c9b888 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -230,7 +230,7 @@ DefaultMQPushConsumer::~DefaultMQPushConsumer() {
   deleteAndZero(m_pRebalance);
   deleteAndZero(m_pOffsetStore);
   deleteAndZero(m_pPullAPIWrapper);
-  deleteAndZero(m_consumerServeice);
+  deleteAndZero(m_consumerService);
   PullMAP::iterator it = m_PullCallback.begin();
   for (; it != m_PullCallback.end(); ++it) {
     deleteAndZero(it->second);
@@ -308,14 +308,14 @@ void DefaultMQPushConsumer::start() {
         if (m_pMessageListener->getMessageListenerType() ==
             messageListenerOrderly) {
           LOG_INFO("start orderly consume service:%s", getGroupName().c_str());
-          m_consumerServeice = new ConsumeMessageOrderlyService(
+          m_consumerService = new ConsumeMessageOrderlyService(
               this, m_consumeThreadCount, m_pMessageListener);
         } else  // for backward compatible, defaultly and concurrently listeners
                 // are allocating ConsumeMessageConcurrentlyService
         {
           LOG_INFO("start concurrently consume service:%s",
                    getGroupName().c_str());
-          m_consumerServeice = new ConsumeMessageConcurrentlyService(
+          m_consumerService = new ConsumeMessageConcurrentlyService(
               this, m_consumeThreadCount, m_pMessageListener);
         }
       }
@@ -354,7 +354,7 @@ void DefaultMQPushConsumer::start() {
         bStartFailed = true;
         errorMsg = std::string(e.what());
       }
-      m_consumerServeice->start();
+      m_consumerService->start();
 
       getFactory()->start();
 
@@ -389,7 +389,7 @@ void DefaultMQPushConsumer::shutdown() {
       m_pullmsgQueue->close();
       m_pullmsgThread->interrupt();
       m_pullmsgThread->join();
-      m_consumerServeice->shutdown();
+      m_consumerService->shutdown();
       persistConsumerOffset();
       shutdownAsyncPullCallBack();  // delete aync pullMsg resources
       getFactory()->unregisterConsumer(this);
@@ -420,7 +420,7 @@ MessageListenerType DefaultMQPushConsumer::getMessageListenerType() {
 }
 
 ConsumeMsgService* DefaultMQPushConsumer::getConsumerMsgService() const {
-  return m_consumerServeice;
+  return m_consumerService;
 }
 
 OffsetStore* DefaultMQPushConsumer::getOffsetStore() const {
@@ -572,7 +572,7 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* request) {
   }
 
   MQMessageQueue& messageQueue = request->m_messageQueue;
-  if (m_consumerServeice->getConsumeMsgSerivceListenerType() ==
+  if (m_consumerService->getConsumeMsgSerivceListenerType() ==
       messageListenerOrderly) {
     if (!request->isLocked() || request->isLockExpired()) {
       if (!m_pRebalance->lock(messageQueue)) {
@@ -646,8 +646,8 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* request) {
           request->setNextOffset(pullResult.nextBeginOffset);
           request->putMessage(pullResult.msgFoundList);
 
-          m_consumerServeice->submitConsumeRequest(request,
-                                                   pullResult.msgFoundList);
+          m_consumerService->submitConsumeRequest(request,
+                                                  pullResult.msgFoundList);
           producePullMsgTask(request);
 
           LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld",
@@ -760,7 +760,7 @@ void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) {
   }
 
   MQMessageQueue& messageQueue = request->m_messageQueue;
-  if (m_consumerServeice->getConsumeMsgSerivceListenerType() ==
+  if (m_consumerService->getConsumeMsgSerivceListenerType() ==
       messageListenerOrderly) {
     if (!request->isLocked() || request->isLockExpired()) {
       if (!m_pRebalance->lock(messageQueue)) {
@@ -888,7 +888,7 @@ int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {
 ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() {
   ConsumerRunningInfo* info = new ConsumerRunningInfo();
   if (info) {
-    if (m_consumerServeice->getConsumeMsgSerivceListenerType() ==
+    if (m_consumerService->getConsumeMsgSerivceListenerType() ==
         messageListenerOrderly)
       info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
     else
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 2c3c9c91..0715942a 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -175,7 +175,7 @@ int SetProducerInstanceName(CProducer *producer, const char *instanceName) {
     if (producer == NULL) {
         return NULL_POINTER;
     }
-    ((DefaultMQProducer *) producer)->setGroupName(instanceName);
+    ((DefaultMQProducer *) producer)->setInstanceName(instanceName);
     return OK;
 }
 int SetProducerSessionCredentials(CProducer *producer, const char *accessKey, const char *secretKey,
diff --git a/src/message/MQDecoder.cpp b/src/message/MQDecoder.cpp
index 489cd805..d4f0dd4d 100755
--- a/src/message/MQDecoder.cpp
+++ b/src/message/MQDecoder.cpp
@@ -191,7 +191,7 @@ MQMessageExt* MQDecoder::decode(MemoryInputStream& byteBuffer, bool readBody) {
 
     map<string, string> propertiesMap;
     string2messageProperties(propertiesString, propertiesMap);
-    msgExt->setProperties(propertiesMap);
+    msgExt->setPropertiesInternal(propertiesMap);
     propertiesMap.clear();
   }
 
diff --git a/src/message/MQMessage.cpp b/src/message/MQMessage.cpp
index c8bae4b5..7fb14a7b 100755
--- a/src/message/MQMessage.cpp
+++ b/src/message/MQMessage.cpp
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 #include "MQMessage.h"
+#include "MessageSysFlag.h"
 #include "UtilAll.h"
 
 namespace rocketmq {
@@ -76,6 +77,7 @@ MQMessage::MQMessage(const MQMessage& other) {
   m_body = other.m_body;
   m_topic = other.m_topic;
   m_flag = other.m_flag;
+  m_sysFlag = other.m_sysFlag;
   m_properties = other.m_properties;
 }
 
@@ -84,12 +86,24 @@ MQMessage& MQMessage::operator=(const MQMessage& other) {
     m_body = other.m_body;
     m_topic = other.m_topic;
     m_flag = other.m_flag;
+    m_sysFlag = other.m_sysFlag;
     m_properties = other.m_properties;
   }
   return *this;
 }
 
 void MQMessage::setProperty(const string& name, const string& value) {
+  if (PROPERTY_TRANSACTION_PREPARED == name) {
+    if (!value.empty() && value == "true") {
+      m_sysFlag |= MessageSysFlag::TransactionPreparedType;
+    } else {
+      m_sysFlag &= ~MessageSysFlag::TransactionPreparedType;
+    }
+  }
+  m_properties[name] = value;
+}
+
+void MQMessage::setPropertyInternal(const string& name, const string& value) {
   m_properties[name] = value;
 }
 
@@ -114,13 +128,13 @@ void MQMessage::setTopic(const char* body, int len) {
 const string& MQMessage::getTags() const { return getProperty(PROPERTY_TAGS); }
 
 void MQMessage::setTags(const string& tags) {
-  setProperty(PROPERTY_TAGS, tags);
+  setPropertyInternal(PROPERTY_TAGS, tags);
 }
 
 const string& MQMessage::getKeys() const { return getProperty(PROPERTY_KEYS); }
 
 void MQMessage::setKeys(const string& keys) {
-  setProperty(PROPERTY_KEYS, keys);
+  setPropertyInternal(PROPERTY_KEYS, keys);
 }
 
 void MQMessage::setKeys(const vector<string>& keys) {
@@ -153,7 +167,7 @@ void MQMessage::setDelayTimeLevel(int level) {
   char tmp[16];
   sprintf(tmp, "%d", level);
 
-  setProperty(PROPERTY_DELAY_TIME_LEVEL, tmp);
+  setPropertyInternal(PROPERTY_DELAY_TIME_LEVEL, tmp);
 }
 
 bool MQMessage::isWaitStoreMsgOK() {
@@ -167,9 +181,9 @@ bool MQMessage::isWaitStoreMsgOK() {
 
 void MQMessage::setWaitStoreMsgOK(bool waitStoreMsgOK) {
   if (waitStoreMsgOK) {
-    setProperty(PROPERTY_WAIT_STORE_MSG_OK, "true");
+    setPropertyInternal(PROPERTY_WAIT_STORE_MSG_OK, "true");
   } else {
-    setProperty(PROPERTY_WAIT_STORE_MSG_OK, "false");
+    setPropertyInternal(PROPERTY_WAIT_STORE_MSG_OK, "false");
   }
 }
 
@@ -177,6 +191,10 @@ int MQMessage::getFlag() const { return m_flag; }
 
 void MQMessage::setFlag(int flag) { m_flag = flag; }
 
+int MQMessage::getSysFlag() const { return m_sysFlag; }
+
+void MQMessage::setSysFlag(int sysFlag) { m_sysFlag = sysFlag; }
+
 const string& MQMessage::getBody() const { return m_body; }
 
 void MQMessage::setBody(const char* body, int len) {
@@ -193,6 +211,20 @@ map<string, string> MQMessage::getProperties() const { return m_properties; }
 
 void MQMessage::setProperties(map<string, string>& properties) {
   m_properties = properties;
+
+  map<string, string>::const_iterator it = m_properties.find(PROPERTY_TRANSACTION_PREPARED);
+  if (it != m_properties.end()) {
+    string tranMsg = it->second;
+    if (!tranMsg.empty() && tranMsg == "true") {
+      m_sysFlag |= MessageSysFlag::TransactionPreparedType;
+    } else {
+      m_sysFlag &= ~MessageSysFlag::TransactionPreparedType;
+    }
+  }
+}
+
+void MQMessage::setPropertiesInternal(map<string, string>& properties) {
+  m_properties = properties;
 }
 
 void MQMessage::Init(const string& topic, const string& tags,
@@ -200,6 +232,7 @@ void MQMessage::Init(const string& topic, const string& tags,
                      bool waitStoreMsgOK) {
   m_topic = topic;
   m_flag = flag;
+  m_sysFlag = 0;
   m_body = body;
 
   if (tags.length() > 0) {
diff --git a/src/message/MQMessageExt.cpp b/src/message/MQMessageExt.cpp
index 3f2eae83..43f4bf05 100755
--- a/src/message/MQMessageExt.cpp
+++ b/src/message/MQMessageExt.cpp
@@ -29,7 +29,6 @@ MQMessageExt::MQMessageExt()
       m_preparedTransactionOffset(0),
       m_queueId(0),
       m_storeSize(0),
-      m_sysFlag(0),
       m_bodyCRC(0),
       m_reconsumeTimes(3),
       m_msgId("") {}
@@ -44,7 +43,6 @@ MQMessageExt::MQMessageExt(int queueId, int64 bornTimestamp, sockaddr bornHost,
       m_preparedTransactionOffset(0),
       m_queueId(queueId),
       m_storeSize(0),
-      m_sysFlag(0),
       m_bodyCRC(0),
       m_reconsumeTimes(3),
       m_bornHost(bornHost),
@@ -101,10 +99,6 @@ const string& MQMessageExt::getOffsetMsgId() const { return m_offsetMsgId; }
 
 void MQMessageExt::setOffsetMsgId(const string& offsetMsgId) { m_offsetMsgId = offsetMsgId; }
 
-int MQMessageExt::getSysFlag() const { return m_sysFlag; }
-
-void MQMessageExt::setSysFlag(int sysFlag) { m_sysFlag = sysFlag; }
-
 int MQMessageExt::getBodyCRC() const { return m_bodyCRC; }
 
 void MQMessageExt::setBodyCRC(int bodyCRC) { m_bodyCRC = bodyCRC; }
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 6811590f..4c39bd24 100755
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -26,7 +26,6 @@
 #include "MQClientManager.h"
 #include "MQDecoder.h"
 #include "MQProtos.h"
-#include "MessageSysFlag.h"
 #include "TopicPublishInfo.h"
 #include "Validators.h"
 #include "StringIdMaker.h"
@@ -339,17 +338,8 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg,
 	  msg.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id);
 
       LOG_DEBUG("produce before:%s to %s", msg.toString().c_str(), mq.toString().c_str());
-	  
-      int sysFlag = 0;
-      if (tryToCompressMessage(msg)) {
-        sysFlag |= MessageSysFlag::CompressedFlag;
-      }
 
-      string tranMsg =
-          msg.getProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED);
-      if (!tranMsg.empty() && tranMsg == "true") {
-        sysFlag |= MessageSysFlag::TransactionPreparedType;
-      }
+      tryToCompressMessage(msg);
 
       SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader();
       requestHeader->producerGroup = getGroupName();
@@ -357,7 +347,7 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg,
       requestHeader->defaultTopic = DEFAULT_TOPIC;
       requestHeader->defaultTopicQueueNums = 4;
       requestHeader->queueId = (mq.getQueueId());
-      requestHeader->sysFlag = (sysFlag);
+      requestHeader->sysFlag = (msg.getSysFlag());
       requestHeader->bornTimestamp = UtilAll::currentTimeMillis();
       requestHeader->flag = (msg.getFlag());
       requestHeader->properties =
@@ -471,11 +461,17 @@ SendResult DefaultMQProducer::sendAutoRetrySelectImpl(
 }
 
 bool DefaultMQProducer::tryToCompressMessage(MQMessage& msg) {
+  int sysFlag = msg.getSysFlag();
+  if ((sysFlag & MessageSysFlag::CompressedFlag) == MessageSysFlag::CompressedFlag) {
+    return true;
+  }
+
   string body = msg.getBody();
   if ((int)body.length() >= getCompressMsgBodyOverHowmuch()) {
     string outBody;
     if (UtilAll::deflate(body, outBody, getCompressLevel())) {
       msg.setBody(outBody);
+      msg.setSysFlag(sysFlag | MessageSysFlag::CompressedFlag);
       return true;
     }
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services