You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/04/26 02:17:25 UTC

[rocketmq-client-cpp] branch master updated: add pull RT and TPS status. (#305)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 79c278c  add pull RT and TPS status. (#305)
79c278c is described below

commit 79c278cbad0b2dfaf42a1d7e79b12a57f718c73c
Author: dinglei <li...@163.com>
AuthorDate: Sun Apr 26 10:17:17 2020 +0800

    add pull RT and TPS status. (#305)
---
 src/consumer/ConsumeMessageConcurrentlyService.cpp | 12 ++++--------
 src/consumer/DefaultMQPushConsumerImpl.cpp         | 20 +++++++++++++++++---
 src/status/StatsServer.cpp                         | 13 ++++++++++++-
 src/status/StatsServer.h                           |  1 +
 4 files changed, 34 insertions(+), 12 deletions(-)

diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index c341b28..ac62659 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -256,17 +256,13 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
     }
     case CLUSTERING: {
       // status consumer tps
-      int okCount = 0;
-      int failedCount = 0;
       if (ackIndex == -1) {
-        failedCount = msgs.size();
+        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeFailedTPS(
+            request->m_messageQueue.getTopic(), groupName, msgs.size());
       } else {
-        okCount = msgs.size();
+        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeOKTPS(request->m_messageQueue.getTopic(),
+                                                                                   groupName, msgs.size());
       }
-      StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeOKTPS(request->m_messageQueue.getTopic(),
-                                                                                 groupName, okCount);
-      StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeFailedTPS(request->m_messageQueue.getTopic(),
-                                                                                     groupName, failedCount);
 
       // send back msg to broker;
       for (size_t i = ackIndex + 1; i < msgs.size(); i++) {
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index b114456..1aa0b76 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -65,9 +65,16 @@ class AsyncPullCallback : public PullCallback {
           LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str());
           break;
         }
+
+        uint64 pullRT = UtilAll::currentTimeMillis() - pullRequest->getLastPullTimestamp();
+        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(
+            pullRequest->m_messageQueue.getTopic(), m_callbackOwner->getGroupName(), pullRT);
         pullRequest->setNextOffset(result.nextBeginOffset);
         pullRequest->putMessage(result.msgFoundList);
-
+        if (!result.msgFoundList.empty()) {
+          StatsServerManager::getInstance()->getConsumeStatServer()->incPullTPS(
+              pullRequest->m_messageQueue.getTopic(), m_callbackOwner->getGroupName(), result.msgFoundList.size());
+        }
         m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(pullRequest, result.msgFoundList);
 
         if (bProducePullRequest) {
@@ -686,7 +693,8 @@ void DefaultMQPushConsumerImpl::pullMessage(boost::weak_ptr<PullRequest> pullReq
     return;
   }
   try {
-    request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+    uint64 startTimeStamp = UtilAll::currentTimeMillis();
+    request->setLastPullTimestamp(startTimeStamp);
     unique_ptr<PullResult> result(m_pPullAPIWrapper->pullKernelImpl(messageQueue,              // 1
                                                                     subExpression,             // 2
                                                                     pSdata->getSubVersion(),   // 3
@@ -702,6 +710,9 @@ void DefaultMQPushConsumerImpl::pullMessage(boost::weak_ptr<PullRequest> pullReq
     PullResult pullResult = m_pPullAPIWrapper->processPullResult(messageQueue, result.get(), pSdata);
     switch (pullResult.pullStatus) {
       case FOUND: {
+        uint64 pullRT = UtilAll::currentTimeMillis() - startTimeStamp;
+        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(messageQueue.getTopic(), getGroupName(),
+                                                                                pullRT);
         if (request->isDropped()) {
           LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s",
                    messageQueue.toString().c_str());
@@ -710,7 +721,10 @@ void DefaultMQPushConsumerImpl::pullMessage(boost::weak_ptr<PullRequest> pullReq
         // and this request is dropped, and then received pulled msgs.
         request->setNextOffset(pullResult.nextBeginOffset);
         request->putMessage(pullResult.msgFoundList);
-
+        if (!pullResult.msgFoundList.empty()) {
+          StatsServerManager::getInstance()->getConsumeStatServer()->incPullTPS(messageQueue.getTopic(), getGroupName(),
+                                                                                pullResult.msgFoundList.size());
+        }
         m_consumerService->submitConsumeRequest(request, pullResult.msgFoundList);
         producePullMsgTask(request);
 
diff --git a/src/status/StatsServer.cpp b/src/status/StatsServer.cpp
index fd1c0ec..0ac3063 100644
--- a/src/status/StatsServer.cpp
+++ b/src/status/StatsServer.cpp
@@ -132,6 +132,17 @@ void StatsServer::incConsumeFailedTPS(std::string topic, std::string groupName,
     m_consumeStatsItems[key] = item;
   }
   m_consumeStatsItems[key].consumeFailedCount += msgCount;
+  m_consumeStatsItems[key].consumeFailedMsgs += msgCount;
+}
+void StatsServer::incConsumeFailedMsgs(std::string topic, std::string groupName, uint64 msgCount) {
+  std::string key = topic + "@" + groupName;
+  LOG_DEBUG("incConsumeFailedTPS Key:%s, Count: %lld", key.c_str(), msgCount);
+  std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
+  if (m_consumeStatsItems.find(key) == m_consumeStatsItems.end()) {
+    StatsItem item;
+    m_consumeStatsItems[key] = item;
+  }
+  m_consumeStatsItems[key].consumeFailedMsgs += msgCount;
 }
 void StatsServer::startScheduledTask() {
   m_consumer_status_service_thread.reset(new boost::thread(boost::bind(&StatsServer::doStartScheduledTask, this)));
@@ -193,7 +204,7 @@ void StatsServer::samplingInSeconds() {
     LOG_DEBUG("samplingInSeconds Key[%s], consumeFailedTPS:%.2f, Count: %lld", it->first.c_str(),
               consumeStats.consumeFailedTPS, it->second.consumeFailedCount);
     consumeStats.consumeFailedMsgs = it->second.consumeFailedMsgs;
-    it->second.consumeFailedMsgs = 0;
+    // it->second.consumeFailedMsgs = 0;
     updateConsumeStats(it->first, consumeStats);
   }
 }
diff --git a/src/status/StatsServer.h b/src/status/StatsServer.h
index 11f2d67..e105f23 100644
--- a/src/status/StatsServer.h
+++ b/src/status/StatsServer.h
@@ -54,6 +54,7 @@ class StatsServer {
   void incConsumeRT(std::string topic, std::string groupName, uint64 rt, uint64 msgCount = 1);
   void incConsumeOKTPS(std::string topic, std::string groupName, uint64 msgCount);
   void incConsumeFailedTPS(std::string topic, std::string groupName, uint64 msgCount);
+  void incConsumeFailedMsgs(std::string topic, std::string groupName, uint64 msgCount);
 
  private:
   void startScheduledTask();