You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/11/11 01:46:49 UTC
[rocketmq-client-cpp] branch master updated: fixed typo bug in
DefaultMQPushConsumer::getConsumerRunningInfo. (#178)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new b1b2d5b fixed typo bug in DefaultMQPushConsumer::getConsumerRunningInfo. (#178)
b1b2d5b is described below
commit b1b2d5b5b63ec3db6ad61475da0dac3dbe142b4e
Author: James Yin <yw...@hotmail.com>
AuthorDate: Mon Nov 11 09:46:42 2019 +0800
fixed typo bug in DefaultMQPushConsumer::getConsumerRunningInfo. (#178)
---
src/consumer/DefaultMQPushConsumer.cpp | 68 ++++++++++++++++------------------
1 file changed, 32 insertions(+), 36 deletions(-)
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 0f51ea1..caa539e 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -855,44 +855,40 @@ int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {
}
ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() {
- ConsumerRunningInfo* info = new ConsumerRunningInfo();
- if (info) {
- if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly)
- info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
- else
- info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "flase");
- info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE, UtilAll::to_string(m_consumeThreadCount));
- info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, UtilAll::to_string(m_startTime));
-
- vector<SubscriptionData> result;
- getSubscriptions(result);
- info->setSubscriptionSet(result);
-
- map<MQMessageQueue, PullRequest*> requestTable = m_pRebalance->getPullRequestTable();
- map<MQMessageQueue, PullRequest*>::iterator it = requestTable.begin();
-
- for (; it != requestTable.end(); ++it) {
- if (!it->second->isDroped()) {
- map<MessageQueue, ProcessQueueInfo> queueTable;
- MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(), (it->first).getQueueId());
- ProcessQueueInfo processQueue;
- processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();
- processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();
- processQueue.cachedMsgCount = it->second->getCacheMsgCount();
- processQueue.setCommitOffset(
- m_pOffsetStore->readOffset(it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
- processQueue.setDroped(it->second->isDroped());
- processQueue.setLocked(it->second->isLocked());
- processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();
- processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();
- processQueue.lastConsumeTimestamp = it->second->getLastConsumeTimestamp();
- info->setMqTable(queue, processQueue);
- }
+ auto* info = new ConsumerRunningInfo();
+ if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) {
+ info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
+ } else {
+ info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "false");
+ }
+ info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE, UtilAll::to_string(m_consumeThreadCount));
+ info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, UtilAll::to_string(m_startTime));
+
+ std::vector<SubscriptionData> result;
+ getSubscriptions(result);
+ info->setSubscriptionSet(result);
+
+ std::map<MQMessageQueue, PullRequest*> requestTable = m_pRebalance->getPullRequestTable();
+
+ for (const auto& it : requestTable) {
+ if (!it.second->isDroped()) {
+ MessageQueue queue((it.first).getTopic(), (it.first).getBrokerName(), (it.first).getQueueId());
+ ProcessQueueInfo processQueue;
+ processQueue.cachedMsgMinOffset = it.second->getCacheMinOffset();
+ processQueue.cachedMsgMaxOffset = it.second->getCacheMaxOffset();
+ processQueue.cachedMsgCount = it.second->getCacheMsgCount();
+ processQueue.setCommitOffset(
+ m_pOffsetStore->readOffset(it.first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
+ processQueue.setDroped(it.second->isDroped());
+ processQueue.setLocked(it.second->isLocked());
+ processQueue.lastLockTimestamp = it.second->getLastLockTimestamp();
+ processQueue.lastPullTimestamp = it.second->getLastPullTimestamp();
+ processQueue.lastConsumeTimestamp = it.second->getLastConsumeTimestamp();
+ info->setMqTable(queue, processQueue);
}
-
- return info;
}
- return NULL;
+
+ return info;
}
//<!************************************************************************