You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/04/07 08:58:55 UTC

[rocketmq-client-cpp] branch re_dev updated: fix: return nullptr when decode message failed

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/re_dev by this push:
     new 4860dd6  fix: return nullptr when decode message failed
4860dd6 is described below

commit 4860dd650b8f6a3ceb909d6ba5de1c97bab8599b
Author: James Yin <yw...@hotmail.com>
AuthorDate: Wed Apr 7 16:57:03 2021 +0800

    fix: return nullptr when decode message failed
---
 src/message/MessageDecoder.cpp | 214 +++++++++++++++++++++--------------------
 1 file changed, 110 insertions(+), 104 deletions(-)

diff --git a/src/message/MessageDecoder.cpp b/src/message/MessageDecoder.cpp
index 675bac7..fdceea2 100644
--- a/src/message/MessageDecoder.cpp
+++ b/src/message/MessageDecoder.cpp
@@ -17,6 +17,7 @@
 #include "MessageDecoder.h"
 
 #include <algorithm>  // std::move
+#include <exception>  // std::exception
 #include <sstream>    // std::stringstream
 
 #ifndef WIN32
@@ -87,119 +88,124 @@ MessageExtPtr MessageDecoder::decode(ByteBuffer& byteBuffer, bool readBody) {
 }
 
 MessageExtPtr MessageDecoder::decode(ByteBuffer& byteBuffer, bool readBody, bool deCompressBody, bool isClient) {
-  auto msgExt = isClient ? std::make_shared<MessageClientExtImpl>() : std::make_shared<MessageExtImpl>();
-
-  // 1 TOTALSIZE
-  int32_t storeSize = byteBuffer.getInt();
-  msgExt->set_store_size(storeSize);
-
-  // 2 MAGICCODE sizeof(int)
-  byteBuffer.getInt();
-
-  // 3 BODYCRC
-  int32_t bodyCRC = byteBuffer.getInt();
-  msgExt->set_body_crc(bodyCRC);
-
-  // 4 QUEUEID
-  int32_t queueId = byteBuffer.getInt();
-  msgExt->set_queue_id(queueId);
-
-  // 5 FLAG
-  int32_t flag = byteBuffer.getInt();
-  msgExt->set_flag(flag);
-
-  // 6 QUEUEOFFSET
-  int64_t queueOffset = byteBuffer.getLong();
-  msgExt->set_queue_offset(queueOffset);
-
-  // 7 PHYSICALOFFSET
-  int64_t physicOffset = byteBuffer.getLong();
-  msgExt->set_commit_log_offset(physicOffset);
-
-  // 8 SYSFLAG
-  int32_t sysFlag = byteBuffer.getInt();
-  msgExt->set_sys_flag(sysFlag);
-
-  // 9 BORNTIMESTAMP
-  int64_t bornTimeStamp = byteBuffer.getLong();
-  msgExt->set_born_timestamp(bornTimeStamp);
-
-  // 10 BORNHOST
-  int bornHostLength = (sysFlag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize;
-  ByteArray bornHost(bornHostLength);
-  byteBuffer.get(bornHost, 0, bornHostLength);
-  int32_t bornPort = byteBuffer.getInt();
-  msgExt->set_born_host(IPPortToSockaddr(bornHost, bornPort));
-
-  // 11 STORETIMESTAMP
-  int64_t storeTimestamp = byteBuffer.getLong();
-  msgExt->set_store_timestamp(storeTimestamp);
-
-  // 12 STOREHOST
-  int storeHostLength = (sysFlag & MessageSysFlag::STOREHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize;
-  ByteArray storeHost(storeHostLength);
-  byteBuffer.get(storeHost, 0, storeHostLength);
-  int32_t storePort = byteBuffer.getInt();
-  msgExt->set_store_host(IPPortToSockaddr(storeHost, storePort));
-
-  // 13 RECONSUMETIMES
-  int32_t reconsumeTimes = byteBuffer.getInt();
-  msgExt->set_reconsume_times(reconsumeTimes);
-
-  // 14 Prepared Transaction Offset
-  int64_t preparedTransactionOffset = byteBuffer.getLong();
-  msgExt->set_prepared_transaction_offset(preparedTransactionOffset);
-
-  // 15 BODY
-  int uncompress_failed = false;
-  int32_t bodyLen = byteBuffer.getInt();
-  if (bodyLen > 0) {
-    if (readBody) {
-      ByteArray body(byteBuffer.array() + byteBuffer.arrayOffset() + byteBuffer.position(), bodyLen);
-      byteBuffer.position(byteBuffer.position() + bodyLen);
-
-      // decompress body
-      if (deCompressBody && (sysFlag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG) {
-        std::string origin_body;
-        if (UtilAll::inflate(body, origin_body)) {
-          msgExt->set_body(std::move(origin_body));
+  try {
+    auto msgExt = isClient ? std::make_shared<MessageClientExtImpl>() : std::make_shared<MessageExtImpl>();
+
+    // 1 TOTALSIZE
+    int32_t storeSize = byteBuffer.getInt();
+    msgExt->set_store_size(storeSize);
+
+    // 2 MAGICCODE sizeof(int)
+    byteBuffer.getInt();
+
+    // 3 BODYCRC
+    int32_t bodyCRC = byteBuffer.getInt();
+    msgExt->set_body_crc(bodyCRC);
+
+    // 4 QUEUEID
+    int32_t queueId = byteBuffer.getInt();
+    msgExt->set_queue_id(queueId);
+
+    // 5 FLAG
+    int32_t flag = byteBuffer.getInt();
+    msgExt->set_flag(flag);
+
+    // 6 QUEUEOFFSET
+    int64_t queueOffset = byteBuffer.getLong();
+    msgExt->set_queue_offset(queueOffset);
+
+    // 7 PHYSICALOFFSET
+    int64_t physicOffset = byteBuffer.getLong();
+    msgExt->set_commit_log_offset(physicOffset);
+
+    // 8 SYSFLAG
+    int32_t sysFlag = byteBuffer.getInt();
+    msgExt->set_sys_flag(sysFlag);
+
+    // 9 BORNTIMESTAMP
+    int64_t bornTimeStamp = byteBuffer.getLong();
+    msgExt->set_born_timestamp(bornTimeStamp);
+
+    // 10 BORNHOST
+    int bornHostLength = (sysFlag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize;
+    ByteArray bornHost(bornHostLength);
+    byteBuffer.get(bornHost, 0, bornHostLength);
+    int32_t bornPort = byteBuffer.getInt();
+    msgExt->set_born_host(IPPortToSockaddr(bornHost, bornPort));
+
+    // 11 STORETIMESTAMP
+    int64_t storeTimestamp = byteBuffer.getLong();
+    msgExt->set_store_timestamp(storeTimestamp);
+
+    // 12 STOREHOST
+    int storeHostLength = (sysFlag & MessageSysFlag::STOREHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize;
+    ByteArray storeHost(storeHostLength);
+    byteBuffer.get(storeHost, 0, storeHostLength);
+    int32_t storePort = byteBuffer.getInt();
+    msgExt->set_store_host(IPPortToSockaddr(storeHost, storePort));
+
+    // 13 RECONSUMETIMES
+    int32_t reconsumeTimes = byteBuffer.getInt();
+    msgExt->set_reconsume_times(reconsumeTimes);
+
+    // 14 Prepared Transaction Offset
+    int64_t preparedTransactionOffset = byteBuffer.getLong();
+    msgExt->set_prepared_transaction_offset(preparedTransactionOffset);
+
+    // 15 BODY
+    int uncompress_failed = false;
+    int32_t bodyLen = byteBuffer.getInt();
+    if (bodyLen > 0) {
+      if (readBody) {
+        ByteArray body(byteBuffer.array() + byteBuffer.arrayOffset() + byteBuffer.position(), bodyLen);
+        byteBuffer.position(byteBuffer.position() + bodyLen);
+
+        // decompress body
+        if (deCompressBody && (sysFlag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG) {
+          std::string origin_body;
+          if (UtilAll::inflate(body, origin_body)) {
+            msgExt->set_body(std::move(origin_body));
+          } else {
+            uncompress_failed = true;
+          }
         } else {
-          uncompress_failed = true;
+          msgExt->set_body(std::string(body.array(), body.size()));
         }
       } else {
-        msgExt->set_body(std::string(body.array(), body.size()));
+        // skip body
+        byteBuffer.position(byteBuffer.position() + bodyLen);
       }
-    } else {
-      // skip body
-      byteBuffer.position(byteBuffer.position() + bodyLen);
     }
-  }
 
-  // 16 TOPIC
-  int8_t topicLen = byteBuffer.get();
-  ByteArray topic(topicLen);
-  byteBuffer.get(topic);
-  msgExt->set_topic(topic.array(), topic.size());
-
-  // 17 properties
-  int16_t propertiesLen = byteBuffer.getShort();
-  if (propertiesLen > 0) {
-    ByteArray properties(propertiesLen);
-    byteBuffer.get(properties);
-    std::string propertiesString(properties.array(), properties.size());
-    std::map<std::string, std::string> propertiesMap = string2messageProperties(propertiesString);
-    MessageAccessor::setProperties(*msgExt, std::move(propertiesMap));
-  }
+    // 16 TOPIC
+    int8_t topicLen = byteBuffer.get();
+    ByteArray topic(topicLen);
+    byteBuffer.get(topic);
+    msgExt->set_topic(topic.array(), topic.size());
+
+    // 17 properties
+    int16_t propertiesLen = byteBuffer.getShort();
+    if (propertiesLen > 0) {
+      ByteArray properties(propertiesLen);
+      byteBuffer.get(properties);
+      std::string propertiesString(properties.array(), properties.size());
+      std::map<std::string, std::string> propertiesMap = string2messageProperties(propertiesString);
+      MessageAccessor::setProperties(*msgExt, std::move(propertiesMap));
+    }
 
-  // 18 msg ID
-  std::string msgId = createMessageId(msgExt->store_host(), (int64_t)msgExt->commit_log_offset());
-  msgExt->MessageExtImpl::set_msg_id(msgId);
+    // 18 msg ID
+    std::string msgId = createMessageId(msgExt->store_host(), (int64_t)msgExt->commit_log_offset());
+    msgExt->MessageExtImpl::set_msg_id(msgId);
 
-  if (uncompress_failed) {
-    LOG_WARN_NEW("can not uncompress message, id:{}", msgExt->msg_id());
-  }
+    if (uncompress_failed) {
+      LOG_WARN_NEW("can not uncompress message, id:{}", msgExt->msg_id());
+    }
 
-  return msgExt;
+    return msgExt;
+  } catch (const std::exception& e) {
+    byteBuffer.position(byteBuffer.limit());
+    return nullptr;
+  }
 }
 
 std::vector<MessageExtPtr> MessageDecoder::decodes(ByteBuffer& byteBuffer) {