You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/21 07:27:13 UTC
[rocketmq-clients] branch master updated: Collect stats for local cache (#70)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 8c21cbb Collect stats for local cache (#70)
8c21cbb is described below
commit 8c21cbbcd8d8aaf230f018b27578f035195fecff
Author: Zhanhui Li <li...@gmail.com>
AuthorDate: Thu Jul 21 15:27:08 2022 +0800
Collect stats for local cache (#70)
---
.../rocketmq/AsyncReceiveMessageCallback.cpp | 3 +-
cpp/source/rocketmq/ProcessQueueImpl.cpp | 59 ++++++----------------
cpp/source/rocketmq/PushConsumerImpl.cpp | 59 ++++++++++++++++++++++
cpp/source/rocketmq/include/ProcessQueue.h | 15 +++---
cpp/source/rocketmq/include/ProcessQueueImpl.h | 48 +++---------------
cpp/source/rocketmq/include/PushConsumerImpl.h | 4 ++
6 files changed, 91 insertions(+), 97 deletions(-)
diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index c1b57d4..30cdb7a 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -51,8 +51,7 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const
SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=FOUND, msgListSize={}, queue={}",
result.source_host, result.messages.size(), process_queue->simpleName());
- process_queue->cacheMessages(result.messages);
-
+ process_queue->accountCache(result.messages);
consumer->getConsumeMessageService()->dispatch(process_queue, result.messages);
checkThrottleThenReceive();
}
diff --git a/cpp/source/rocketmq/ProcessQueueImpl.cpp b/cpp/source/rocketmq/ProcessQueueImpl.cpp
index 70ab77b..16574e6 100644
--- a/cpp/source/rocketmq/ProcessQueueImpl.cpp
+++ b/cpp/source/rocketmq/ProcessQueueImpl.cpp
@@ -62,6 +62,14 @@ bool ProcessQueueImpl::expired() const {
return false;
}
+std::uint64_t ProcessQueueImpl::cachedMessageQuantity() const {
+ return cached_message_quantity_.load(std::memory_order_relaxed);
+}
+
+std::uint64_t ProcessQueueImpl::cachedMessageMemory() const {
+ return cached_message_memory_.load(std::memory_order_relaxed);
+}
+
bool ProcessQueueImpl::shouldThrottle() const {
auto consumer = consumer_.lock();
if (!consumer) {
@@ -124,57 +132,20 @@ void ProcessQueueImpl::popMessage() {
absl::ToChronoMilliseconds(consumer_client->config().subscriber.polling_timeout), callback);
}
-bool ProcessQueueImpl::hasPendingMessages() const {
- absl::MutexLock lk(&messages_mtx_);
- return !cached_messages_.empty();
-}
-
-void ProcessQueueImpl::cacheMessages(const std::vector<MessageConstSharedPtr>& messages) {
+void ProcessQueueImpl::accountCache(const std::vector<MessageConstSharedPtr>& messages) {
auto consumer = consumer_.lock();
if (!consumer) {
return;
}
- {
- absl::MutexLock messages_lock_guard(&messages_mtx_);
- for (const auto& message : messages) {
- const std::string& msg_id = message->id();
- if (!filter_expression_.accept(*message)) {
- const std::string& topic = message->topic();
- auto callback = [topic, msg_id](const std::error_code& ec) {
- if (ec) {
- SPDLOG_WARN(
- "Failed to ack message[Topic={}, MsgId={}] directly as it fails to pass filter expression. Cause: {}",
- topic, msg_id, ec.message());
- } else {
- SPDLOG_DEBUG("Ack message[Topic={}, MsgId={}] directly as it fails to pass filter expression", topic,
- msg_id);
- }
- };
- consumer->ack(*message, callback);
- continue;
- }
- cached_messages_.emplace_back(message);
- cached_message_quantity_.fetch_add(1, std::memory_order_relaxed);
- cached_message_memory_.fetch_add(message->body().size(), std::memory_order_relaxed);
- }
- }
-}
-
-bool ProcessQueueImpl::take(uint32_t batch_size, std::vector<MessageConstSharedPtr>& messages) {
- absl::MutexLock lock(&messages_mtx_);
- if (cached_messages_.empty()) {
- return false;
+ for (const auto& message : messages) {
+ cached_message_quantity_.fetch_add(1, std::memory_order_relaxed);
+ cached_message_memory_.fetch_add(message->body().size(), std::memory_order_relaxed);
}
- for (auto it = cached_messages_.begin(); it != cached_messages_.end();) {
- if (0 == batch_size--) {
- break;
- }
- messages.push_back(*it);
- it = cached_messages_.erase(it);
- }
- return !cached_messages_.empty();
+ SPDLOG_DEBUG("Cache of process-queue={} has {} messages, body of them taking up {} bytes", simple_name_,
+ cached_message_quantity_.load(std::memory_order_relaxed),
+ cached_message_memory_.load(std::memory_order_relaxed));
}
void ProcessQueueImpl::release(uint64_t body_size) {
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index 55e9728..c6e67bd 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -19,7 +19,9 @@
#include <atomic>
#include <cassert>
#include <chrono>
+#include <cstdint>
#include <cstdlib>
+#include <string>
#include <system_error>
#include "AsyncReceiveMessageCallback.h"
@@ -29,7 +31,9 @@
#include "Protocol.h"
#include "RpcClient.h"
#include "Signature.h"
+#include "Tag.h"
#include "google/protobuf/util/time_util.h"
+#include "opencensus/stats/stats.h"
#include "rocketmq/MQClientException.h"
#include "rocketmq/MessageListener.h"
@@ -89,9 +93,20 @@ void PushConsumerImpl::start() {
scan_assignment_handle_ = client_manager_->getScheduler()->schedule(
scan_assignment_functor, SCAN_ASSIGNMENT_TASK_NAME, std::chrono::milliseconds(100), std::chrono::seconds(5));
SPDLOG_INFO("PushConsumer started, groupName={}", client_config_.subscriber.group.name());
+
+ auto collect_stats_functor = [consumer_weak_ptr] {
+ auto consumer = consumer_weak_ptr.lock();
+ if (consumer) {
+ consumer->collectCacheStats();
+ }
+ };
+
+ collect_stats_handle_ = client_manager_->getScheduler()->schedule(collect_stats_functor, COLLECT_STATS_TASK_NAME,
+ std::chrono::seconds(3), std::chrono::seconds(3));
}
const char* PushConsumerImpl::SCAN_ASSIGNMENT_TASK_NAME = "scan-assignment-task";
+const char* PushConsumerImpl::COLLECT_STATS_TASK_NAME = "collect-stats-task";
void PushConsumerImpl::shutdown() {
State expecting = State::STARTED;
@@ -101,6 +116,11 @@ void PushConsumerImpl::shutdown() {
SPDLOG_DEBUG("Scan assignment periodic task cancelled");
}
+ if (collect_stats_handle_) {
+ client_manager_->getScheduler()->cancel(collect_stats_handle_);
+ SPDLOG_DEBUG("Collect cache stats periodic task cancelled");
+ }
+
{
absl::MutexLock lock(&process_queue_table_mtx_);
process_queue_table_.clear();
@@ -558,4 +578,43 @@ void PushConsumerImpl::onVerifyMessage(MessageConstSharedPtr message, std::funct
}
}
+void PushConsumerImpl::collectCacheStats() {
+ absl::flat_hash_map<std::string, std::uint64_t> topic_count;
+ absl::flat_hash_map<std::string, std::uint64_t> topic_memory;
+
+ {
+ absl::MutexLock lk(&process_queue_table_mtx_);
+ for (const auto& entry : process_queue_table_) {
+ auto&& topic = entry.second->topic();
+ std::uint64_t cnt = entry.second->cachedMessageQuantity();
+ std::uint64_t memory = entry.second->cachedMessageMemory();
+ auto it = topic_count.find(topic);
+ if (it == topic_count.end()) {
+ topic_count.insert_or_assign(topic, cnt);
+ } else {
+ it->second += cnt;
+ }
+
+ it = topic_memory.find(topic);
+ if (it == topic_memory.end()) {
+ topic_memory.insert_or_assign(topic, memory);
+ } else {
+ it->second += memory;
+ }
+ }
+ }
+
+ for (const auto& entry : topic_count) {
+ opencensus::stats::Record({{stats_.cachedMessageQuantity(), entry.second}},
+ {{Tag::topicTag(), entry.first}, {Tag::clientIdTag(), client_config_.client_id}});
+ SPDLOG_DEBUG("Cache on Quantity {} --> {}", entry.first, entry.second);
+ }
+
+ for (const auto& entry : topic_memory) {
+ opencensus::stats::Record({{stats_.cachedMessageBytes(), entry.second}},
+ {{Tag::topicTag(), entry.first}, {Tag::clientIdTag(), client_config_.client_id}});
+ SPDLOG_DEBUG("Cache on Memory {} --> {}", entry.first, entry.second);
+ }
+}
+
ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/rocketmq/include/ProcessQueue.h b/cpp/source/rocketmq/include/ProcessQueue.h
index ac3576d..0e8ce74 100644
--- a/cpp/source/rocketmq/include/ProcessQueue.h
+++ b/cpp/source/rocketmq/include/ProcessQueue.h
@@ -16,6 +16,7 @@
*/
#pragma once
+#include <cstdint>
#include <memory>
#include "AsyncReceiveMessageCallback.h"
@@ -39,19 +40,19 @@ public:
virtual void receiveMessage() = 0;
- virtual bool hasPendingMessages() const = 0;
-
virtual std::string topic() const = 0;
- virtual bool take(uint32_t batch_size, std::vector<MessageConstSharedPtr>& messages) = 0;
-
virtual std::weak_ptr<PushConsumerImpl> getConsumer() = 0;
virtual const std::string& simpleName() const = 0;
virtual void release(uint64_t body_size) = 0;
- virtual void cacheMessages(const std::vector<MessageConstSharedPtr>& messages) = 0;
+ virtual void accountCache(const std::vector<MessageConstSharedPtr>& messages) = 0;
+
+ virtual std::uint64_t cachedMessageQuantity() const = 0;
+
+ virtual std::uint64_t cachedMessageMemory() const = 0;
virtual bool shouldThrottle() const = 0;
@@ -61,10 +62,6 @@ public:
virtual const FilterExpression& getFilterExpression() const = 0;
- virtual bool bindFifoConsumeTask() = 0;
-
- virtual bool unbindFifoConsumeTask() = 0;
-
virtual const rmq::MessageQueue& messageQueue() const = 0;
};
diff --git a/cpp/source/rocketmq/include/ProcessQueueImpl.h b/cpp/source/rocketmq/include/ProcessQueueImpl.h
index 6479718..36464fc 100644
--- a/cpp/source/rocketmq/include/ProcessQueueImpl.h
+++ b/cpp/source/rocketmq/include/ProcessQueueImpl.h
@@ -56,7 +56,7 @@ public:
bool expired() const override;
- bool shouldThrottle() const override LOCKS_EXCLUDED(messages_mtx_);
+ bool shouldThrottle() const override;
const FilterExpression& getFilterExpression() const override;
@@ -74,46 +74,22 @@ public:
return message_queue_.topic().name();
}
- bool hasPendingMessages() const override LOCKS_EXCLUDED(messages_mtx_);
+ std::uint64_t cachedMessageQuantity() const override;
+
+ std::uint64_t cachedMessageMemory() const override;
/**
* Put message fetched from broker into cache.
*
* @param messages
*/
- void cacheMessages(const std::vector<MessageConstSharedPtr>& messages) override LOCKS_EXCLUDED(messages_mtx_);
-
- /**
- * @return Number of messages that is not yet dispatched to thread pool, likely, due to topic-rate-limiting.
- */
- uint32_t cachedMessagesSize() const LOCKS_EXCLUDED(messages_mtx_) {
- absl::MutexLock lk(&messages_mtx_);
- return cached_messages_.size();
- }
-
- /**
- * Dispatch messages from cache to thread pool in form of consumeTask.
- * @param batch_size
- * @param messages
- * @return true if there are more messages to consume in cache
- */
- bool take(uint32_t batch_size, std::vector<MessageConstSharedPtr>& messages) override LOCKS_EXCLUDED(messages_mtx_);
+ void accountCache(const std::vector<MessageConstSharedPtr>& messages) override;
void syncIdleState() override {
idle_since_ = std::chrono::steady_clock::now();
}
- void release(uint64_t body_size) override LOCKS_EXCLUDED(messages_mtx_);
-
- bool unbindFifoConsumeTask() override {
- bool expected = true;
- return has_fifo_task_bound_.compare_exchange_strong(expected, false, std::memory_order_relaxed);
- }
-
- bool bindFifoConsumeTask() override {
- bool expected = false;
- return has_fifo_task_bound_.compare_exchange_strong(expected, true, std::memory_order_relaxed);
- }
+ void release(uint64_t body_size) override;
const rmq::MessageQueue& messageQueue() const override {
return message_queue_;
@@ -140,13 +116,6 @@ private:
std::shared_ptr<AsyncReceiveMessageCallback> receive_callback_;
- /**
- * Messages that are pending to be submitted to thread pool.
- */
- mutable std::vector<MessageConstSharedPtr> cached_messages_ GUARDED_BY(messages_mtx_);
-
- mutable absl::Mutex messages_mtx_;
-
/**
* @brief Quantity of the cached messages.
*
@@ -159,11 +128,6 @@ private:
*/
std::atomic<uint64_t> cached_message_memory_;
- /**
- * If this process queue is used in FIFO scenario, this field marks if there is an task in thread pool.
- */
- std::atomic_bool has_fifo_task_bound_{false};
-
void popMessage();
void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata,
rmq::ReceiveMessageRequest& request);
diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h b/cpp/source/rocketmq/include/PushConsumerImpl.h
index 95a07f2..cfc13a2 100644
--- a/cpp/source/rocketmq/include/PushConsumerImpl.h
+++ b/cpp/source/rocketmq/include/PushConsumerImpl.h
@@ -212,11 +212,15 @@ private:
int32_t max_delivery_attempts_{MixAll::DEFAULT_MAX_DELIVERY_ATTEMPTS};
ConsumeStats stats_;
+ std::uintptr_t collect_stats_handle_{0};
+ static const char* COLLECT_STATS_TASK_NAME;
void fetchRoutes() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
std::chrono::milliseconds invisibleDuration(std::size_t attempt);
+ void collectCacheStats() LOCKS_EXCLUDED(process_queue_table_mtx_);
+
friend class ConsumeMessageService;
friend class ConsumeFifoMessageService;
friend class ConsumeStandardMessageService;