You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2020/12/29 03:36:31 UTC
[rocketmq-client-cpp] 14/29: style: protocol
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit 9c1c68959983e9b360eb9422bfba860636626602
Author: James Yin <yw...@hotmail.com>
AuthorDate: Thu Jul 30 16:03:56 2020 +0800
style: protocol
---
src/ClientRemotingProcessor.cpp | 32 ++--
src/MQClientAPIImpl.cpp | 3 +-
src/MQClientAPIImpl.h | 7 +-
src/MQClientInstance.cpp | 28 ++--
src/MQClientInstance.h | 6 +-
src/common/FilterAPI.hpp | 18 +--
src/common/UtilAll.cpp | 1 +
src/consumer/DefaultMQPushConsumerImpl.cpp | 4 +-
src/consumer/DefaultMQPushConsumerImpl.h | 2 +-
src/consumer/LocalFileOffsetStore.cpp | 2 +-
src/consumer/MQConsumerInner.h | 4 +-
src/consumer/ProcessQueue.cpp | 2 +-
src/consumer/PullAPIWrapper.cpp | 2 +-
src/consumer/PullAPIWrapper.h | 2 +-
src/consumer/RebalanceImpl.cpp | 25 ++-
src/consumer/RebalanceImpl.h | 2 +-
src/consumer/RemoteBrokerOffsetStore.cpp | 2 +-
src/consumer/SubscriptionData.cpp | 87 ----------
src/log/Logging.h | 4 +
src/producer/TopicPublishInfo.hpp | 2 +-
src/protocol/HeartbeatData.h | 109 -------------
src/protocol/MessageQueue.cpp | 29 ----
src/protocol/{MessageQueue.h => MessageQueue.hpp} | 14 +-
src/protocol/TopicList.h | 6 +-
src/protocol/{ => body}/ConsumerRunningInfo.cpp | 0
src/protocol/{ => body}/ConsumerRunningInfo.h | 6 +-
src/protocol/body/LockBatchBody.cpp | 116 --------------
src/protocol/body/LockBatchBody.h | 80 ----------
src/protocol/body/LockBatchRequestBody.hpp | 61 +++++++
src/protocol/body/LockBatchResponseBody.hpp | 57 +++++++
.../ProcessQueueInfo.hpp} | 6 +-
src/protocol/body/ResetOffsetBody.cpp | 45 ------
.../{ResetOffsetBody.h => ResetOffsetBody.hpp} | 27 +++-
src/protocol/{ => body}/TopicRouteData.hpp | 10 +-
src/protocol/body/UnlockBatchRequestBody.hpp | 61 +++++++
src/protocol/header/ReplyMessageRequestHeader.cpp | 175 ---------------------
src/protocol/header/ReplyMessageRequestHeader.h | 92 -----------
src/protocol/header/ReplyMessageRequestHeader.hpp | 132 ++++++++++++++++
src/protocol/heartbeat/ConsumerData.hpp | 83 ++++++++++
src/protocol/heartbeat/HeartbeatData.hpp | 66 ++++++++
.../ProducerData.hpp} | 30 ++--
.../heartbeat/SubscriptionData.hpp} | 76 ++++++---
test/src/protocol/ConsumerRunningInfoTest.cpp | 3 -
test/src/protocol/HeartbeatDataTest.cpp | 51 ++----
test/src/protocol/LockBatchBodyTest.cpp | 18 ++-
test/src/protocol/MessageQueueTest.cpp | 2 +-
test/src/protocol/TopicRouteDataTest.cpp | 2 +-
test/src/transport/ClientRemotingProcessorTest.cpp | 2 +-
48 files changed, 678 insertions(+), 916 deletions(-)
diff --git a/src/ClientRemotingProcessor.cpp b/src/ClientRemotingProcessor.cpp
index 518e464..cbc8363 100644
--- a/src/ClientRemotingProcessor.cpp
+++ b/src/ClientRemotingProcessor.cpp
@@ -16,16 +16,16 @@
*/
#include "ClientRemotingProcessor.h"
-#include "ConsumerRunningInfo.h"
#include "MessageDecoder.h"
#include "MQProtos.h"
#include "MessageAccessor.hpp"
#include "MessageSysFlag.h"
#include "RequestFutureTable.h"
#include "SocketUtil.h"
-#include "protocol/body/ResetOffsetBody.h"
+#include "protocol/body/ConsumerRunningInfo.h"
+#include "protocol/body/ResetOffsetBody.hpp"
#include "protocol/header/CommandHeader.h"
-#include "protocol/header/ReplyMessageRequestHeader.h"
+#include "protocol/header/ReplyMessageRequestHeader.hpp"
namespace rocketmq {
@@ -106,7 +106,7 @@ RemotingCommand* ClientRemotingProcessor::resetOffset(RemotingCommand* request)
if (requestBody != nullptr && requestBody->size() > 0) {
std::unique_ptr<ResetOffsetBody> body(ResetOffsetBody::Decode(*requestBody));
if (body != nullptr) {
- client_instance_->resetOffset(responseHeader->getGroup(), responseHeader->getTopic(), body->getOffsetTable());
+ client_instance_->resetOffset(responseHeader->getGroup(), responseHeader->getTopic(), body->offset_table());
} else {
LOG_ERROR("resetOffset failed as received data could not be unserialized");
}
@@ -148,20 +148,20 @@ RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* r
try {
std::unique_ptr<MQMessageExt> msg(new MQMessageExt);
- msg->set_topic(requestHeader->getTopic());
- msg->set_queue_id(requestHeader->getQueueId());
- msg->set_store_timestamp(requestHeader->getStoreTimestamp());
+ msg->set_topic(requestHeader->topic());
+ msg->set_queue_id(requestHeader->queue_id());
+ msg->set_store_timestamp(requestHeader->store_timestamp());
- if (!requestHeader->getBornHost().empty()) {
- msg->set_born_host(string2SocketAddress(requestHeader->getBornHost()));
+ if (!requestHeader->born_host().empty()) {
+ msg->set_born_host(string2SocketAddress(requestHeader->born_host()));
}
- if (!requestHeader->getStoreHost().empty()) {
- msg->set_store_host(string2SocketAddress(requestHeader->getStoreHost()));
+ if (!requestHeader->store_host().empty()) {
+ msg->set_store_host(string2SocketAddress(requestHeader->store_host()));
}
auto body = request->body();
- if ((requestHeader->getSysFlag() & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG) {
+ if ((requestHeader->sys_flag() & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG) {
std::string origin_body;
if (UtilAll::inflate(*body, origin_body)) {
msg->set_body(std::move(origin_body));
@@ -172,12 +172,12 @@ RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* r
msg->set_body(std::string(body->array(), body->size()));
}
- msg->set_flag(requestHeader->getFlag());
- MessageAccessor::setProperties(*msg, MessageDecoder::string2messageProperties(requestHeader->getProperties()));
+ msg->set_flag(requestHeader->flag());
+ MessageAccessor::setProperties(*msg, MessageDecoder::string2messageProperties(requestHeader->properties()));
MessageAccessor::putProperty(*msg, MQMessageConst::PROPERTY_REPLY_MESSAGE_ARRIVE_TIME,
UtilAll::to_string(receiveTime));
- msg->set_born_timestamp(requestHeader->getBornTimestamp());
- msg->set_reconsume_times(requestHeader->getReconsumeTimes());
+ msg->set_born_timestamp(requestHeader->born_timestamp());
+ msg->set_reconsume_times(requestHeader->reconsume_times());
LOG_DEBUG_NEW("receive reply message:{}", msg->toString());
processReplyMessage(std::move(msg));
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index cb17dfd..c3e010f 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -28,6 +28,7 @@
#include "PullResultExt.hpp"
#include "SendCallbackWrap.h"
#include "TcpRemotingClient.h"
+#include "protocol/body/LockBatchResponseBody.hpp"
namespace rocketmq {
@@ -595,7 +596,7 @@ void MQClientAPIImpl::lockBatchMQ(const std::string& addr,
auto requestBody = response->body();
if (requestBody != nullptr && requestBody->size() > 0) {
std::unique_ptr<LockBatchResponseBody> body(LockBatchResponseBody::Decode(*requestBody));
- mqs = body->getLockOKMQSet();
+ mqs = std::move(body->lock_ok_mq_set());
} else {
mqs.clear();
}
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index c947692..b105ffe 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -19,7 +19,6 @@
#include "CommunicationMode.h"
#include "DefaultMQProducerImpl.h"
-#include "HeartbeatData.h"
#include "KVTable.h"
#include "MQException.h"
#include "MQClientInstance.h"
@@ -30,9 +29,11 @@
#include "TopicConfig.h"
#include "TopicList.h"
#include "TopicPublishInfo.hpp"
-#include "TopicRouteData.hpp"
-#include "protocol/body/LockBatchBody.h"
+#include "protocol/body/TopicRouteData.hpp"
+#include "protocol/body/LockBatchRequestBody.hpp"
+#include "protocol/body/UnlockBatchRequestBody.hpp"
#include "protocol/header/CommandHeader.h"
+#include "protocol/heartbeat/HeartbeatData.hpp"
namespace rocketmq {
diff --git a/src/MQClientInstance.cpp b/src/MQClientInstance.cpp
index f10f188..443b7c4 100644
--- a/src/MQClientInstance.cpp
+++ b/src/MQClientInstance.cpp
@@ -19,7 +19,7 @@
#include <typeindex>
#include "ClientRemotingProcessor.h"
-#include "ConsumerRunningInfo.h"
+#include "protocol/body/ConsumerRunningInfo.h"
#include "Logging.h"
#include "MQAdminImpl.h"
#include "MQClientAPIImpl.h"
@@ -375,8 +375,8 @@ void MQClientInstance::persistAllConsumerOffset() {
void MQClientInstance::sendHeartbeatToAllBroker() {
std::unique_ptr<HeartbeatData> heartbeatData(prepareHeartbeatData());
- bool producerEmpty = heartbeatData->isProducerDataSetEmpty();
- bool consumerEmpty = heartbeatData->isConsumerDataSetEmpty();
+ bool producerEmpty = heartbeatData->producer_data_set().empty();
+ bool consumerEmpty = heartbeatData->consumer_data_set().empty();
if (producerEmpty && consumerEmpty) {
LOG_WARN_NEW("sending heartbeat, but no consumer and no producer");
return;
@@ -481,7 +481,7 @@ HeartbeatData* MQClientInstance::prepareHeartbeatData() {
HeartbeatData* pHeartbeatData = new HeartbeatData();
// clientID
- pHeartbeatData->setClientID(client_id_);
+ pHeartbeatData->set_client_id(client_id_);
// Consumer
insertConsumerInfoToHeartBeatData(pHeartbeatData);
@@ -492,29 +492,21 @@ HeartbeatData* MQClientInstance::prepareHeartbeatData() {
return pHeartbeatData;
}
-void MQClientInstance::insertConsumerInfoToHeartBeatData(HeartbeatData* pHeartbeatData) {
+void MQClientInstance::insertConsumerInfoToHeartBeatData(HeartbeatData* heartbeatData) {
std::lock_guard<std::mutex> lock(consumer_table_mutex_);
for (const auto& it : consumer_table_) {
const auto* consumer = it.second;
- ConsumerData consumerData;
- consumerData.groupName = consumer->groupName();
- consumerData.consumeType = consumer->consumeType();
- consumerData.messageModel = consumer->messageModel();
- consumerData.consumeFromWhere = consumer->consumeFromWhere();
- std::vector<SubscriptionData> result = consumer->subscriptions();
- consumerData.subscriptionDataSet.swap(result);
// TODO: unitMode
-
- pHeartbeatData->insertDataToConsumerDataSet(consumerData);
+ heartbeatData->consumer_data_set().emplace_back(consumer->groupName(), consumer->consumeType(),
+ consumer->messageModel(), consumer->consumeFromWhere(),
+ consumer->subscriptions());
}
}
-void MQClientInstance::insertProducerInfoToHeartBeatData(HeartbeatData* pHeartbeatData) {
+void MQClientInstance::insertProducerInfoToHeartBeatData(HeartbeatData* heartbeatData) {
std::lock_guard<std::mutex> lock(producer_table_mutex_);
for (const auto& it : producer_table_) {
- ProducerData producerData;
- producerData.groupName = it.first;
- pHeartbeatData->insertDataToProducerDataSet(producerData);
+ heartbeatData->producer_data_set().emplace_back(it.first);
}
}
diff --git a/src/MQClientInstance.h b/src/MQClientInstance.h
index dc583ef..5a589aa 100644
--- a/src/MQClientInstance.h
+++ b/src/MQClientInstance.h
@@ -22,9 +22,8 @@
#include <set>
#include <utility>
-#include "ConsumerRunningInfo.h"
+#include "protocol/body/ConsumerRunningInfo.h"
#include "FindBrokerResult.hpp"
-#include "HeartbeatData.h"
#include "MQClientConfig.h"
#include "MQException.h"
#include "MQConsumerInner.h"
@@ -32,7 +31,8 @@
#include "MQProducerInner.h"
#include "ServiceState.h"
#include "TopicPublishInfo.hpp"
-#include "TopicRouteData.hpp"
+#include "protocol/body/TopicRouteData.hpp"
+#include "protocol/heartbeat/HeartbeatData.hpp"
#include "concurrent/executor.hpp"
namespace rocketmq {
diff --git a/src/common/FilterAPI.hpp b/src/common/FilterAPI.hpp
index aa40eb5..9bb18bb 100644
--- a/src/common/FilterAPI.hpp
+++ b/src/common/FilterAPI.hpp
@@ -20,30 +20,30 @@
#include <string> // std::string
#include "MQException.h"
-#include "SubscriptionData.h"
+#include "protocol/heartbeat/SubscriptionData.hpp"
#include "UtilAll.h"
namespace rocketmq {
class FilterAPI {
public:
- static SubscriptionData* buildSubscriptionData(const std::string& topic, const std::string& subString) {
+ static SubscriptionData* buildSubscriptionData(const std::string& topic, const std::string& sub_string) {
// delete in Rebalance
- std::unique_ptr<SubscriptionData> subscriptionData(new SubscriptionData(topic, subString));
+ std::unique_ptr<SubscriptionData> subscription_data(new SubscriptionData(topic, sub_string));
- if (subString.empty() || SUB_ALL == subString) {
- subscriptionData->set_sub_string(SUB_ALL);
+ if (sub_string.empty() || SUB_ALL == sub_string) {
+ subscription_data->set_sub_string(SUB_ALL);
} else {
std::vector<std::string> tags;
- UtilAll::Split(tags, subString, "||");
+ UtilAll::Split(tags, sub_string, "||");
if (!tags.empty()) {
for (auto tag : tags) {
if (!tag.empty()) {
UtilAll::Trim(tag);
if (!tag.empty()) {
- subscriptionData->put_tag(tag);
- subscriptionData->put_code(UtilAll::hash_code(tag));
+ subscription_data->code_set().push_back(UtilAll::hash_code(tag));
+ subscription_data->tags_set().push_back(std::move(tag));
}
}
}
@@ -52,7 +52,7 @@ class FilterAPI {
}
}
- return subscriptionData.release();
+ return subscription_data.release();
}
};
diff --git a/src/common/UtilAll.cpp b/src/common/UtilAll.cpp
index f326d95..658695f 100644
--- a/src/common/UtilAll.cpp
+++ b/src/common/UtilAll.cpp
@@ -18,6 +18,7 @@
#include <chrono>
#include <iostream>
+#include <thread>
#ifndef WIN32
#include <netdb.h> // gethostbyname
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index de90889..740e18b 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -22,7 +22,7 @@
#include "CommunicationMode.h"
#include "ConsumeMsgService.h"
-#include "ConsumerRunningInfo.h"
+#include "protocol/body/ConsumerRunningInfo.h"
#include "FilterAPI.hpp"
#include "Logging.h"
#include "MQAdminImpl.h"
@@ -579,7 +579,7 @@ int DefaultMQPushConsumerImpl::getMaxReconsumeTimes() {
}
}
-std::string DefaultMQPushConsumerImpl::groupName() const {
+const std::string& DefaultMQPushConsumerImpl::groupName() const {
return client_config_->group_name();
}
diff --git a/src/consumer/DefaultMQPushConsumerImpl.h b/src/consumer/DefaultMQPushConsumerImpl.h
index e6bee69..1e3fdc7 100755
--- a/src/consumer/DefaultMQPushConsumerImpl.h
+++ b/src/consumer/DefaultMQPushConsumerImpl.h
@@ -85,7 +85,7 @@ class DefaultMQPushConsumerImpl : public std::enable_shared_from_this<DefaultMQP
void resume() override;
public: // MQConsumerInner
- std::string groupName() const override;
+ const std::string& groupName() const override;
MessageModel messageModel() const override;
ConsumeType consumeType() const override;
ConsumeFromWhere consumeFromWhere() const override;
diff --git a/src/consumer/LocalFileOffsetStore.cpp b/src/consumer/LocalFileOffsetStore.cpp
index b2da588..ac66ece 100644
--- a/src/consumer/LocalFileOffsetStore.cpp
+++ b/src/consumer/LocalFileOffsetStore.cpp
@@ -20,7 +20,7 @@
#include "Logging.h"
#include "MQClientInstance.h"
-#include "MessageQueue.h"
+#include "MessageQueue.hpp"
#include "UtilAll.h"
namespace rocketmq {
diff --git a/src/consumer/MQConsumerInner.h b/src/consumer/MQConsumerInner.h
index 31a126b..b851316 100644
--- a/src/consumer/MQConsumerInner.h
+++ b/src/consumer/MQConsumerInner.h
@@ -21,7 +21,7 @@
#include <vector>
#include "ConsumeType.h"
-#include "SubscriptionData.h"
+#include "protocol/heartbeat/SubscriptionData.hpp"
namespace rocketmq {
@@ -32,7 +32,7 @@ class MQConsumerInner {
virtual ~MQConsumerInner() = default;
public: // MQConsumerInner in Java Client
- virtual std::string groupName() const = 0;
+ virtual const std::string& groupName() const = 0;
virtual MessageModel messageModel() const = 0;
virtual ConsumeType consumeType() const = 0;
virtual ConsumeFromWhere consumeFromWhere() const = 0;
diff --git a/src/consumer/ProcessQueue.cpp b/src/consumer/ProcessQueue.cpp
index bc6f29d..e2d3a39 100644
--- a/src/consumer/ProcessQueue.cpp
+++ b/src/consumer/ProcessQueue.cpp
@@ -17,7 +17,7 @@
#include "ProcessQueue.h"
#include "Logging.h"
-#include "ProcessQueueInfo.h"
+#include "protocol/body/ProcessQueueInfo.hpp"
#include "UtilAll.h"
static const uint64_t PULL_MAX_IDLE_TIME = 120000; // ms
diff --git a/src/consumer/PullAPIWrapper.cpp b/src/consumer/PullAPIWrapper.cpp
index 54a5512..e671118 100644
--- a/src/consumer/PullAPIWrapper.cpp
+++ b/src/consumer/PullAPIWrapper.cpp
@@ -70,7 +70,7 @@ PullResult PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
msgListFilterAgain.reserve(msgList.size());
for (const auto& msg : msgList) {
const auto& msgTag = msg->tags();
- if (subscriptionData->contain_tag(msgTag)) {
+ if (subscriptionData->containsTag(msgTag)) {
msgListFilterAgain.push_back(msg);
}
}
diff --git a/src/consumer/PullAPIWrapper.h b/src/consumer/PullAPIWrapper.h
index 02055d0..7edc227 100644
--- a/src/consumer/PullAPIWrapper.h
+++ b/src/consumer/PullAPIWrapper.h
@@ -23,7 +23,7 @@
#include "MQClientInstance.h"
#include "MQMessageQueue.h"
#include "PullCallback.h"
-#include "SubscriptionData.h"
+#include "protocol/heartbeat/SubscriptionData.hpp"
namespace rocketmq {
diff --git a/src/consumer/RebalanceImpl.cpp b/src/consumer/RebalanceImpl.cpp
index 9459d5e..a3988cd 100644
--- a/src/consumer/RebalanceImpl.cpp
+++ b/src/consumer/RebalanceImpl.cpp
@@ -18,7 +18,6 @@
#include "MQClientAPIImpl.h"
#include "MQClientInstance.h"
-#include "protocol/body/LockBatchBody.h"
namespace rocketmq {
@@ -42,9 +41,9 @@ void RebalanceImpl::unlock(MQMessageQueue mq, const bool oneway) {
client_instance_->findBrokerAddressInSubscribe(mq.broker_name(), MASTER_ID, true));
if (findBrokerResult) {
std::unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(new UnlockBatchRequestBody());
- unlockBatchRequest->setConsumerGroup(consumer_group_);
- unlockBatchRequest->setClientId(client_instance_->getClientId());
- unlockBatchRequest->getMqSet().push_back(mq);
+ unlockBatchRequest->set_consumer_group(consumer_group_);
+ unlockBatchRequest->set_client_id(client_instance_->getClientId());
+ unlockBatchRequest->mq_set().push_back(mq);
try {
client_instance_->getMQClientAPIImpl()->unlockBatchMQ(findBrokerResult->broker_addr(), unlockBatchRequest.get(),
@@ -81,9 +80,9 @@ void RebalanceImpl::unlockAll(const bool oneway) {
client_instance_->findBrokerAddressInSubscribe(brokerName, MASTER_ID, true));
if (findBrokerResult) {
std::unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(new UnlockBatchRequestBody());
- unlockBatchRequest->setConsumerGroup(consumer_group_);
- unlockBatchRequest->setClientId(client_instance_->getClientId());
- unlockBatchRequest->setMqSet(mqs);
+ unlockBatchRequest->set_consumer_group(consumer_group_);
+ unlockBatchRequest->set_client_id(client_instance_->getClientId());
+ unlockBatchRequest->set_mq_set(mqs);
try {
client_instance_->getMQClientAPIImpl()->unlockBatchMQ(findBrokerResult->broker_addr(), unlockBatchRequest.get(),
@@ -125,9 +124,9 @@ bool RebalanceImpl::lock(MQMessageQueue mq) {
client_instance_->findBrokerAddressInSubscribe(mq.broker_name(), MASTER_ID, true));
if (findBrokerResult) {
std::unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
- lockBatchRequest->setConsumerGroup(consumer_group_);
- lockBatchRequest->setClientId(client_instance_->getClientId());
- lockBatchRequest->getMqSet().push_back(mq);
+ lockBatchRequest->set_consumer_group(consumer_group_);
+ lockBatchRequest->set_client_id(client_instance_->getClientId());
+ lockBatchRequest->mq_set().push_back(mq);
try {
LOG_DEBUG("try to lock mq:%s", mq.toString().c_str());
@@ -182,9 +181,9 @@ void RebalanceImpl::lockAll() {
client_instance_->findBrokerAddressInSubscribe(brokerName, MASTER_ID, true));
if (findBrokerResult) {
std::unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
- lockBatchRequest->setConsumerGroup(consumer_group_);
- lockBatchRequest->setClientId(client_instance_->getClientId());
- lockBatchRequest->setMqSet(mqs);
+ lockBatchRequest->set_consumer_group(consumer_group_);
+ lockBatchRequest->set_client_id(client_instance_->getClientId());
+ lockBatchRequest->set_mq_set(mqs);
LOG_INFO("try to lock:" SIZET_FMT " mqs of broker:%s", mqs.size(), brokerName.c_str());
try {
diff --git a/src/consumer/RebalanceImpl.h b/src/consumer/RebalanceImpl.h
index 8648aa2..0ef1a9a 100755
--- a/src/consumer/RebalanceImpl.h
+++ b/src/consumer/RebalanceImpl.h
@@ -26,7 +26,7 @@
#include "MQMessageQueue.h"
#include "ProcessQueue.h"
#include "PullRequest.h"
-#include "SubscriptionData.h"
+#include "protocol/heartbeat/SubscriptionData.hpp"
namespace rocketmq {
diff --git a/src/consumer/RemoteBrokerOffsetStore.cpp b/src/consumer/RemoteBrokerOffsetStore.cpp
index 8754d8b..537cf7f 100644
--- a/src/consumer/RemoteBrokerOffsetStore.cpp
+++ b/src/consumer/RemoteBrokerOffsetStore.cpp
@@ -19,7 +19,7 @@
#include "Logging.h"
#include "MQClientAPIImpl.h"
#include "MQClientInstance.h"
-#include "MessageQueue.h"
+#include "MessageQueue.hpp"
#include "UtilAll.h"
namespace rocketmq {
diff --git a/src/consumer/SubscriptionData.cpp b/src/consumer/SubscriptionData.cpp
deleted file mode 100644
index ad1f21f..0000000
--- a/src/consumer/SubscriptionData.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "SubscriptionData.h"
-
-#include <algorithm>
-#include <sstream>
-#include <vector>
-
-#include "Logging.h"
-#include "UtilAll.h"
-
-namespace rocketmq {
-
-SubscriptionData::SubscriptionData() {
- sub_version_ = UtilAll::currentTimeMillis();
-}
-
-SubscriptionData::SubscriptionData(const std::string& topic, const std::string& subString)
- : topic_(topic), sub_string_(subString) {
- sub_version_ = UtilAll::currentTimeMillis();
-}
-
-SubscriptionData::SubscriptionData(const SubscriptionData& other) {
- sub_string_ = other.sub_string_;
- sub_version_ = other.sub_version_;
- tag_set_ = other.tag_set_;
- topic_ = other.topic_;
- code_set_ = other.code_set_;
-}
-
-bool SubscriptionData::operator==(const SubscriptionData& other) const {
- if (topic_ != other.topic_) {
- return false;
- }
- if (sub_string_ != other.sub_string_) {
- return false;
- }
- if (sub_version_ != other.sub_version_) {
- return false;
- }
- if (tag_set_.size() != other.tag_set_.size()) {
- return false;
- }
- return true;
-}
-
-bool SubscriptionData::operator<(const SubscriptionData& other) const {
- int ret = topic_.compare(other.topic_);
- if (ret == 0) {
- return sub_string_.compare(other.sub_string_) < 0;
- } else {
- return ret < 0;
- }
-}
-
-Json::Value SubscriptionData::toJson() const {
- Json::Value outJson;
- outJson["topic"] = topic_;
- outJson["subString"] = sub_string_;
- outJson["subVersion"] = UtilAll::to_string(sub_version_);
-
- for (const auto& tag : tag_set_) {
- outJson["tagsSet"].append(tag);
- }
-
- for (const auto& code : code_set_) {
- outJson["codeSet"].append(code);
- }
-
- return outJson;
-}
-
-} // namespace rocketmq
diff --git a/src/log/Logging.h b/src/log/Logging.h
index 6c361e5..5afcc56 100644
--- a/src/log/Logging.h
+++ b/src/log/Logging.h
@@ -23,7 +23,11 @@
// clang-format off
#include <spdlog/spdlog.h>
+#if !defined(SPDLOG_FMT_EXTERNAL)
#include <spdlog/fmt/bundled/printf.h>
+#else
+#include <fmt/printf.h>
+#endif
// clang-format on
namespace rocketmq {
diff --git a/src/producer/TopicPublishInfo.hpp b/src/producer/TopicPublishInfo.hpp
index 341e066..94c5375 100644
--- a/src/producer/TopicPublishInfo.hpp
+++ b/src/producer/TopicPublishInfo.hpp
@@ -23,7 +23,7 @@
#include "MQException.h"
#include "MQMessageQueue.h"
-#include "TopicRouteData.hpp"
+#include "protocol/body/TopicRouteData.hpp"
namespace rocketmq {
diff --git a/src/protocol/HeartbeatData.h b/src/protocol/HeartbeatData.h
deleted file mode 100644
index 1453b63..0000000
--- a/src/protocol/HeartbeatData.h
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __HEARTBEAT_DATA_H__
-#define __HEARTBEAT_DATA_H__
-
-#include <string>
-#include <vector>
-
-#include "ConsumeType.h"
-#include "RemotingSerializable.h"
-#include "SubscriptionData.h"
-
-namespace rocketmq {
-
-class ProducerData {
- public:
- bool operator<(const ProducerData& pd) const { return groupName < pd.groupName; }
-
- Json::Value toJson() const {
- Json::Value outJson;
- outJson["groupName"] = groupName;
- return outJson;
- }
-
- public:
- std::string groupName;
-};
-
-class ConsumerData {
- public:
- bool operator<(const ConsumerData& cd) const { return groupName < cd.groupName; }
-
- Json::Value toJson() const {
- Json::Value outJson;
- outJson["groupName"] = groupName;
- outJson["consumeFromWhere"] = consumeFromWhere;
- outJson["consumeType"] = consumeType;
- outJson["messageModel"] = messageModel;
-
- for (const auto& sd : subscriptionDataSet) {
- outJson["subscriptionDataSet"].append(sd.toJson());
- }
-
- return outJson;
- }
-
- public:
- std::string groupName;
- ConsumeType consumeType;
- MessageModel messageModel;
- ConsumeFromWhere consumeFromWhere;
- std::vector<SubscriptionData> subscriptionDataSet;
-};
-
-class HeartbeatData : public RemotingSerializable {
- public:
- std::string encode() {
- Json::Value root;
-
- // id
- root["clientID"] = m_clientID;
-
- // consumer
- for (const auto& cd : m_consumerDataSet) {
- root["consumerDataSet"].append(cd.toJson());
- }
-
- // producer
- for (const auto& pd : m_producerDataSet) {
- root["producerDataSet"].append(pd.toJson());
- }
-
- // output
- return RemotingSerializable::toJson(root);
- }
-
- void setClientID(const std::string& clientID) { m_clientID = clientID; }
-
- bool isProducerDataSetEmpty() { return m_producerDataSet.empty(); }
-
- void insertDataToProducerDataSet(ProducerData& producerData) { m_producerDataSet.push_back(producerData); }
-
- bool isConsumerDataSetEmpty() { return m_consumerDataSet.empty(); }
-
- void insertDataToConsumerDataSet(ConsumerData& consumerData) { m_consumerDataSet.push_back(consumerData); }
-
- private:
- std::string m_clientID;
- std::vector<ProducerData> m_producerDataSet;
- std::vector<ConsumerData> m_consumerDataSet;
-};
-
-} // namespace rocketmq
-
-#endif // __HEARTBEAT_DATA_H__
diff --git a/src/protocol/MessageQueue.cpp b/src/protocol/MessageQueue.cpp
deleted file mode 100644
index ab9af65..0000000
--- a/src/protocol/MessageQueue.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "MessageQueue.h"
-
-namespace rocketmq {
-
-Json::Value toJson(const MQMessageQueue& mq) {
- Json::Value outJson;
- outJson["topic"] = mq.topic();
- outJson["brokerName"] = mq.broker_name();
- outJson["queueId"] = mq.queue_id();
- return outJson;
-}
-
-} // namespace rocketmq
diff --git a/src/protocol/MessageQueue.h b/src/protocol/MessageQueue.hpp
similarity index 73%
rename from src/protocol/MessageQueue.h
rename to src/protocol/MessageQueue.hpp
index 50ce546..9f6af19 100644
--- a/src/protocol/MessageQueue.h
+++ b/src/protocol/MessageQueue.hpp
@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef __MESSAGE_QUEUE_H__
-#define __MESSAGE_QUEUE_H__
+#ifndef ROCKETMQ_PROTOCOL_MESSAGEQUEUE_H_
+#define ROCKETMQ_PROTOCOL_MESSAGEQUEUE_H_
#include <json/json.h>
@@ -23,8 +23,14 @@
namespace rocketmq {
-Json::Value toJson(const MQMessageQueue& mq);
+inline Json::Value toJson(const MQMessageQueue& mq) {
+ Json::Value root;
+ root["topic"] = mq.topic();
+ root["brokerName"] = mq.broker_name();
+ root["queueId"] = mq.queue_id();
+ return root;
+}
} // namespace rocketmq
-#endif // __MESSAGE_QUEUE_H__
+#endif // ROCKETMQ_PROTOCOL_MESSAGEQUEUE_H_
diff --git a/src/protocol/TopicList.h b/src/protocol/TopicList.h
index 450e6b2..bbcb7cc 100644
--- a/src/protocol/TopicList.h
+++ b/src/protocol/TopicList.h
@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef __TOPIC_LIST_H__
-#define __TOPIC_LIST_H__
+#ifndef ROCKETMQ_PROTOCOL_TOPICLIST_H_
+#define ROCKETMQ_PROTOCOL_TOPICLIST_H_
#include <string> // std::string
#include <vector> // std::vector
@@ -34,4 +34,4 @@ class TopicList {
} // namespace rocketmq
-#endif // __TOPIC_LIST_H__
+#endif // ROCKETMQ_PROTOCOL_TOPICLIST_H_
diff --git a/src/protocol/ConsumerRunningInfo.cpp b/src/protocol/body/ConsumerRunningInfo.cpp
similarity index 100%
rename from src/protocol/ConsumerRunningInfo.cpp
rename to src/protocol/body/ConsumerRunningInfo.cpp
diff --git a/src/protocol/ConsumerRunningInfo.h b/src/protocol/body/ConsumerRunningInfo.h
similarity index 96%
rename from src/protocol/ConsumerRunningInfo.h
rename to src/protocol/body/ConsumerRunningInfo.h
index 63d4c5d..5bfb886 100644
--- a/src/protocol/ConsumerRunningInfo.h
+++ b/src/protocol/body/ConsumerRunningInfo.h
@@ -17,9 +17,9 @@
#ifndef ROCKETMQ_PROTOCOL_CONSUMERRUNNINGINFO_H_
#define ROCKETMQ_PROTOCOL_CONSUMERRUNNINGINFO_H_
-#include "MessageQueue.h"
-#include "ProcessQueueInfo.h"
-#include "SubscriptionData.h"
+#include "MessageQueue.hpp"
+#include "ProcessQueueInfo.hpp"
+#include "protocol/heartbeat/SubscriptionData.hpp"
namespace rocketmq {
diff --git a/src/protocol/body/LockBatchBody.cpp b/src/protocol/body/LockBatchBody.cpp
deleted file mode 100644
index fde8dcb..0000000
--- a/src/protocol/body/LockBatchBody.cpp
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "LockBatchBody.h"
-
-#include "Logging.h"
-#include "MessageQueue.h"
-
-namespace rocketmq {
-
-std::string LockBatchRequestBody::getConsumerGroup() {
- return m_consumerGroup;
-}
-
-void LockBatchRequestBody::setConsumerGroup(std::string consumerGroup) {
- m_consumerGroup = consumerGroup;
-}
-
-std::string LockBatchRequestBody::getClientId() {
- return m_clientId;
-}
-
-void LockBatchRequestBody::setClientId(std::string clientId) {
- m_clientId = clientId;
-}
-
-std::vector<MQMessageQueue>& LockBatchRequestBody::getMqSet() {
- return m_mqSet;
-}
-
-void LockBatchRequestBody::setMqSet(std::vector<MQMessageQueue> mqSet) {
- m_mqSet.swap(mqSet);
-}
-
-std::string LockBatchRequestBody::encode() {
- Json::Value root;
- root["consumerGroup"] = m_consumerGroup;
- root["clientId"] = m_clientId;
-
- for (const auto& mq : m_mqSet) {
- root["mqSet"].append(rocketmq::toJson(mq));
- }
-
- return RemotingSerializable::toJson(root);
-}
-
-const std::vector<MQMessageQueue>& LockBatchResponseBody::getLockOKMQSet() {
- return m_lockOKMQSet;
-}
-
-void LockBatchResponseBody::setLockOKMQSet(std::vector<MQMessageQueue> lockOKMQSet) {
- m_lockOKMQSet.swap(lockOKMQSet);
-}
-
-LockBatchResponseBody* LockBatchResponseBody::Decode(const ByteArray& bodyData) {
- Json::Value root = RemotingSerializable::fromJson(bodyData);
- auto& mqs = root["lockOKMQSet"];
- std::unique_ptr<LockBatchResponseBody> body(new LockBatchResponseBody());
- for (const auto& qd : mqs) {
- MQMessageQueue mq(qd["topic"].asString(), qd["brokerName"].asString(), qd["queueId"].asInt());
- LOG_INFO("LockBatchResponseBody MQ:%s", mq.toString().c_str());
- body->m_lockOKMQSet.push_back(std::move(mq));
- }
- return body.release();
-}
-
-std::string UnlockBatchRequestBody::getConsumerGroup() {
- return m_consumerGroup;
-}
-
-void UnlockBatchRequestBody::setConsumerGroup(std::string consumerGroup) {
- m_consumerGroup = consumerGroup;
-}
-
-std::string UnlockBatchRequestBody::getClientId() {
- return m_clientId;
-}
-
-void UnlockBatchRequestBody::setClientId(std::string clientId) {
- m_clientId = clientId;
-}
-
-std::vector<MQMessageQueue>& UnlockBatchRequestBody::getMqSet() {
- return m_mqSet;
-}
-
-void UnlockBatchRequestBody::setMqSet(std::vector<MQMessageQueue> mqSet) {
- m_mqSet.swap(mqSet);
-}
-
-std::string UnlockBatchRequestBody::encode() {
- Json::Value root;
- root["consumerGroup"] = m_consumerGroup;
- root["clientId"] = m_clientId;
-
- for (const auto& mq : m_mqSet) {
- root["mqSet"].append(rocketmq::toJson(mq));
- }
-
- return RemotingSerializable::toJson(root);
-}
-
-} // namespace rocketmq
diff --git a/src/protocol/body/LockBatchBody.h b/src/protocol/body/LockBatchBody.h
deleted file mode 100644
index ce38974..0000000
--- a/src/protocol/body/LockBatchBody.h
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __LOCK_BATCH_BODY_H__
-#define __LOCK_BATCH_BODY_H__
-
-#include <set>
-#include <string>
-
-#include "MQMessageQueue.h"
-#include "RemotingSerializable.h"
-#include "UtilAll.h"
-
-namespace rocketmq {
-
-class LockBatchRequestBody : public RemotingSerializable {
- public:
- std::string getConsumerGroup();
- void setConsumerGroup(std::string consumerGroup);
-
- std::string getClientId();
- void setClientId(std::string clientId);
-
- std::vector<MQMessageQueue>& getMqSet();
- void setMqSet(std::vector<MQMessageQueue> mqSet);
-
- std::string encode() override;
-
- private:
- std::string m_consumerGroup;
- std::string m_clientId;
- std::vector<MQMessageQueue> m_mqSet;
-};
-
-class LockBatchResponseBody {
- public:
- static LockBatchResponseBody* Decode(const ByteArray& bodyData);
-
- const std::vector<MQMessageQueue>& getLockOKMQSet();
- void setLockOKMQSet(std::vector<MQMessageQueue> lockOKMQSet);
-
- private:
- std::vector<MQMessageQueue> m_lockOKMQSet;
-};
-
-class UnlockBatchRequestBody : public RemotingSerializable {
- public:
- std::string getConsumerGroup();
- void setConsumerGroup(std::string consumerGroup);
-
- std::string getClientId();
- void setClientId(std::string clientId);
-
- std::vector<MQMessageQueue>& getMqSet();
- void setMqSet(std::vector<MQMessageQueue> mqSet);
-
- std::string encode() override;
-
- private:
- std::string m_consumerGroup;
- std::string m_clientId;
- std::vector<MQMessageQueue> m_mqSet;
-};
-
-} // namespace rocketmq
-
-#endif // __LOCK_BATCH_BODY_H__
diff --git a/src/protocol/body/LockBatchRequestBody.hpp b/src/protocol/body/LockBatchRequestBody.hpp
new file mode 100644
index 0000000..6b75be4
--- /dev/null
+++ b/src/protocol/body/LockBatchRequestBody.hpp
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_PROTOCOL_BODY_LOCKBATCHREQUESTBODY_HPP_
+#define ROCKETMQ_PROTOCOL_BODY_LOCKBATCHREQUESTBODY_HPP_
+
+#include <algorithm> // std::move
+#include <vector> // std::vector
+
+#include "MessageQueue.hpp"
+#include "RemotingSerializable.h"
+
+namespace rocketmq {
+
+class LockBatchRequestBody : public RemotingSerializable {
+ public:
+ std::string encode() override {
+ Json::Value root;
+ root["consumerGroup"] = consumer_group_;
+ root["clientId"] = client_id_;
+
+ for (const auto& mq : mq_set_) {
+ root["mqSet"].append(rocketmq::toJson(mq));
+ }
+
+ return RemotingSerializable::toJson(root);
+ }
+
+ public:
+ inline const std::string& consumer_group() { return consumer_group_; }
+ inline void set_consumer_group(const std::string& consumerGroup) { consumer_group_ = consumerGroup; }
+
+ inline const std::string& client_id() { return client_id_; }
+ inline void set_client_id(const std::string& clientId) { client_id_ = clientId; }
+
+ inline std::vector<MQMessageQueue>& mq_set() { return mq_set_; }
+ inline void set_mq_set(const std::vector<MQMessageQueue>& mq_set) { mq_set_ = mq_set; }
+ inline void set_mq_set(std::vector<MQMessageQueue>&& mq_set) { mq_set_ = std::move(mq_set); }
+
+ private:
+ std::string consumer_group_;
+ std::string client_id_;
+ std::vector<MQMessageQueue> mq_set_;
+};
+
+} // namespace rocketmq
+
+#endif // ROCKETMQ_PROTOCOL_BODY_LOCKBATCHREQUESTBODY_HPP_
diff --git a/src/protocol/body/LockBatchResponseBody.hpp b/src/protocol/body/LockBatchResponseBody.hpp
new file mode 100644
index 0000000..af16bf2
--- /dev/null
+++ b/src/protocol/body/LockBatchResponseBody.hpp
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_PROTOCOL_BODY_LOCKBATCHRESPONSEBODY_HPP_
+#define ROCKETMQ_PROTOCOL_BODY_LOCKBATCHRESPONSEBODY_HPP_
+
+#include <algorithm> // std::move
+#include <vector> // std::vector
+
+#include "Logging.h"
+#include "MQMessageQueue.h"
+#include "RemotingSerializable.h"
+
+namespace rocketmq {
+
+class LockBatchResponseBody {
+ public:
+ static LockBatchResponseBody* Decode(const ByteArray& bodyData) {
+ Json::Value root = RemotingSerializable::fromJson(bodyData);
+ auto& mqs = root["lockOKMQSet"];
+ std::unique_ptr<LockBatchResponseBody> body(new LockBatchResponseBody());
+ for (const auto& qd : mqs) {
+ MQMessageQueue mq(qd["topic"].asString(), qd["brokerName"].asString(), qd["queueId"].asInt());
+ LOG_INFO_NEW("LockBatchResponseBody MQ:{}", mq.toString());
+ body->lock_ok_mq_set().push_back(std::move(mq));
+ }
+ return body.release();
+ }
+
+ public:
+ inline const std::vector<MQMessageQueue>& lock_ok_mq_set() const { return lock_ok_mq_set_; }
+ inline std::vector<MQMessageQueue>& lock_ok_mq_set() { return lock_ok_mq_set_; }
+ inline void set_lock_ok_mq_set(const std::vector<MQMessageQueue>& lockOKMQSet) { lock_ok_mq_set_ = lockOKMQSet; }
+ inline void set_lock_ok_mq_set(std::vector<MQMessageQueue>&& lockOKMQSet) {
+ lock_ok_mq_set_ = std::move(lockOKMQSet);
+ }
+
+ private:
+ std::vector<MQMessageQueue> lock_ok_mq_set_;
+};
+
+} // namespace rocketmq
+
+#endif // ROCKETMQ_PROTOCOL_BODY_LOCKBATCHRESPONSEBODY_HPP_
diff --git a/src/protocol/ProcessQueueInfo.h b/src/protocol/body/ProcessQueueInfo.hpp
similarity index 94%
rename from src/protocol/ProcessQueueInfo.h
rename to src/protocol/body/ProcessQueueInfo.hpp
index 3d2ea6d..64c66c2 100644
--- a/src/protocol/ProcessQueueInfo.h
+++ b/src/protocol/body/ProcessQueueInfo.hpp
@@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef __PROCESS_QUEUE_INFO_H__
-#define __PROCESS_QUEUE_INFO_H__
+#ifndef ROCKETMQ_PROTOCOL_BODY_PROCESS_QUEUE_INFO_HPP_
+#define ROCKETMQ_PROTOCOL_BODY_PROCESS_QUEUE_INFO_HPP_
#include <json/json.h>
@@ -92,4 +92,4 @@ class ProcessQueueInfo {
} // namespace rocketmq
-#endif // __PROCESS_QUEUE_INFO_H__
+#endif // ROCKETMQ_PROTOCOL_BODY_PROCESS_QUEUE_INFO_HPP_
diff --git a/src/protocol/body/ResetOffsetBody.cpp b/src/protocol/body/ResetOffsetBody.cpp
deleted file mode 100644
index 497f591..0000000
--- a/src/protocol/body/ResetOffsetBody.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ResetOffsetBody.h"
-
-#include "RemotingSerializable.h"
-
-namespace rocketmq {
-
-ResetOffsetBody* ResetOffsetBody::Decode(const ByteArray& bodyData) {
- // FIXME: object as key
- Json::Value root = RemotingSerializable::fromJson(bodyData);
- Json::Value qds = root["offsetTable"];
- std::unique_ptr<ResetOffsetBody> body(new ResetOffsetBody());
- for (unsigned int i = 0; i < qds.size(); i++) {
- Json::Value qd = qds[i];
- MQMessageQueue mq(qd["brokerName"].asString(), qd["topic"].asString(), qd["queueId"].asInt());
- int64_t offset = qd["offset"].asInt64();
- body->setOffsetTable(mq, offset);
- }
- return body.release();
-}
-
-std::map<MQMessageQueue, int64_t> ResetOffsetBody::getOffsetTable() {
- return offset_table_;
-}
-
-void ResetOffsetBody::setOffsetTable(const MQMessageQueue& mq, int64_t offset) {
- offset_table_[mq] = offset;
-}
-
-} // namespace rocketmq
diff --git a/src/protocol/body/ResetOffsetBody.h b/src/protocol/body/ResetOffsetBody.hpp
similarity index 51%
copy from src/protocol/body/ResetOffsetBody.h
copy to src/protocol/body/ResetOffsetBody.hpp
index f1024b9..157e567 100644
--- a/src/protocol/body/ResetOffsetBody.h
+++ b/src/protocol/body/ResetOffsetBody.hpp
@@ -14,22 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef __RESET_OFFSET_BODY__
-#define __RESET_OFFSET_BODY__
+#ifndef ROCKETMQ_PROTOCOL_BODY_RESETOFFSETBODY_HPP_
+#define ROCKETMQ_PROTOCOL_BODY_RESETOFFSETBODY_HPP_
#include <map> // std::map
-#include "ByteArray.h"
#include "MQMessageQueue.h"
+#include "RemotingSerializable.h"
namespace rocketmq {
class ResetOffsetBody {
public:
- static ResetOffsetBody* Decode(const ByteArray& bodyData);
+ static ResetOffsetBody* Decode(const ByteArray& bodyData) {
+ // FIXME: object as key
+ Json::Value root = RemotingSerializable::fromJson(bodyData);
+ auto& qds = root["offsetTable"];
+ std::unique_ptr<ResetOffsetBody> body(new ResetOffsetBody());
+ Json::Value::Members members = qds.getMemberNames();
+ for (const auto& member : members) {
+ Json::Value key = RemotingSerializable::fromJson(member);
+ MQMessageQueue mq(key["topic"].asString(), key["brokerName"].asString(), key["queueId"].asInt());
+ int64_t offset = qds[member].asInt64();
+ body->offset_table_.emplace(std::move(mq), offset);
+ }
+ return body.release();
+ }
- std::map<MQMessageQueue, int64_t> getOffsetTable();
- void setOffsetTable(const MQMessageQueue& mq, int64_t offset);
+ public:
+ std::map<MQMessageQueue, int64_t>& offset_table() { return offset_table_; }
private:
std::map<MQMessageQueue, int64_t> offset_table_;
@@ -37,4 +50,4 @@ class ResetOffsetBody {
} // namespace rocketmq
-#endif // __RESET_OFFSET_BODY__
+#endif // ROCKETMQ_PROTOCOL_BODY_RESETOFFSETBODY_HPP_
diff --git a/src/protocol/TopicRouteData.hpp b/src/protocol/body/TopicRouteData.hpp
similarity index 96%
rename from src/protocol/TopicRouteData.hpp
rename to src/protocol/body/TopicRouteData.hpp
index 6d4b64c..38e1889 100644
--- a/src/protocol/TopicRouteData.hpp
+++ b/src/protocol/body/TopicRouteData.hpp
@@ -123,12 +123,12 @@ class TopicRouteData {
for (auto bd : bds) {
std::string broker_name = bd["brokerName"].asString();
LOG_DEBUG_NEW("brokerName:{}", broker_name);
+ auto& bas = bd["brokerAddrs"];
+ Json::Value::Members members = bas.getMemberNames();
std::map<int, std::string> broker_addrs;
- Json::Value bas = bd["brokerAddrs"];
- Json::Value::Members mbs = bas.getMemberNames();
- for (const auto& key : mbs) {
- int id = std::stoi(key);
- std::string addr = bas[key].asString();
+ for (const auto& member : members) {
+ int id = std::stoi(member);
+ std::string addr = bas[member].asString();
broker_addrs.emplace(id, std::move(addr));
LOG_DEBUG_NEW("brokerId:{}, brokerAddr:{}", id, addr);
}
diff --git a/src/protocol/body/UnlockBatchRequestBody.hpp b/src/protocol/body/UnlockBatchRequestBody.hpp
new file mode 100644
index 0000000..ffee533
--- /dev/null
+++ b/src/protocol/body/UnlockBatchRequestBody.hpp
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_PROTOCOL_BODY_UNLOCKBATCHREQUESTBODY_HPP_
+#define ROCKETMQ_PROTOCOL_BODY_UNLOCKBATCHREQUESTBODY_HPP_
+
+#include <algorithm> // std::move
+#include <vector> // std::vector
+
+#include "MessageQueue.hpp"
+#include "RemotingSerializable.h"
+
+namespace rocketmq {
+
+class UnlockBatchRequestBody : public RemotingSerializable {
+ public:
+ std::string encode() override {
+ Json::Value root;
+ root["consumerGroup"] = consumer_group_;
+ root["clientId"] = client_id_;
+
+ for (const auto& mq : mq_set_) {
+ root["mqSet"].append(rocketmq::toJson(mq));
+ }
+
+ return RemotingSerializable::toJson(root);
+ }
+
+ public:
+ inline const std::string& consumer_group() { return consumer_group_; }
+ inline void set_consumer_group(const std::string& consumerGroup) { consumer_group_ = consumerGroup; }
+
+ inline const std::string& client_id() { return client_id_; }
+ inline void set_client_id(const std::string& clientId) { client_id_ = clientId; }
+
+ inline std::vector<MQMessageQueue>& mq_set() { return mq_set_; }
+ inline void set_mq_set(const std::vector<MQMessageQueue>& mq_set) { mq_set_ = mq_set; }
+ inline void set_mq_set(std::vector<MQMessageQueue>&& mq_set) { mq_set_ = std::move(mq_set); }
+
+ private:
+ std::string consumer_group_;
+ std::string client_id_;
+ std::vector<MQMessageQueue> mq_set_;
+};
+
+} // namespace rocketmq
+
+#endif // ROCKETMQ_PROTOCOL_BODY_UNLOCKBATCHREQUESTBODY_HPP_
diff --git a/src/protocol/header/ReplyMessageRequestHeader.cpp b/src/protocol/header/ReplyMessageRequestHeader.cpp
deleted file mode 100644
index 7fd3625..0000000
--- a/src/protocol/header/ReplyMessageRequestHeader.cpp
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ReplyMessageRequestHeader.h"
-
-#include <memory>
-
-#include "UtilAll.h"
-
-namespace rocketmq {
-
-ReplyMessageRequestHeader* ReplyMessageRequestHeader::Decode(std::map<std::string, std::string>& extFields) {
- std::unique_ptr<ReplyMessageRequestHeader> header(new ReplyMessageRequestHeader());
-
- header->producerGroup = extFields.at("producerGroup");
- header->topic = extFields.at("topic");
- header->defaultTopic = extFields.at("defaultTopic");
- header->defaultTopicQueueNums = std::stoi(extFields.at("defaultTopicQueueNums"));
- header->queueId = std::stoi(extFields.at("queueId"));
- header->sysFlag = std::stoi(extFields.at("sysFlag"));
- header->bornTimestamp = std::stoll(extFields.at("bornTimestamp"));
- header->flag = std::stoi(extFields.at("flag"));
-
- auto it = extFields.find("properties");
- if (it != extFields.end()) {
- header->properties = it->second;
- }
-
- it = extFields.find("reconsumeTimes");
- if (it != extFields.end()) {
- header->reconsumeTimes = std::stoi(it->second);
- } else {
- header->reconsumeTimes = 0;
- }
-
- it = extFields.find("unitMode");
- if (it != extFields.end()) {
- header->unitMode = UtilAll::stob(it->second);
- } else {
- header->unitMode = false;
- }
-
- header->bornHost = extFields.at("bornHost");
- header->storeHost = extFields.at("storeHost");
- header->storeTimestamp = std::stoll(extFields.at("storeTimestamp"));
-
- return header.release();
-}
-
-const std::string& ReplyMessageRequestHeader::getProducerGroup() const {
- return this->producerGroup;
-}
-
-void ReplyMessageRequestHeader::setProducerGroup(const std::string& producerGroup) {
- this->producerGroup = producerGroup;
-}
-
-const std::string& ReplyMessageRequestHeader::getTopic() const {
- return this->topic;
-}
-
-void ReplyMessageRequestHeader::setTopic(const std::string& topic) {
- this->topic = topic;
-}
-
-const std::string& ReplyMessageRequestHeader::getDefaultTopic() const {
- return this->defaultTopic;
-}
-
-void ReplyMessageRequestHeader::setDefaultTopic(const std::string& defaultTopic) {
- this->defaultTopic = defaultTopic;
-}
-
-int32_t ReplyMessageRequestHeader::getDefaultTopicQueueNums() const {
- return this->defaultTopicQueueNums;
-}
-
-void ReplyMessageRequestHeader::setDefaultTopicQueueNums(int32_t defaultTopicQueueNums) {
- this->defaultTopicQueueNums = defaultTopicQueueNums;
-}
-
-int32_t ReplyMessageRequestHeader::getQueueId() const {
- return this->queueId;
-}
-
-void ReplyMessageRequestHeader::setQueueId(int32_t queueId) {
- this->queueId = queueId;
-}
-
-int32_t ReplyMessageRequestHeader::getSysFlag() const {
- return this->sysFlag;
-}
-
-void ReplyMessageRequestHeader::setSysFlag(int32_t sysFlag) {
- this->sysFlag = sysFlag;
-}
-
-int64_t ReplyMessageRequestHeader::getBornTimestamp() const {
- return this->bornTimestamp;
-}
-
-void ReplyMessageRequestHeader::setBornTimestamp(int64_t bornTimestamp) {
- this->bornTimestamp = bornTimestamp;
-}
-
-int32_t ReplyMessageRequestHeader::getFlag() const {
- return this->flag;
-}
-
-void ReplyMessageRequestHeader::setFlag(int32_t flag) {
- this->flag = flag;
-}
-
-const std::string& ReplyMessageRequestHeader::getProperties() const {
- return this->properties;
-}
-
-void ReplyMessageRequestHeader::setProperties(const std::string& properties) {
- this->properties = properties;
-}
-
-int32_t ReplyMessageRequestHeader::getReconsumeTimes() const {
- return this->reconsumeTimes;
-}
-
-void ReplyMessageRequestHeader::setReconsumeTimes(int32_t reconsumeTimes) {
- this->reconsumeTimes = reconsumeTimes;
-}
-
-bool ReplyMessageRequestHeader::getUnitMode() const {
- return this->unitMode;
-}
-
-void ReplyMessageRequestHeader::setUnitMode(bool unitMode) {
- this->unitMode = unitMode;
-}
-
-const std::string& ReplyMessageRequestHeader::getBornHost() const {
- return this->bornHost;
-}
-
-void ReplyMessageRequestHeader::setBornHost(const std::string& bornHost) {
- this->bornHost = bornHost;
-}
-
-const std::string& ReplyMessageRequestHeader::getStoreHost() const {
- return this->storeHost;
-}
-
-void ReplyMessageRequestHeader::setStoreHost(const std::string& storeHost) {
- this->storeHost = storeHost;
-}
-
-int64_t ReplyMessageRequestHeader::getStoreTimestamp() const {
- return this->storeTimestamp;
-}
-
-void ReplyMessageRequestHeader::setStoreTimestamp(int64_t storeTimestamp) {
- this->storeTimestamp = storeTimestamp;
-}
-
-} // namespace rocketmq
diff --git a/src/protocol/header/ReplyMessageRequestHeader.h b/src/protocol/header/ReplyMessageRequestHeader.h
deleted file mode 100644
index 5fd341e..0000000
--- a/src/protocol/header/ReplyMessageRequestHeader.h
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __REPLY_MESSAGE_REQUEST_HEADER_H__
-#define __REPLY_MESSAGE_REQUEST_HEADER_H__
-
-#include <vector>
-
-#include "CommandCustomHeader.h"
-
-namespace rocketmq {
-
-class ReplyMessageRequestHeader : public CommandCustomHeader {
- public:
- static ReplyMessageRequestHeader* Decode(std::map<std::string, std::string>& extFields);
-
- const std::string& getProducerGroup() const;
- void setProducerGroup(const std::string& producerGroup);
-
- const std::string& getTopic() const;
- void setTopic(const std::string& topic);
-
- const std::string& getDefaultTopic() const;
- void setDefaultTopic(const std::string& defaultTopic);
-
- int32_t getDefaultTopicQueueNums() const;
- void setDefaultTopicQueueNums(int32_t defaultTopicQueueNums);
-
- int32_t getQueueId() const;
- void setQueueId(int32_t queueId);
-
- int32_t getSysFlag() const;
- void setSysFlag(int32_t sysFlag);
-
- int64_t getBornTimestamp() const;
- void setBornTimestamp(int64_t bornTimestamp);
-
- int32_t getFlag() const;
- void setFlag(int32_t flag);
-
- const std::string& getProperties() const;
- void setProperties(const std::string& properties);
-
- int32_t getReconsumeTimes() const;
- void setReconsumeTimes(int32_t reconsumeTimes);
-
- bool getUnitMode() const;
- void setUnitMode(bool unitMode);
-
- const std::string& getBornHost() const;
- void setBornHost(const std::string& bornHost);
-
- const std::string& getStoreHost() const;
- void setStoreHost(const std::string& storeHost);
-
- int64_t getStoreTimestamp() const;
- void setStoreTimestamp(int64_t stroeTimestamp);
-
- private:
- std::string producerGroup;
- std::string topic;
- std::string defaultTopic;
- int32_t defaultTopicQueueNums;
- int32_t queueId;
- int32_t sysFlag;
- int64_t bornTimestamp;
- int32_t flag;
- std::string properties; // nullable
- int32_t reconsumeTimes; // nullable
- bool unitMode; // nullable
-
- std::string bornHost;
- std::string storeHost;
- int64_t storeTimestamp;
-};
-
-} // namespace rocketmq
-
-#endif // __REPLY_MESSAGE_REQUEST_HEADER_H__
diff --git a/src/protocol/header/ReplyMessageRequestHeader.hpp b/src/protocol/header/ReplyMessageRequestHeader.hpp
new file mode 100644
index 0000000..3aaab7b
--- /dev/null
+++ b/src/protocol/header/ReplyMessageRequestHeader.hpp
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_PROTOCOL_HEADER_REPLY_MESSAGE_REQUEST_HEADER_HPP_
+#define ROCKETMQ_PROTOCOL_HEADER_REPLY_MESSAGE_REQUEST_HEADER_HPP_
+
+#include <vector>
+
+#include "CommandCustomHeader.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+class ReplyMessageRequestHeader : public CommandCustomHeader {
+ public:
+ static ReplyMessageRequestHeader* Decode(std::map<std::string, std::string>& extFields) {
+ std::unique_ptr<ReplyMessageRequestHeader> header(new ReplyMessageRequestHeader());
+
+ header->producer_group_ = extFields.at("producerGroup");
+ header->topic_ = extFields.at("topic");
+ header->default_topic_ = extFields.at("defaultTopic");
+ header->default_topic_queue_nums_ = std::stoi(extFields.at("defaultTopicQueueNums"));
+ header->queue_id_ = std::stoi(extFields.at("queueId"));
+ header->sys_flag_ = std::stoi(extFields.at("sysFlag"));
+ header->born_timestamp_ = std::stoll(extFields.at("bornTimestamp"));
+ header->flag_ = std::stoi(extFields.at("flag"));
+
+ auto it = extFields.find("properties");
+ if (it != extFields.end()) {
+ header->properties_ = it->second;
+ }
+
+ it = extFields.find("reconsumeTimes");
+ if (it != extFields.end()) {
+ header->reconsume_times_ = std::stoi(it->second);
+ } else {
+ header->reconsume_times_ = 0;
+ }
+
+ it = extFields.find("unitMode");
+ if (it != extFields.end()) {
+ header->unit_mode_ = UtilAll::stob(it->second);
+ } else {
+ header->unit_mode_ = false;
+ }
+
+ header->born_host_ = extFields.at("bornHost");
+ header->store_host_ = extFields.at("storeHost");
+ header->store_timestamp_ = std::stoll(extFields.at("storeTimestamp"));
+
+ return header.release();
+ }
+
+ public:
+ inline const std::string& producer_group() const { return this->producer_group_; }
+ inline void set_producer_group(const std::string& producerGroup) { this->producer_group_ = producerGroup; }
+
+ inline const std::string& topic() const { return this->topic_; }
+ inline void set_topic(const std::string& topic) { this->topic_ = topic; }
+
+ inline const std::string& default_topic() const { return this->default_topic_; }
+ inline void set_default_topic(const std::string& defaultTopic) { this->default_topic_ = defaultTopic; }
+
+ inline int32_t default_topic_queue_nums() const { return this->default_topic_queue_nums_; }
+ inline void set_default_topic_queue_nums(int32_t defaultTopicQueueNums) {
+ this->default_topic_queue_nums_ = defaultTopicQueueNums;
+ }
+
+ inline int32_t queue_id() const { return this->queue_id_; }
+ inline void set_queue_id(int32_t queueId) { this->queue_id_ = queueId; }
+
+ inline int32_t sys_flag() const { return this->sys_flag_; }
+ inline void set_sys_flag(int32_t sysFlag) { this->sys_flag_ = sysFlag; }
+
+ inline int64_t born_timestamp() const { return this->born_timestamp_; }
+ inline void set_born_timestamp(int64_t bornTimestamp) { this->born_timestamp_ = bornTimestamp; }
+
+ inline int32_t flag() const { return this->flag_; }
+ inline void set_flag(int32_t flag) { this->flag_ = flag; }
+
+ inline const std::string& properties() const { return this->properties_; }
+ inline void set_properties(const std::string& properties) { this->properties_ = properties; }
+
+ inline int32_t reconsume_times() const { return this->reconsume_times_; }
+ inline void set_reconsume_times(int32_t reconsumeTimes) { this->reconsume_times_ = reconsumeTimes; }
+
+ inline bool unit_mode() const { return this->unit_mode_; }
+ inline void set_unit_mode(bool unitMode) { this->unit_mode_ = unitMode; }
+
+ inline const std::string& born_host() const { return this->born_host_; }
+ inline void set_born_host(const std::string& bornHost) { this->born_host_ = bornHost; }
+
+ inline const std::string& store_host() const { return this->store_host_; }
+ inline void set_store_host(const std::string& storeHost) { this->store_host_ = storeHost; }
+
+ inline int64_t store_timestamp() const { return this->store_timestamp_; }
+ inline void set_store_timestamp(int64_t storeTimestamp) { this->store_timestamp_ = storeTimestamp; }
+
+ private:
+ std::string producer_group_;
+ std::string topic_;
+ std::string default_topic_;
+ int32_t default_topic_queue_nums_;
+ int32_t queue_id_;
+ int32_t sys_flag_;
+ int64_t born_timestamp_;
+ int32_t flag_;
+ std::string properties_; // nullable
+ int32_t reconsume_times_; // nullable
+ bool unit_mode_; // nullable
+
+ std::string born_host_;
+ std::string store_host_;
+ int64_t store_timestamp_;
+};
+
+} // namespace rocketmq
+
+#endif // ROCKETMQ_PROTOCOL_HEADER_REPLY_MESSAGE_REQUEST_HEADER_HPP_
diff --git a/src/protocol/heartbeat/ConsumerData.hpp b/src/protocol/heartbeat/ConsumerData.hpp
new file mode 100644
index 0000000..470d78b
--- /dev/null
+++ b/src/protocol/heartbeat/ConsumerData.hpp
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_PROTOCOL_HEARTBEAT_CONSUMERDATA_H_
+#define ROCKETMQ_PROTOCOL_HEARTBEAT_CONSUMERDATA_H_
+
+#include <algorithm> // std::move
+#include <string> // std::string
+#include <vector> // std::vector
+
+#include <json/json.h>
+
+#include "ConsumeType.h"
+#include "SubscriptionData.hpp"
+
+namespace rocketmq {
+
+/**
+ * Consumer entry carried inside HeartbeatData: the consumer group name, its
+ * consume type, message model, consume-from-where setting and subscriptions.
+ */
+class ConsumerData {
+ public:
+  /**
+   * @param group_name             consumer group name (copied).
+   * @param consume_type           stored as-is.
+   * @param message_model          stored as-is.
+   * @param consume_from_where     stored as-is.
+   * @param subscription_data_set  taken by rvalue reference and moved into the object.
+   */
+  ConsumerData(const std::string& group_name,
+               ConsumeType consume_type,
+               MessageModel message_model,
+               ConsumeFromWhere consume_from_where,
+               std::vector<SubscriptionData>&& subscription_data_set)
+      : group_name_(group_name),
+        consume_type_(consume_type),
+        message_model_(message_model),
+        consume_from_where_(consume_from_where),
+        subscription_data_set_(std::move(subscription_data_set)) {}
+
+  // Orders by group name only; the remaining fields do not take part in the comparison.
+  bool operator<(const ConsumerData& other) const { return group_name_ < other.group_name_; }
+
+  /**
+   * Builds a JSON object with the keys "groupName", "consumeType",
+   * "messageModel", "consumeFromWhere" and "subscriptionDataSet".
+   *
+   * "subscriptionDataSet" is only written inside the loop, so it is absent
+   * from the result when there are no subscriptions.
+   */
+  Json::Value toJson() const {
+    Json::Value root;
+    root["groupName"] = group_name_;
+    root["consumeType"] = consume_type_;
+    root["messageModel"] = message_model_;
+    root["consumeFromWhere"] = consume_from_where_;
+
+    for (const auto& sd : subscription_data_set_) {
+      root["subscriptionDataSet"].append(sd.toJson());
+    }
+
+    return root;
+  }
+
+ public:
+  inline const std::string& group_name() const { return group_name_; }
+  inline void set_group_name(const std::string& group_name) { group_name_ = group_name; }
+
+  inline ConsumeType consume_type() const { return consume_type_; }
+  inline void set_consume_type(ConsumeType consume_type) { consume_type_ = consume_type; }
+
+  inline MessageModel message_model() const { return message_model_; }
+
+  inline ConsumeFromWhere consume_from_where() const { return consume_from_where_; }
+
+  // Read-only view of the subscriptions. Const-qualified and returned by
+  // reference so that it works on const objects and does not copy the whole
+  // vector on every call.
+  inline const std::vector<SubscriptionData>& subscription_data_set() const { return subscription_data_set_; }
+
+ private:
+  std::string group_name_;
+  ConsumeType consume_type_;
+  MessageModel message_model_;
+  ConsumeFromWhere consume_from_where_;
+  std::vector<SubscriptionData> subscription_data_set_;
+};
+
+} // namespace rocketmq
+
+#endif // ROCKETMQ_PROTOCOL_HEARTBEAT_CONSUMERDATA_H_
diff --git a/src/protocol/heartbeat/HeartbeatData.hpp b/src/protocol/heartbeat/HeartbeatData.hpp
new file mode 100644
index 0000000..f4af3ca
--- /dev/null
+++ b/src/protocol/heartbeat/HeartbeatData.hpp
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ROCKETMQ_PROTOCOL_HEARTBEAT_HEARTBEATDATA_H_
+#define ROCKETMQ_PROTOCOL_HEARTBEAT_HEARTBEATDATA_H_
+
+#include <string> // std::string
+#include <vector> // std::vector
+
+#include "RemotingSerializable.h"
+#include "ConsumerData.hpp"
+#include "ProducerData.hpp"
+
+namespace rocketmq {
+
+/**
+ * Heartbeat payload: a client id together with the consumer and producer
+ * entries, serialized through RemotingSerializable::toJson.
+ */
+class HeartbeatData : public RemotingSerializable {
+ public:
+  // Assembles the JSON document and returns what RemotingSerializable::toJson yields for it.
+  std::string encode() {
+    Json::Value heartbeat;
+    heartbeat["clientID"] = client_id_;
+
+    // Each array key is only touched inside its loop, so an empty set leaves
+    // the corresponding key out of the document.
+    for (const ConsumerData& consumer : consumer_data_set_) {
+      heartbeat["consumerDataSet"].append(consumer.toJson());
+    }
+    for (const ProducerData& producer : producer_data_set_) {
+      heartbeat["producerDataSet"].append(producer.toJson());
+    }
+
+    return RemotingSerializable::toJson(heartbeat);
+  }
+
+ public:
+  void set_client_id(const std::string& clientID) { client_id_ = clientID; }
+
+  // Both accessors hand out mutable references to the stored vectors.
+  std::vector<ConsumerData>& consumer_data_set() { return consumer_data_set_; }
+
+  std::vector<ProducerData>& producer_data_set() { return producer_data_set_; }
+
+ private:
+  std::string client_id_;
+  std::vector<ConsumerData> consumer_data_set_;
+  std::vector<ProducerData> producer_data_set_;
+};
+
+} // namespace rocketmq
+
+#endif // ROCKETMQ_PROTOCOL_HEARTBEAT_HEARTBEATDATA_H_
diff --git a/src/protocol/body/ResetOffsetBody.h b/src/protocol/heartbeat/ProducerData.hpp
similarity index 55%
rename from src/protocol/body/ResetOffsetBody.h
rename to src/protocol/heartbeat/ProducerData.hpp
index f1024b9..751a87f 100644
--- a/src/protocol/body/ResetOffsetBody.h
+++ b/src/protocol/heartbeat/ProducerData.hpp
@@ -14,27 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef __RESET_OFFSET_BODY__
-#define __RESET_OFFSET_BODY__
+#ifndef ROCKETMQ_PROTOCOL_HEARTBEAT_PRODUCERDATA_H_
+#define ROCKETMQ_PROTOCOL_HEARTBEAT_PRODUCERDATA_H_
-#include <map> // std::map
+#include <string> // std::string
-#include "ByteArray.h"
-#include "MQMessageQueue.h"
+#include <json/json.h>
namespace rocketmq {
-class ResetOffsetBody {
+class ProducerData {
public:
- static ResetOffsetBody* Decode(const ByteArray& bodyData);
+ ProducerData(const std::string& group_name) : group_name_(group_name) {}
- std::map<MQMessageQueue, int64_t> getOffsetTable();
- void setOffsetTable(const MQMessageQueue& mq, int64_t offset);
+ // Orders producer entries by group name.
+ bool operator<(const ProducerData& other) const { return group_name_.compare(other.group_name_) < 0; }
+
+ // Produces a JSON object whose single key "groupName" holds group_name_.
+ Json::Value toJson() const {
+   Json::Value json;
+   json["groupName"] = group_name_;
+   return json;
+ }
+
+ public:
+ // Returns the producer group name.
+ inline const std::string& group_name() const { return group_name_; }
+ // Setter spelled like the other setters of the heartbeat classes
+ // (e.g. ConsumerData::set_group_name, HeartbeatData::set_client_id).
+ inline void set_group_name(const std::string& group_name) { group_name_ = group_name; }
+ // Original setter spelling, kept so existing callers keep compiling;
+ // prefer set_group_name().
+ inline void group_name(const std::string& group_name) { group_name_ = group_name; }
private:
- std::map<MQMessageQueue, int64_t> offset_table_;
+ std::string group_name_;
};
} // namespace rocketmq
-#endif // __RESET_OFFSET_BODY__
+#endif // ROCKETMQ_PROTOCOL_HEARTBEAT_PRODUCERDATA_H_
diff --git a/src/common/SubscriptionData.h b/src/protocol/heartbeat/SubscriptionData.hpp
similarity index 50%
rename from src/common/SubscriptionData.h
rename to src/protocol/heartbeat/SubscriptionData.hpp
index 277403c..82f8334 100644
--- a/src/common/SubscriptionData.h
+++ b/src/protocol/heartbeat/SubscriptionData.hpp
@@ -14,32 +14,72 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef ROCKETMQ_SUBSCRIPTIONDATA_H_
-#define ROCKETMQ_SUBSCRIPTIONDATA_H_
+#ifndef ROCKETMQ_PROTOCOL_HEARTBEAT_SUBSCRIPTIONDATA_HPP_
+#define ROCKETMQ_PROTOCOL_HEARTBEAT_SUBSCRIPTIONDATA_HPP_
-#include <json/json.h>
+#include <cstdint> // int64_t
+
+#include <algorithm> // algorithms used by containsTag
+#include <string> // std::string
+#include <vector> // std::vector
-#include <string>
-#include <vector>
+#include <json/json.h>
-#include "RocketMQClient.h"
+#include "UtilAll.h"
namespace rocketmq {
-class ROCKETMQCLIENT_API SubscriptionData {
+class SubscriptionData {
public:
- SubscriptionData();
- SubscriptionData(const std::string& topic, const std::string& subString);
- SubscriptionData(const SubscriptionData& other);
+ // Creates a subscription with empty topic and expression; sub_version_ is
+ // taken from UtilAll::currentTimeMillis() at construction.
+ SubscriptionData() : sub_version_(UtilAll::currentTimeMillis()) {}
+ // Creates a subscription for `topic` with expression `subString`;
+ // sub_version_ is taken from UtilAll::currentTimeMillis() at construction.
+ SubscriptionData(const std::string& topic, const std::string& subString)
+ : topic_(topic), sub_string_(subString), sub_version_(UtilAll::currentTimeMillis()) {}
+
+ // Member-wise copy. Defaulted so that members are copy-constructed directly
+ // (the previous body default-constructed them and then assigned) and so the
+ // copy cannot fall out of sync when a member is added.
+ // NOTE(review): assumes the class holds only topic_, sub_string_,
+ // sub_version_, tag_set_ and code_set_, which are the members the previous
+ // hand-written body copied; confirm against the member list.
+ SubscriptionData(const SubscriptionData& other) = default;
virtual ~SubscriptionData() = default;
- bool operator==(const SubscriptionData& other) const;
+ // Equality over topic, expression, version and the number of tags.
+ bool operator==(const SubscriptionData& other) const {
+   if (topic_ != other.topic_ || sub_string_ != other.sub_string_) {
+     return false;
+   }
+   if (sub_version_ != other.sub_version_) {
+     return false;
+   }
+   // FIXME: tags - only the tag count is compared, not the tag contents.
+   return tag_set_.size() == other.tag_set_.size();
+ }
bool operator!=(const SubscriptionData& other) const { return !operator==(other); }
- bool operator<(const SubscriptionData& other) const;
+ // Orders by topic first; entries with the same topic are ordered by sub_string_.
+ bool operator<(const SubscriptionData& other) const {
+   const int topic_order = topic_.compare(other.topic_);
+   if (topic_order != 0) {
+     return topic_order < 0;
+   }
+   return sub_string_.compare(other.sub_string_) < 0;
+ }
+
+ // True when `tag` equals one of the entries of tag_set_ (linear scan).
+ inline bool containsTag(const std::string& tag) const {
+   return std::any_of(tag_set_.begin(), tag_set_.end(),
+                      [&tag](const std::string& candidate) { return candidate == tag; });
+ }
- Json::Value toJson() const;
+ // Builds a JSON object with "topic", "subString", "subVersion" and, when the
+ // respective collection is non-empty, "tagsSet" and "codeSet".
+ Json::Value toJson() const {
+   Json::Value json;
+   json["topic"] = topic_;
+   json["subString"] = sub_string_;
+   // The version is converted with UtilAll::to_string before being stored.
+   json["subVersion"] = UtilAll::to_string(sub_version_);
+
+   // The array keys are only touched inside the loops, so an empty
+   // collection does not create its key.
+   for (const std::string& tag_name : tag_set_) {
+     json["tagsSet"].append(tag_name);
+   }
+   for (const int32_t& code_value : code_set_) {
+     json["codeSet"].append(code_value);
+   }
+
+   return json;
+ }
public:
inline const std::string& topic() const { return topic_; }
@@ -51,13 +91,7 @@ class ROCKETMQCLIENT_API SubscriptionData {
inline std::vector<std::string>& tags_set() { return tag_set_; }
- inline void put_tag(const std::string& tag) { tag_set_.push_back(tag); }
-
- inline bool contain_tag(const std::string& tag) const {
- return std::find(tag_set_.begin(), tag_set_.end(), tag) != tag_set_.end();
- }
-
- inline void put_code(int32_t code) { code_set_.push_back(code); }
+ // Mutable access to code_set_.
+ std::vector<int32_t>& code_set() { return code_set_; }
private:
std::string topic_;
@@ -69,4 +103,4 @@ class ROCKETMQCLIENT_API SubscriptionData {
} // namespace rocketmq
-#endif // ROCKETMQ_SUBSCRIPTIONDATA_H_
+#endif // ROCKETMQ_PROTOCOL_HEARTBEAT_SUBSCRIPTIONDATA_HPP_
diff --git a/test/src/protocol/ConsumerRunningInfoTest.cpp b/test/src/protocol/ConsumerRunningInfoTest.cpp
index 7c427fd..7c3dc8f 100644
--- a/test/src/protocol/ConsumerRunningInfoTest.cpp
+++ b/test/src/protocol/ConsumerRunningInfoTest.cpp
@@ -24,9 +24,6 @@
#include <string>
#include "ConsumerRunningInfo.h"
-#include "MessageQueue.h"
-#include "ProcessQueueInfo.h"
-#include "SubscriptionData.h"
using std::map;
using std::string;
diff --git a/test/src/protocol/HeartbeatDataTest.cpp b/test/src/protocol/HeartbeatDataTest.cpp
index a1e5393..87f76b2 100644
--- a/test/src/protocol/HeartbeatDataTest.cpp
+++ b/test/src/protocol/HeartbeatDataTest.cpp
@@ -19,11 +19,7 @@
#include <vector>
-#include "ConsumeType.h"
-#include "HeartbeatData.h"
-#include "SubscriptionData.h"
-
-using std::vector;
+#include "protocol/heartbeat/HeartbeatData.hpp"
using testing::InitGoogleMock;
using testing::InitGoogleTest;
@@ -38,24 +34,16 @@ using rocketmq::ProducerData;
using rocketmq::SubscriptionData;
TEST(HeartbeatDataTest, ProducerData) {
- ProducerData producerData;
- producerData.groupName = "testGroup";
+ ProducerData producerData("testGroup");
Json::Value outJson = producerData.toJson();
EXPECT_EQ(outJson["groupName"], "testGroup");
}
TEST(HeartbeatDataTest, ConsumerData) {
- ConsumerData consumerData;
- consumerData.groupName = "testGroup";
- consumerData.consumeType = ConsumeType::CONSUME_ACTIVELY;
- consumerData.messageModel = MessageModel::BROADCASTING;
- consumerData.consumeFromWhere = ConsumeFromWhere::CONSUME_FROM_TIMESTAMP;
-
- vector<SubscriptionData> subs;
- subs.push_back(SubscriptionData("testTopic", "sub"));
-
- consumerData.subscriptionDataSet = subs;
+ ConsumerData consumerData("testGroup", ConsumeType::CONSUME_ACTIVELY, MessageModel::BROADCASTING,
+ ConsumeFromWhere::CONSUME_FROM_TIMESTAMP,
+ std::vector<SubscriptionData>{SubscriptionData("testTopic", "sub")});
Json::Value outJson = consumerData.toJson();
@@ -72,28 +60,17 @@ TEST(HeartbeatDataTest, ConsumerData) {
TEST(HeartbeatDataTest, HeartbeatData) {
HeartbeatData heartbeatData;
- heartbeatData.setClientID("testClientId");
-
- ProducerData producerData;
- producerData.groupName = "testGroup";
-
- EXPECT_TRUE(heartbeatData.isProducerDataSetEmpty());
- heartbeatData.insertDataToProducerDataSet(producerData);
- EXPECT_FALSE(heartbeatData.isProducerDataSetEmpty());
-
- ConsumerData consumerData;
- consumerData.groupName = "testGroup";
- consumerData.consumeType = ConsumeType::CONSUME_ACTIVELY;
- consumerData.messageModel = MessageModel::BROADCASTING;
- consumerData.consumeFromWhere = ConsumeFromWhere::CONSUME_FROM_TIMESTAMP;
+ heartbeatData.set_client_id("testClientId");
- vector<SubscriptionData> subs;
- subs.push_back(SubscriptionData("testTopic", "sub"));
+ EXPECT_TRUE(heartbeatData.producer_data_set().empty());
+ heartbeatData.producer_data_set().emplace_back("testGroup");
+ EXPECT_FALSE(heartbeatData.producer_data_set().empty());
- consumerData.subscriptionDataSet = subs;
- EXPECT_TRUE(heartbeatData.isConsumerDataSetEmpty());
- heartbeatData.insertDataToConsumerDataSet(consumerData);
- EXPECT_FALSE(heartbeatData.isConsumerDataSetEmpty());
+ EXPECT_TRUE(heartbeatData.consumer_data_set().empty());
+ heartbeatData.consumer_data_set().emplace_back("testGroup", ConsumeType::CONSUME_ACTIVELY, MessageModel::BROADCASTING,
+ ConsumeFromWhere::CONSUME_FROM_TIMESTAMP,
+ std::vector<SubscriptionData>{SubscriptionData("testTopic", "sub")});
+ EXPECT_FALSE(heartbeatData.consumer_data_set().empty());
std::string outData = heartbeatData.encode();
diff --git a/test/src/protocol/LockBatchBodyTest.cpp b/test/src/protocol/LockBatchBodyTest.cpp
index 64da9fb..8536489 100644
--- a/test/src/protocol/LockBatchBodyTest.cpp
+++ b/test/src/protocol/LockBatchBodyTest.cpp
@@ -21,7 +21,9 @@
#include "ByteArray.h"
#include "MQMessageQueue.h"
-#include "protocol/body/LockBatchBody.h"
+#include "protocol/body/LockBatchRequestBody.hpp"
+#include "protocol/body/LockBatchResponseBody.hpp"
+#include "protocol/body/UnlockBatchRequestBody.hpp"
using testing::InitGoogleMock;
using testing::InitGoogleTest;
@@ -36,18 +38,18 @@ using rocketmq::UnlockBatchRequestBody;
TEST(LockBatchBodyTest, LockBatchRequestBody) {
LockBatchRequestBody lockBatchRequestBody;
- lockBatchRequestBody.setClientId("testClientId");
- EXPECT_EQ(lockBatchRequestBody.getClientId(), "testClientId");
+ lockBatchRequestBody.set_client_id("testClientId");
+ EXPECT_EQ(lockBatchRequestBody.client_id(), "testClientId");
- lockBatchRequestBody.setConsumerGroup("testGroup");
- EXPECT_EQ(lockBatchRequestBody.getConsumerGroup(), "testGroup");
+ lockBatchRequestBody.set_consumer_group("testGroup");
+ EXPECT_EQ(lockBatchRequestBody.consumer_group(), "testGroup");
std::vector<MQMessageQueue> messageQueueList;
messageQueueList.push_back(MQMessageQueue("testTopic", "testBroker", 1));
messageQueueList.push_back(MQMessageQueue("testTopic", "testBroker", 2));
- lockBatchRequestBody.setMqSet(messageQueueList);
- EXPECT_EQ(lockBatchRequestBody.getMqSet(), messageQueueList);
+ lockBatchRequestBody.set_mq_set(messageQueueList);
+ EXPECT_EQ(lockBatchRequestBody.mq_set(), messageQueueList);
std::string outData = lockBatchRequestBody.encode();
@@ -81,7 +83,7 @@ TEST(LockBatchBodyTest, LockBatchResponseBody) {
std::unique_ptr<LockBatchResponseBody> lockBatchResponseBody(LockBatchResponseBody::Decode(bodyData));
MQMessageQueue messageQueue("testTopic", "testBroker", 1);
- EXPECT_EQ(messageQueue, lockBatchResponseBody->getLockOKMQSet()[0]);
+ EXPECT_EQ(messageQueue, lockBatchResponseBody->lock_ok_mq_set()[0]);
}
int main(int argc, char* argv[]) {
diff --git a/test/src/protocol/MessageQueueTest.cpp b/test/src/protocol/MessageQueueTest.cpp
index 0f6f5fc..072b4f2 100644
--- a/test/src/protocol/MessageQueueTest.cpp
+++ b/test/src/protocol/MessageQueueTest.cpp
@@ -17,7 +17,7 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-#include "MessageQueue.h"
+#include "MessageQueue.hpp"
using testing::InitGoogleMock;
using testing::InitGoogleTest;
diff --git a/test/src/protocol/TopicRouteDataTest.cpp b/test/src/protocol/TopicRouteDataTest.cpp
index cb70c10..ef94cdb 100644
--- a/test/src/protocol/TopicRouteDataTest.cpp
+++ b/test/src/protocol/TopicRouteDataTest.cpp
@@ -20,7 +20,7 @@
#include <memory>
#include "ByteArray.h"
-#include "TopicRouteData.hpp"
+#include "protocol/body/TopicRouteData.hpp"
using testing::InitGoogleMock;
using testing::InitGoogleTest;
diff --git a/test/src/transport/ClientRemotingProcessorTest.cpp b/test/src/transport/ClientRemotingProcessorTest.cpp
index 34f7c6a..1a840d9 100644
--- a/test/src/transport/ClientRemotingProcessorTest.cpp
+++ b/test/src/transport/ClientRemotingProcessorTest.cpp
@@ -34,7 +34,7 @@
#include "SessionCredentials.h"
#include "TcpTransport.h"
#include "UtilAll.h"
-#include "protocol/body/ResetOffsetBody.h"
+#include "protocol/body/ResetOffsetBody.hpp"
#include "protocol/header/CommandHeader.h"
using testing::_;