You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/21 07:27:13 UTC

[rocketmq-clients] branch master updated: Collect stats for local cache (#70)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c21cbb  Collect stats for local cache (#70)
8c21cbb is described below

commit 8c21cbbcd8d8aaf230f018b27578f035195fecff
Author: Zhanhui Li <li...@gmail.com>
AuthorDate: Thu Jul 21 15:27:08 2022 +0800

    Collect stats for local cache (#70)
---
 .../rocketmq/AsyncReceiveMessageCallback.cpp       |  3 +-
 cpp/source/rocketmq/ProcessQueueImpl.cpp           | 59 ++++++----------------
 cpp/source/rocketmq/PushConsumerImpl.cpp           | 59 ++++++++++++++++++++++
 cpp/source/rocketmq/include/ProcessQueue.h         | 15 +++---
 cpp/source/rocketmq/include/ProcessQueueImpl.h     | 48 +++---------------
 cpp/source/rocketmq/include/PushConsumerImpl.h     |  4 ++
 6 files changed, 91 insertions(+), 97 deletions(-)

diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index c1b57d4..30cdb7a 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -51,8 +51,7 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const
 
   SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=FOUND, msgListSize={}, queue={}",
                result.source_host, result.messages.size(), process_queue->simpleName());
-  process_queue->cacheMessages(result.messages);
-
+  process_queue->accountCache(result.messages);
   consumer->getConsumeMessageService()->dispatch(process_queue, result.messages);
   checkThrottleThenReceive();
 }
diff --git a/cpp/source/rocketmq/ProcessQueueImpl.cpp b/cpp/source/rocketmq/ProcessQueueImpl.cpp
index 70ab77b..16574e6 100644
--- a/cpp/source/rocketmq/ProcessQueueImpl.cpp
+++ b/cpp/source/rocketmq/ProcessQueueImpl.cpp
@@ -62,6 +62,14 @@ bool ProcessQueueImpl::expired() const {
   return false;
 }
 
+std::uint64_t ProcessQueueImpl::cachedMessageQuantity() const {
+  return cached_message_quantity_.load(std::memory_order_relaxed);
+}
+
+std::uint64_t ProcessQueueImpl::cachedMessageMemory() const {
+  return cached_message_memory_.load(std::memory_order_relaxed);
+}
+
 bool ProcessQueueImpl::shouldThrottle() const {
   auto consumer = consumer_.lock();
   if (!consumer) {
@@ -124,57 +132,20 @@ void ProcessQueueImpl::popMessage() {
       absl::ToChronoMilliseconds(consumer_client->config().subscriber.polling_timeout), callback);
 }
 
-bool ProcessQueueImpl::hasPendingMessages() const {
-  absl::MutexLock lk(&messages_mtx_);
-  return !cached_messages_.empty();
-}
-
-void ProcessQueueImpl::cacheMessages(const std::vector<MessageConstSharedPtr>& messages) {
+void ProcessQueueImpl::accountCache(const std::vector<MessageConstSharedPtr>& messages) {
   auto consumer = consumer_.lock();
   if (!consumer) {
     return;
   }
 
-  {
-    absl::MutexLock messages_lock_guard(&messages_mtx_);
-    for (const auto& message : messages) {
-      const std::string& msg_id = message->id();
-      if (!filter_expression_.accept(*message)) {
-        const std::string& topic = message->topic();
-        auto callback = [topic, msg_id](const std::error_code& ec) {
-          if (ec) {
-            SPDLOG_WARN(
-                "Failed to ack message[Topic={}, MsgId={}] directly as it fails to pass filter expression. Cause: {}",
-                topic, msg_id, ec.message());
-          } else {
-            SPDLOG_DEBUG("Ack message[Topic={}, MsgId={}] directly as it fails to pass filter expression", topic,
-                         msg_id);
-          }
-        };
-        consumer->ack(*message, callback);
-        continue;
-      }
-      cached_messages_.emplace_back(message);
-      cached_message_quantity_.fetch_add(1, std::memory_order_relaxed);
-      cached_message_memory_.fetch_add(message->body().size(), std::memory_order_relaxed);
-    }
-  }
-}
-
-bool ProcessQueueImpl::take(uint32_t batch_size, std::vector<MessageConstSharedPtr>& messages) {
-  absl::MutexLock lock(&messages_mtx_);
-  if (cached_messages_.empty()) {
-    return false;
+  for (const auto& message : messages) {
+    cached_message_quantity_.fetch_add(1, std::memory_order_relaxed);
+    cached_message_memory_.fetch_add(message->body().size(), std::memory_order_relaxed);
   }
 
-  for (auto it = cached_messages_.begin(); it != cached_messages_.end();) {
-    if (0 == batch_size--) {
-      break;
-    }
-    messages.push_back(*it);
-    it = cached_messages_.erase(it);
-  }
-  return !cached_messages_.empty();
+  SPDLOG_DEBUG("Cache of process-queue={} has {} messages, body of them taking up {} bytes", simple_name_,
+               cached_message_quantity_.load(std::memory_order_relaxed),
+               cached_message_memory_.load(std::memory_order_relaxed));
 }
 
 void ProcessQueueImpl::release(uint64_t body_size) {
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index 55e9728..c6e67bd 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -19,7 +19,9 @@
 #include <atomic>
 #include <cassert>
 #include <chrono>
+#include <cstdint>
 #include <cstdlib>
+#include <string>
 #include <system_error>
 
 #include "AsyncReceiveMessageCallback.h"
@@ -29,7 +31,9 @@
 #include "Protocol.h"
 #include "RpcClient.h"
 #include "Signature.h"
+#include "Tag.h"
 #include "google/protobuf/util/time_util.h"
+#include "opencensus/stats/stats.h"
 #include "rocketmq/MQClientException.h"
 #include "rocketmq/MessageListener.h"
 
@@ -89,9 +93,20 @@ void PushConsumerImpl::start() {
   scan_assignment_handle_ = client_manager_->getScheduler()->schedule(
       scan_assignment_functor, SCAN_ASSIGNMENT_TASK_NAME, std::chrono::milliseconds(100), std::chrono::seconds(5));
   SPDLOG_INFO("PushConsumer started, groupName={}", client_config_.subscriber.group.name());
+
+  auto collect_stats_functor = [consumer_weak_ptr] {
+    auto consumer = consumer_weak_ptr.lock();
+    if (consumer) {
+      consumer->collectCacheStats();
+    }
+  };
+
+  collect_stats_handle_ = client_manager_->getScheduler()->schedule(collect_stats_functor, COLLECT_STATS_TASK_NAME,
+                                                                    std::chrono::seconds(3), std::chrono::seconds(3));
 }
 
 const char* PushConsumerImpl::SCAN_ASSIGNMENT_TASK_NAME = "scan-assignment-task";
+const char* PushConsumerImpl::COLLECT_STATS_TASK_NAME = "collect-stats-task";
 
 void PushConsumerImpl::shutdown() {
   State expecting = State::STARTED;
@@ -101,6 +116,11 @@ void PushConsumerImpl::shutdown() {
       SPDLOG_DEBUG("Scan assignment periodic task cancelled");
     }
 
+    if (collect_stats_handle_) {
+      client_manager_->getScheduler()->cancel(collect_stats_handle_);
+      SPDLOG_DEBUG("Collect cache stats periodic task cancelled");
+    }
+
     {
       absl::MutexLock lock(&process_queue_table_mtx_);
       process_queue_table_.clear();
@@ -558,4 +578,43 @@ void PushConsumerImpl::onVerifyMessage(MessageConstSharedPtr message, std::funct
   }
 }
 
+void PushConsumerImpl::collectCacheStats() {
+  absl::flat_hash_map<std::string, std::uint64_t> topic_count;
+  absl::flat_hash_map<std::string, std::uint64_t> topic_memory;
+
+  {
+    absl::MutexLock lk(&process_queue_table_mtx_);
+    for (const auto& entry : process_queue_table_) {
+      auto&& topic = entry.second->topic();
+      std::uint64_t cnt = entry.second->cachedMessageQuantity();
+      std::uint64_t memory = entry.second->cachedMessageMemory();
+      auto it = topic_count.find(topic);
+      if (it == topic_count.end()) {
+        topic_count.insert_or_assign(topic, cnt);
+      } else {
+        it->second += cnt;
+      }
+
+      it = topic_memory.find(topic);
+      if (it == topic_memory.end()) {
+        topic_memory.insert_or_assign(topic, memory);
+      } else {
+        it->second += memory;
+      }
+    }
+  }
+
+  for (const auto& entry : topic_count) {
+    opencensus::stats::Record({{stats_.cachedMessageQuantity(), entry.second}},
+                              {{Tag::topicTag(), entry.first}, {Tag::clientIdTag(), client_config_.client_id}});
+    SPDLOG_DEBUG("Cache on Quantity {} --> {}", entry.first, entry.second);
+  }
+
+  for (const auto& entry : topic_memory) {
+    opencensus::stats::Record({{stats_.cachedMessageBytes(), entry.second}},
+                              {{Tag::topicTag(), entry.first}, {Tag::clientIdTag(), client_config_.client_id}});
+    SPDLOG_DEBUG("Cache on Memory {} --> {}", entry.first, entry.second);
+  }
+}
+
 ROCKETMQ_NAMESPACE_END
diff --git a/cpp/source/rocketmq/include/ProcessQueue.h b/cpp/source/rocketmq/include/ProcessQueue.h
index ac3576d..0e8ce74 100644
--- a/cpp/source/rocketmq/include/ProcessQueue.h
+++ b/cpp/source/rocketmq/include/ProcessQueue.h
@@ -16,6 +16,7 @@
  */
 #pragma once
 
+#include <cstdint>
 #include <memory>
 
 #include "AsyncReceiveMessageCallback.h"
@@ -39,19 +40,19 @@ public:
 
   virtual void receiveMessage() = 0;
 
-  virtual bool hasPendingMessages() const = 0;
-
   virtual std::string topic() const = 0;
 
-  virtual bool take(uint32_t batch_size, std::vector<MessageConstSharedPtr>& messages) = 0;
-
   virtual std::weak_ptr<PushConsumerImpl> getConsumer() = 0;
 
   virtual const std::string& simpleName() const = 0;
 
   virtual void release(uint64_t body_size) = 0;
 
-  virtual void cacheMessages(const std::vector<MessageConstSharedPtr>& messages) = 0;
+  virtual void accountCache(const std::vector<MessageConstSharedPtr>& messages) = 0;
+
+  virtual std::uint64_t cachedMessageQuantity() const = 0;
+
+  virtual std::uint64_t cachedMessageMemory() const = 0;
 
   virtual bool shouldThrottle() const = 0;
 
@@ -61,10 +62,6 @@ public:
 
   virtual const FilterExpression& getFilterExpression() const = 0;
 
-  virtual bool bindFifoConsumeTask() = 0;
-
-  virtual bool unbindFifoConsumeTask() = 0;
-
   virtual const rmq::MessageQueue& messageQueue() const = 0;
 };
 
diff --git a/cpp/source/rocketmq/include/ProcessQueueImpl.h b/cpp/source/rocketmq/include/ProcessQueueImpl.h
index 6479718..36464fc 100644
--- a/cpp/source/rocketmq/include/ProcessQueueImpl.h
+++ b/cpp/source/rocketmq/include/ProcessQueueImpl.h
@@ -56,7 +56,7 @@ public:
 
   bool expired() const override;
 
-  bool shouldThrottle() const override LOCKS_EXCLUDED(messages_mtx_);
+  bool shouldThrottle() const override;
 
   const FilterExpression& getFilterExpression() const override;
 
@@ -74,46 +74,22 @@ public:
     return message_queue_.topic().name();
   }
 
-  bool hasPendingMessages() const override LOCKS_EXCLUDED(messages_mtx_);
+   std::uint64_t cachedMessageQuantity() const override;
+
+   std::uint64_t cachedMessageMemory() const override;
 
   /**
    * Put message fetched from broker into cache.
    *
    * @param messages
    */
-  void cacheMessages(const std::vector<MessageConstSharedPtr>& messages) override LOCKS_EXCLUDED(messages_mtx_);
-
-  /**
-   * @return Number of messages that is not yet dispatched to thread pool, likely, due to topic-rate-limiting.
-   */
-  uint32_t cachedMessagesSize() const LOCKS_EXCLUDED(messages_mtx_) {
-    absl::MutexLock lk(&messages_mtx_);
-    return cached_messages_.size();
-  }
-
-  /**
-   * Dispatch messages from cache to thread pool in form of consumeTask.
-   * @param batch_size
-   * @param messages
-   * @return true if there are more messages to consume in cache
-   */
-  bool take(uint32_t batch_size, std::vector<MessageConstSharedPtr>& messages) override LOCKS_EXCLUDED(messages_mtx_);
+  void accountCache(const std::vector<MessageConstSharedPtr>& messages) override;
 
   void syncIdleState() override {
     idle_since_ = std::chrono::steady_clock::now();
   }
 
-  void release(uint64_t body_size) override LOCKS_EXCLUDED(messages_mtx_);
-
-  bool unbindFifoConsumeTask() override {
-    bool expected = true;
-    return has_fifo_task_bound_.compare_exchange_strong(expected, false, std::memory_order_relaxed);
-  }
-
-  bool bindFifoConsumeTask() override {
-    bool expected = false;
-    return has_fifo_task_bound_.compare_exchange_strong(expected, true, std::memory_order_relaxed);
-  }
+  void release(uint64_t body_size) override;
 
   const rmq::MessageQueue& messageQueue() const override {
     return message_queue_;
@@ -140,13 +116,6 @@ private:
 
   std::shared_ptr<AsyncReceiveMessageCallback> receive_callback_;
 
-  /**
-   * Messages that are pending to be submitted to thread pool.
-   */
-  mutable std::vector<MessageConstSharedPtr> cached_messages_ GUARDED_BY(messages_mtx_);
-
-  mutable absl::Mutex messages_mtx_;
-
   /**
    * @brief Quantity of the cached messages.
    *
@@ -159,11 +128,6 @@ private:
    */
   std::atomic<uint64_t> cached_message_memory_;
 
-  /**
-   * If this process queue is used in FIFO scenario, this field marks if there is an task in thread pool.
-   */
-  std::atomic_bool has_fifo_task_bound_{false};
-
   void popMessage();
   void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata,
                              rmq::ReceiveMessageRequest& request);
diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h b/cpp/source/rocketmq/include/PushConsumerImpl.h
index 95a07f2..cfc13a2 100644
--- a/cpp/source/rocketmq/include/PushConsumerImpl.h
+++ b/cpp/source/rocketmq/include/PushConsumerImpl.h
@@ -212,11 +212,15 @@ private:
   int32_t max_delivery_attempts_{MixAll::DEFAULT_MAX_DELIVERY_ATTEMPTS};
 
   ConsumeStats stats_;
+  std::uintptr_t collect_stats_handle_{0};
+  static const char* COLLECT_STATS_TASK_NAME;
 
   void fetchRoutes() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
 
   std::chrono::milliseconds invisibleDuration(std::size_t attempt);
 
+  void collectCacheStats() LOCKS_EXCLUDED(process_queue_table_mtx_);
+
   friend class ConsumeMessageService;
   friend class ConsumeFifoMessageService;
   friend class ConsumeStandardMessageService;