You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/11/11 01:46:49 UTC

[rocketmq-client-cpp] branch master updated: fixed typo bug in DefaultMQPushConsumer::getConsumerRunningInfo. (#178)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new b1b2d5b  fixed typo bug in DefaultMQPushConsumer::getConsumerRunningInfo. (#178)
b1b2d5b is described below

commit b1b2d5b5b63ec3db6ad61475da0dac3dbe142b4e
Author: James Yin <yw...@hotmail.com>
AuthorDate: Mon Nov 11 09:46:42 2019 +0800

    fixed typo bug in DefaultMQPushConsumer::getConsumerRunningInfo. (#178)
---
 src/consumer/DefaultMQPushConsumer.cpp | 68 ++++++++++++++++------------------
 1 file changed, 32 insertions(+), 36 deletions(-)

diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 0f51ea1..caa539e 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -855,44 +855,40 @@ int DefaultMQPushConsumer::getMaxCacheMsgSizePerQueue() const {
 }
 
 ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo() {
-  ConsumerRunningInfo* info = new ConsumerRunningInfo();
-  if (info) {
-    if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly)
-      info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
-    else
-      info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "flase");
-    info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE, UtilAll::to_string(m_consumeThreadCount));
-    info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, UtilAll::to_string(m_startTime));
-
-    vector<SubscriptionData> result;
-    getSubscriptions(result);
-    info->setSubscriptionSet(result);
-
-    map<MQMessageQueue, PullRequest*> requestTable = m_pRebalance->getPullRequestTable();
-    map<MQMessageQueue, PullRequest*>::iterator it = requestTable.begin();
-
-    for (; it != requestTable.end(); ++it) {
-      if (!it->second->isDroped()) {
-        map<MessageQueue, ProcessQueueInfo> queueTable;
-        MessageQueue queue((it->first).getTopic(), (it->first).getBrokerName(), (it->first).getQueueId());
-        ProcessQueueInfo processQueue;
-        processQueue.cachedMsgMinOffset = it->second->getCacheMinOffset();
-        processQueue.cachedMsgMaxOffset = it->second->getCacheMaxOffset();
-        processQueue.cachedMsgCount = it->second->getCacheMsgCount();
-        processQueue.setCommitOffset(
-            m_pOffsetStore->readOffset(it->first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
-        processQueue.setDroped(it->second->isDroped());
-        processQueue.setLocked(it->second->isLocked());
-        processQueue.lastLockTimestamp = it->second->getLastLockTimestamp();
-        processQueue.lastPullTimestamp = it->second->getLastPullTimestamp();
-        processQueue.lastConsumeTimestamp = it->second->getLastConsumeTimestamp();
-        info->setMqTable(queue, processQueue);
-      }
+  auto* info = new ConsumerRunningInfo();
+  if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) {
+    info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "true");
+  } else {
+    info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "false");
+  }
+  info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE, UtilAll::to_string(m_consumeThreadCount));
+  info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, UtilAll::to_string(m_startTime));
+
+  std::vector<SubscriptionData> result;
+  getSubscriptions(result);
+  info->setSubscriptionSet(result);
+
+  std::map<MQMessageQueue, PullRequest*> requestTable = m_pRebalance->getPullRequestTable();
+
+  for (const auto& it : requestTable) {
+    if (!it.second->isDroped()) {
+      MessageQueue queue((it.first).getTopic(), (it.first).getBrokerName(), (it.first).getQueueId());
+      ProcessQueueInfo processQueue;
+      processQueue.cachedMsgMinOffset = it.second->getCacheMinOffset();
+      processQueue.cachedMsgMaxOffset = it.second->getCacheMaxOffset();
+      processQueue.cachedMsgCount = it.second->getCacheMsgCount();
+      processQueue.setCommitOffset(
+          m_pOffsetStore->readOffset(it.first, MEMORY_FIRST_THEN_STORE, getSessionCredentials()));
+      processQueue.setDroped(it.second->isDroped());
+      processQueue.setLocked(it.second->isLocked());
+      processQueue.lastLockTimestamp = it.second->getLastLockTimestamp();
+      processQueue.lastPullTimestamp = it.second->getLastPullTimestamp();
+      processQueue.lastConsumeTimestamp = it.second->getLastConsumeTimestamp();
+      info->setMqTable(queue, processQueue);
     }
-
-    return info;
   }
-  return NULL;
+
+  return info;
 }
 
 //<!************************************************************************