You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/04/26 02:17:25 UTC
[rocketmq-client-cpp] branch master updated: add pull RT and TPS
status. (#305)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 79c278c add pull RT and TPS status. (#305)
79c278c is described below
commit 79c278cbad0b2dfaf42a1d7e79b12a57f718c73c
Author: dinglei <li...@163.com>
AuthorDate: Sun Apr 26 10:17:17 2020 +0800
add pull RT and TPS status. (#305)
---
src/consumer/ConsumeMessageConcurrentlyService.cpp | 12 ++++--------
src/consumer/DefaultMQPushConsumerImpl.cpp | 20 +++++++++++++++++---
src/status/StatsServer.cpp | 13 ++++++++++++-
src/status/StatsServer.h | 1 +
4 files changed, 34 insertions(+), 12 deletions(-)
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index c341b28..ac62659 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -256,17 +256,13 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
}
case CLUSTERING: {
// status consumer tps
- int okCount = 0;
- int failedCount = 0;
if (ackIndex == -1) {
- failedCount = msgs.size();
+ StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeFailedTPS(
+ request->m_messageQueue.getTopic(), groupName, msgs.size());
} else {
- okCount = msgs.size();
+ StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeOKTPS(request->m_messageQueue.getTopic(),
+ groupName, msgs.size());
}
- StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeOKTPS(request->m_messageQueue.getTopic(),
- groupName, okCount);
- StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeFailedTPS(request->m_messageQueue.getTopic(),
- groupName, failedCount);
// send back msg to broker;
for (size_t i = ackIndex + 1; i < msgs.size(); i++) {
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index b114456..1aa0b76 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -65,9 +65,16 @@ class AsyncPullCallback : public PullCallback {
LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str());
break;
}
+
+ uint64 pullRT = UtilAll::currentTimeMillis() - pullRequest->getLastPullTimestamp();
+ StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(
+ pullRequest->m_messageQueue.getTopic(), m_callbackOwner->getGroupName(), pullRT);
pullRequest->setNextOffset(result.nextBeginOffset);
pullRequest->putMessage(result.msgFoundList);
-
+ if (!result.msgFoundList.empty()) {
+ StatsServerManager::getInstance()->getConsumeStatServer()->incPullTPS(
+ pullRequest->m_messageQueue.getTopic(), m_callbackOwner->getGroupName(), result.msgFoundList.size());
+ }
m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(pullRequest, result.msgFoundList);
if (bProducePullRequest) {
@@ -686,7 +693,8 @@ void DefaultMQPushConsumerImpl::pullMessage(boost::weak_ptr<PullRequest> pullReq
return;
}
try {
- request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+ uint64 startTimeStamp = UtilAll::currentTimeMillis();
+ request->setLastPullTimestamp(startTimeStamp);
unique_ptr<PullResult> result(m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1
subExpression, // 2
pSdata->getSubVersion(), // 3
@@ -702,6 +710,9 @@ void DefaultMQPushConsumerImpl::pullMessage(boost::weak_ptr<PullRequest> pullReq
PullResult pullResult = m_pPullAPIWrapper->processPullResult(messageQueue, result.get(), pSdata);
switch (pullResult.pullStatus) {
case FOUND: {
+ uint64 pullRT = UtilAll::currentTimeMillis() - startTimeStamp;
+ StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(messageQueue.getTopic(), getGroupName(),
+ pullRT);
if (request->isDropped()) {
LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s",
messageQueue.toString().c_str());
@@ -710,7 +721,10 @@ void DefaultMQPushConsumerImpl::pullMessage(boost::weak_ptr<PullRequest> pullReq
// and this request is dropped, and then received pulled msgs.
request->setNextOffset(pullResult.nextBeginOffset);
request->putMessage(pullResult.msgFoundList);
-
+ if (!pullResult.msgFoundList.empty()) {
+ StatsServerManager::getInstance()->getConsumeStatServer()->incPullTPS(messageQueue.getTopic(), getGroupName(),
+ pullResult.msgFoundList.size());
+ }
m_consumerService->submitConsumeRequest(request, pullResult.msgFoundList);
producePullMsgTask(request);
diff --git a/src/status/StatsServer.cpp b/src/status/StatsServer.cpp
index fd1c0ec..0ac3063 100644
--- a/src/status/StatsServer.cpp
+++ b/src/status/StatsServer.cpp
@@ -132,6 +132,17 @@ void StatsServer::incConsumeFailedTPS(std::string topic, std::string groupName,
m_consumeStatsItems[key] = item;
}
m_consumeStatsItems[key].consumeFailedCount += msgCount;
+ m_consumeStatsItems[key].consumeFailedMsgs += msgCount;
+}
+void StatsServer::incConsumeFailedMsgs(std::string topic, std::string groupName, uint64 msgCount) {
+ std::string key = topic + "@" + groupName;
+ LOG_DEBUG("incConsumeFailedTPS Key:%s, Count: %lld", key.c_str(), msgCount);
+ std::lock_guard<std::mutex> lock(m_consumeStatsItemMutex);
+ if (m_consumeStatsItems.find(key) == m_consumeStatsItems.end()) {
+ StatsItem item;
+ m_consumeStatsItems[key] = item;
+ }
+ m_consumeStatsItems[key].consumeFailedMsgs += msgCount;
}
void StatsServer::startScheduledTask() {
m_consumer_status_service_thread.reset(new boost::thread(boost::bind(&StatsServer::doStartScheduledTask, this)));
@@ -193,7 +204,7 @@ void StatsServer::samplingInSeconds() {
LOG_DEBUG("samplingInSeconds Key[%s], consumeFailedTPS:%.2f, Count: %lld", it->first.c_str(),
consumeStats.consumeFailedTPS, it->second.consumeFailedCount);
consumeStats.consumeFailedMsgs = it->second.consumeFailedMsgs;
- it->second.consumeFailedMsgs = 0;
+ // it->second.consumeFailedMsgs = 0;
updateConsumeStats(it->first, consumeStats);
}
}
diff --git a/src/status/StatsServer.h b/src/status/StatsServer.h
index 11f2d67..e105f23 100644
--- a/src/status/StatsServer.h
+++ b/src/status/StatsServer.h
@@ -54,6 +54,7 @@ class StatsServer {
void incConsumeRT(std::string topic, std::string groupName, uint64 rt, uint64 msgCount = 1);
void incConsumeOKTPS(std::string topic, std::string groupName, uint64 msgCount);
void incConsumeFailedTPS(std::string topic, std::string groupName, uint64 msgCount);
+ void incConsumeFailedMsgs(std::string topic, std::string groupName, uint64 msgCount);
private:
void startScheduledTask();