You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/04/07 08:58:55 UTC
[rocketmq-client-cpp] branch re_dev updated: fix: return nullptr
when decode message failed
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/re_dev by this push:
new 4860dd6 fix: return nullptr when decode message failed
4860dd6 is described below
commit 4860dd650b8f6a3ceb909d6ba5de1c97bab8599b
Author: James Yin <yw...@hotmail.com>
AuthorDate: Wed Apr 7 16:57:03 2021 +0800
fix: return nullptr when decode message failed
---
src/message/MessageDecoder.cpp | 214 +++++++++++++++++++++--------------------
1 file changed, 110 insertions(+), 104 deletions(-)
diff --git a/src/message/MessageDecoder.cpp b/src/message/MessageDecoder.cpp
index 675bac7..fdceea2 100644
--- a/src/message/MessageDecoder.cpp
+++ b/src/message/MessageDecoder.cpp
@@ -17,6 +17,7 @@
#include "MessageDecoder.h"
#include <algorithm> // std::move
+#include <exception> // std::exception
#include <sstream> // std::stringstream
#ifndef WIN32
@@ -87,119 +88,124 @@ MessageExtPtr MessageDecoder::decode(ByteBuffer& byteBuffer, bool readBody) {
}
MessageExtPtr MessageDecoder::decode(ByteBuffer& byteBuffer, bool readBody, bool deCompressBody, bool isClient) {
- auto msgExt = isClient ? std::make_shared<MessageClientExtImpl>() : std::make_shared<MessageExtImpl>();
-
- // 1 TOTALSIZE
- int32_t storeSize = byteBuffer.getInt();
- msgExt->set_store_size(storeSize);
-
- // 2 MAGICCODE sizeof(int)
- byteBuffer.getInt();
-
- // 3 BODYCRC
- int32_t bodyCRC = byteBuffer.getInt();
- msgExt->set_body_crc(bodyCRC);
-
- // 4 QUEUEID
- int32_t queueId = byteBuffer.getInt();
- msgExt->set_queue_id(queueId);
-
- // 5 FLAG
- int32_t flag = byteBuffer.getInt();
- msgExt->set_flag(flag);
-
- // 6 QUEUEOFFSET
- int64_t queueOffset = byteBuffer.getLong();
- msgExt->set_queue_offset(queueOffset);
-
- // 7 PHYSICALOFFSET
- int64_t physicOffset = byteBuffer.getLong();
- msgExt->set_commit_log_offset(physicOffset);
-
- // 8 SYSFLAG
- int32_t sysFlag = byteBuffer.getInt();
- msgExt->set_sys_flag(sysFlag);
-
- // 9 BORNTIMESTAMP
- int64_t bornTimeStamp = byteBuffer.getLong();
- msgExt->set_born_timestamp(bornTimeStamp);
-
- // 10 BORNHOST
- int bornHostLength = (sysFlag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize;
- ByteArray bornHost(bornHostLength);
- byteBuffer.get(bornHost, 0, bornHostLength);
- int32_t bornPort = byteBuffer.getInt();
- msgExt->set_born_host(IPPortToSockaddr(bornHost, bornPort));
-
- // 11 STORETIMESTAMP
- int64_t storeTimestamp = byteBuffer.getLong();
- msgExt->set_store_timestamp(storeTimestamp);
-
- // 12 STOREHOST
- int storeHostLength = (sysFlag & MessageSysFlag::STOREHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize;
- ByteArray storeHost(storeHostLength);
- byteBuffer.get(storeHost, 0, storeHostLength);
- int32_t storePort = byteBuffer.getInt();
- msgExt->set_store_host(IPPortToSockaddr(storeHost, storePort));
-
- // 13 RECONSUMETIMES
- int32_t reconsumeTimes = byteBuffer.getInt();
- msgExt->set_reconsume_times(reconsumeTimes);
-
- // 14 Prepared Transaction Offset
- int64_t preparedTransactionOffset = byteBuffer.getLong();
- msgExt->set_prepared_transaction_offset(preparedTransactionOffset);
-
- // 15 BODY
- int uncompress_failed = false;
- int32_t bodyLen = byteBuffer.getInt();
- if (bodyLen > 0) {
- if (readBody) {
- ByteArray body(byteBuffer.array() + byteBuffer.arrayOffset() + byteBuffer.position(), bodyLen);
- byteBuffer.position(byteBuffer.position() + bodyLen);
-
- // decompress body
- if (deCompressBody && (sysFlag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG) {
- std::string origin_body;
- if (UtilAll::inflate(body, origin_body)) {
- msgExt->set_body(std::move(origin_body));
+ try {
+ auto msgExt = isClient ? std::make_shared<MessageClientExtImpl>() : std::make_shared<MessageExtImpl>();
+
+ // 1 TOTALSIZE
+ int32_t storeSize = byteBuffer.getInt();
+ msgExt->set_store_size(storeSize);
+
+ // 2 MAGICCODE sizeof(int)
+ byteBuffer.getInt();
+
+ // 3 BODYCRC
+ int32_t bodyCRC = byteBuffer.getInt();
+ msgExt->set_body_crc(bodyCRC);
+
+ // 4 QUEUEID
+ int32_t queueId = byteBuffer.getInt();
+ msgExt->set_queue_id(queueId);
+
+ // 5 FLAG
+ int32_t flag = byteBuffer.getInt();
+ msgExt->set_flag(flag);
+
+ // 6 QUEUEOFFSET
+ int64_t queueOffset = byteBuffer.getLong();
+ msgExt->set_queue_offset(queueOffset);
+
+ // 7 PHYSICALOFFSET
+ int64_t physicOffset = byteBuffer.getLong();
+ msgExt->set_commit_log_offset(physicOffset);
+
+ // 8 SYSFLAG
+ int32_t sysFlag = byteBuffer.getInt();
+ msgExt->set_sys_flag(sysFlag);
+
+ // 9 BORNTIMESTAMP
+ int64_t bornTimeStamp = byteBuffer.getLong();
+ msgExt->set_born_timestamp(bornTimeStamp);
+
+ // 10 BORNHOST
+ int bornHostLength = (sysFlag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize;
+ ByteArray bornHost(bornHostLength);
+ byteBuffer.get(bornHost, 0, bornHostLength);
+ int32_t bornPort = byteBuffer.getInt();
+ msgExt->set_born_host(IPPortToSockaddr(bornHost, bornPort));
+
+ // 11 STORETIMESTAMP
+ int64_t storeTimestamp = byteBuffer.getLong();
+ msgExt->set_store_timestamp(storeTimestamp);
+
+ // 12 STOREHOST
+ int storeHostLength = (sysFlag & MessageSysFlag::STOREHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize;
+ ByteArray storeHost(storeHostLength);
+ byteBuffer.get(storeHost, 0, storeHostLength);
+ int32_t storePort = byteBuffer.getInt();
+ msgExt->set_store_host(IPPortToSockaddr(storeHost, storePort));
+
+ // 13 RECONSUMETIMES
+ int32_t reconsumeTimes = byteBuffer.getInt();
+ msgExt->set_reconsume_times(reconsumeTimes);
+
+ // 14 Prepared Transaction Offset
+ int64_t preparedTransactionOffset = byteBuffer.getLong();
+ msgExt->set_prepared_transaction_offset(preparedTransactionOffset);
+
+ // 15 BODY
+ int uncompress_failed = false;
+ int32_t bodyLen = byteBuffer.getInt();
+ if (bodyLen > 0) {
+ if (readBody) {
+ ByteArray body(byteBuffer.array() + byteBuffer.arrayOffset() + byteBuffer.position(), bodyLen);
+ byteBuffer.position(byteBuffer.position() + bodyLen);
+
+ // decompress body
+ if (deCompressBody && (sysFlag & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG) {
+ std::string origin_body;
+ if (UtilAll::inflate(body, origin_body)) {
+ msgExt->set_body(std::move(origin_body));
+ } else {
+ uncompress_failed = true;
+ }
} else {
- uncompress_failed = true;
+ msgExt->set_body(std::string(body.array(), body.size()));
}
} else {
- msgExt->set_body(std::string(body.array(), body.size()));
+ // skip body
+ byteBuffer.position(byteBuffer.position() + bodyLen);
}
- } else {
- // skip body
- byteBuffer.position(byteBuffer.position() + bodyLen);
}
- }
- // 16 TOPIC
- int8_t topicLen = byteBuffer.get();
- ByteArray topic(topicLen);
- byteBuffer.get(topic);
- msgExt->set_topic(topic.array(), topic.size());
-
- // 17 properties
- int16_t propertiesLen = byteBuffer.getShort();
- if (propertiesLen > 0) {
- ByteArray properties(propertiesLen);
- byteBuffer.get(properties);
- std::string propertiesString(properties.array(), properties.size());
- std::map<std::string, std::string> propertiesMap = string2messageProperties(propertiesString);
- MessageAccessor::setProperties(*msgExt, std::move(propertiesMap));
- }
+ // 16 TOPIC
+ int8_t topicLen = byteBuffer.get();
+ ByteArray topic(topicLen);
+ byteBuffer.get(topic);
+ msgExt->set_topic(topic.array(), topic.size());
+
+ // 17 properties
+ int16_t propertiesLen = byteBuffer.getShort();
+ if (propertiesLen > 0) {
+ ByteArray properties(propertiesLen);
+ byteBuffer.get(properties);
+ std::string propertiesString(properties.array(), properties.size());
+ std::map<std::string, std::string> propertiesMap = string2messageProperties(propertiesString);
+ MessageAccessor::setProperties(*msgExt, std::move(propertiesMap));
+ }
- // 18 msg ID
- std::string msgId = createMessageId(msgExt->store_host(), (int64_t)msgExt->commit_log_offset());
- msgExt->MessageExtImpl::set_msg_id(msgId);
+ // 18 msg ID
+ std::string msgId = createMessageId(msgExt->store_host(), (int64_t)msgExt->commit_log_offset());
+ msgExt->MessageExtImpl::set_msg_id(msgId);
- if (uncompress_failed) {
- LOG_WARN_NEW("can not uncompress message, id:{}", msgExt->msg_id());
- }
+ if (uncompress_failed) {
+ LOG_WARN_NEW("can not uncompress message, id:{}", msgExt->msg_id());
+ }
- return msgExt;
+ return msgExt;
+ } catch (const std::exception& e) {
+ byteBuffer.position(byteBuffer.limit());
+ return nullptr;
+ }
}
std::vector<MessageExtPtr> MessageDecoder::decodes(ByteBuffer& byteBuffer) {