Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/21 07:23:18 UTC

[rocketmq-clients] branch feature_collect_cache_stats created (now 3a68784)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a change to branch feature_collect_cache_stats
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


      at 3a68784  Collect stats for local cache

This branch includes the following new commits:

     new 3a68784  Collect stats for local cache

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[rocketmq-clients] 01/01: Collect stats for local cache

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch feature_collect_cache_stats
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 3a68784d55ab8a91be171b2eec4ddc5efa05d5b5
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jul 21 15:21:00 2022 +0800

    Collect stats for local cache
---
 .../rocketmq/AsyncReceiveMessageCallback.cpp       |  3 +-
 cpp/source/rocketmq/ProcessQueueImpl.cpp           | 59 ++++++----------------
 cpp/source/rocketmq/PushConsumerImpl.cpp           | 59 ++++++++++++++++++++++
 cpp/source/rocketmq/include/ProcessQueue.h         | 15 +++---
 cpp/source/rocketmq/include/ProcessQueueImpl.h     | 48 +++---------------
 cpp/source/rocketmq/include/PushConsumerImpl.h     |  4 ++
 6 files changed, 91 insertions(+), 97 deletions(-)
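
In short, the commit removes the per-queue message buffer (cached_messages_ and
messages_mtx_) from ProcessQueueImpl, keeps only two relaxed atomic counters for
message quantity and body bytes, and has PushConsumerImpl schedule a periodic
collect-stats-task that sums the counters per topic and records them through
OpenCensus with topic and client-id tags. As an orientation aid, here is a
minimal standalone sketch of the weak_ptr-captured periodic-task pattern used in
PushConsumerImpl::start() below (Consumer and schedule() are simplified
stand-ins, not the real client types):

    #include <functional>
    #include <iostream>
    #include <memory>

    // Hypothetical consumer exposing the stats hook added by this commit.
    struct Consumer {
      void collectCacheStats() { std::cout << "collect cache stats\n"; }
    };

    // Hypothetical stand-in for the client scheduler; the real one re-runs the
    // functor every few seconds, here it is simply invoked once.
    void schedule(const std::function<void()>& task) { task(); }

    int main() {
      auto consumer = std::make_shared<Consumer>();
      std::weak_ptr<Consumer> consumer_weak_ptr = consumer;
      // Capturing a weak_ptr keeps the periodic task from prolonging the
      // consumer's lifetime; once the consumer is gone the task becomes a no-op.
      auto collect_stats_functor = [consumer_weak_ptr] {
        if (auto locked = consumer_weak_ptr.lock()) {
          locked->collectCacheStats();
        }
      };
      schedule(collect_stats_functor);
      return 0;
    }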

diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index c1b57d4..30cdb7a 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -51,8 +51,7 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const
 
   SPDLOG_DEBUG("Receive messages from broker[host={}] returns with status=FOUND, msgListSize={}, queue={}",
                result.source_host, result.messages.size(), process_queue->simpleName());
-  process_queue->cacheMessages(result.messages);
-
+  process_queue->accountCache(result.messages);
   consumer->getConsumeMessageService()->dispatch(process_queue, result.messages);
   checkThrottleThenReceive();
 }
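
With this change the receive callback no longer buffers messages inside the
process queue; it only accounts the batch and dispatches it immediately. A
minimal control-flow sketch of the FOUND branch (all types here are simplified
stand-ins, and the 1024-message limit is an invented placeholder, not a real
client setting):

    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <vector>

    struct Message { std::string body; };

    struct ProcessQueue {
      std::size_t cached = 0;
      void accountCache(const std::vector<Message>& msgs) { cached += msgs.size(); }
      bool shouldThrottle() const { return cached > 1024; }  // placeholder limit
    };

    void dispatch(const std::vector<Message>& msgs) {
      std::cout << "dispatched " << msgs.size() << " messages\n";
    }

    // Mirrors the FOUND branch of onCompletion() after this change.
    void onFound(ProcessQueue& pq, const std::vector<Message>& msgs) {
      pq.accountCache(msgs);  // stats only; no local buffering any more
      dispatch(msgs);         // batch goes straight to the consume service
      if (!pq.shouldThrottle()) {
        // checkThrottleThenReceive() would issue the next receive here
      }
    }

    int main() {
      ProcessQueue pq;
      onFound(pq, {{"hello"}, {"world"}});
      return 0;
    }
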
diff --git a/cpp/source/rocketmq/ProcessQueueImpl.cpp b/cpp/source/rocketmq/ProcessQueueImpl.cpp
index 70ab77b..16574e6 100644
--- a/cpp/source/rocketmq/ProcessQueueImpl.cpp
+++ b/cpp/source/rocketmq/ProcessQueueImpl.cpp
@@ -62,6 +62,14 @@ bool ProcessQueueImpl::expired() const {
   return false;
 }
 
+std::uint64_t ProcessQueueImpl::cachedMessageQuantity() const {
+  return cached_message_quantity_.load(std::memory_order_relaxed);
+}
+
+std::uint64_t ProcessQueueImpl::cachedMessageMemory() const {
+  return cached_message_memory_.load(std::memory_order_relaxed);
+}
+
 bool ProcessQueueImpl::shouldThrottle() const {
   auto consumer = consumer_.lock();
   if (!consumer) {
@@ -124,57 +132,20 @@ void ProcessQueueImpl::popMessage() {
       absl::ToChronoMilliseconds(consumer_client->config().subscriber.polling_timeout), callback);
 }
 
-bool ProcessQueueImpl::hasPendingMessages() const {
-  absl::MutexLock lk(&messages_mtx_);
-  return !cached_messages_.empty();
-}
-
-void ProcessQueueImpl::cacheMessages(const std::vector<MessageConstSharedPtr>& messages) {
+void ProcessQueueImpl::accountCache(const std::vector<MessageConstSharedPtr>& messages) {
   auto consumer = consumer_.lock();
   if (!consumer) {
     return;
   }
 
-  {
-    absl::MutexLock messages_lock_guard(&messages_mtx_);
-    for (const auto& message : messages) {
-      const std::string& msg_id = message->id();
-      if (!filter_expression_.accept(*message)) {
-        const std::string& topic = message->topic();
-        auto callback = [topic, msg_id](const std::error_code& ec) {
-          if (ec) {
-            SPDLOG_WARN(
-                "Failed to ack message[Topic={}, MsgId={}] directly as it fails to pass filter expression. Cause: {}",
-                topic, msg_id, ec.message());
-          } else {
-            SPDLOG_DEBUG("Ack message[Topic={}, MsgId={}] directly as it fails to pass filter expression", topic,
-                         msg_id);
-          }
-        };
-        consumer->ack(*message, callback);
-        continue;
-      }
-      cached_messages_.emplace_back(message);
-      cached_message_quantity_.fetch_add(1, std::memory_order_relaxed);
-      cached_message_memory_.fetch_add(message->body().size(), std::memory_order_relaxed);
-    }
-  }
-}
-
-bool ProcessQueueImpl::take(uint32_t batch_size, std::vector<MessageConstSharedPtr>& messages) {
-  absl::MutexLock lock(&messages_mtx_);
-  if (cached_messages_.empty()) {
-    return false;
+  for (const auto& message : messages) {
+    cached_message_quantity_.fetch_add(1, std::memory_order_relaxed);
+    cached_message_memory_.fetch_add(message->body().size(), std::memory_order_relaxed);
   }
 
-  for (auto it = cached_messages_.begin(); it != cached_messages_.end();) {
-    if (0 == batch_size--) {
-      break;
-    }
-    messages.push_back(*it);
-    it = cached_messages_.erase(it);
-  }
-  return !cached_messages_.empty();
+  SPDLOG_DEBUG("Cache of process-queue={} has {} messages, body of them taking up {} bytes", simple_name_,
+               cached_message_quantity_.load(std::memory_order_relaxed),
+               cached_message_memory_.load(std::memory_order_relaxed));
 }
 
 void ProcessQueueImpl::release(uint64_t body_size) {
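
accountCache() now only increments the two relaxed atomics; the diff does not
show release(), so the sketch below assumes it decrements them symmetrically
when a message leaves the cache (CacheStats and Message are simplified stand-in
types, not the real classes):

    #include <atomic>
    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    struct Message { std::string body; };

    class CacheStats {
    public:
      // Mirrors ProcessQueueImpl::accountCache(): one add per message plus its
      // body size, all with relaxed ordering since these are plain counters.
      void account(const std::vector<Message>& messages) {
        for (const auto& m : messages) {
          quantity_.fetch_add(1, std::memory_order_relaxed);
          memory_.fetch_add(m.body.size(), std::memory_order_relaxed);
        }
      }

      // Assumed counterpart of ProcessQueueImpl::release(body_size), which the
      // diff does not show: undo the accounting for one consumed message.
      void release(std::uint64_t body_size) {
        quantity_.fetch_sub(1, std::memory_order_relaxed);
        memory_.fetch_sub(body_size, std::memory_order_relaxed);
      }

      std::uint64_t quantity() const { return quantity_.load(std::memory_order_relaxed); }
      std::uint64_t memory() const { return memory_.load(std::memory_order_relaxed); }

    private:
      std::atomic<std::uint64_t> quantity_{0};
      std::atomic<std::uint64_t> memory_{0};
    };

    int main() {
      CacheStats stats;
      stats.account({{"abc"}, {"defgh"}});
      stats.release(3);
      std::cout << stats.quantity() << " messages, " << stats.memory() << " bytes\n";
      return 0;
    }
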
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index 55e9728..c6e67bd 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -19,7 +19,9 @@
 #include <atomic>
 #include <cassert>
 #include <chrono>
+#include <cstdint>
 #include <cstdlib>
+#include <string>
 #include <system_error>
 
 #include "AsyncReceiveMessageCallback.h"
@@ -29,7 +31,9 @@
 #include "Protocol.h"
 #include "RpcClient.h"
 #include "Signature.h"
+#include "Tag.h"
 #include "google/protobuf/util/time_util.h"
+#include "opencensus/stats/stats.h"
 #include "rocketmq/MQClientException.h"
 #include "rocketmq/MessageListener.h"
 
@@ -89,9 +93,20 @@ void PushConsumerImpl::start() {
   scan_assignment_handle_ = client_manager_->getScheduler()->schedule(
       scan_assignment_functor, SCAN_ASSIGNMENT_TASK_NAME, std::chrono::milliseconds(100), std::chrono::seconds(5));
   SPDLOG_INFO("PushConsumer started, groupName={}", client_config_.subscriber.group.name());
+
+  auto collect_stats_functor = [consumer_weak_ptr] {
+    auto consumer = consumer_weak_ptr.lock();
+    if (consumer) {
+      consumer->collectCacheStats();
+    }
+  };
+
+  collect_stats_handle_ = client_manager_->getScheduler()->schedule(collect_stats_functor, COLLECT_STATS_TASK_NAME,
+                                                                    std::chrono::seconds(3), std::chrono::seconds(3));
 }
 
 const char* PushConsumerImpl::SCAN_ASSIGNMENT_TASK_NAME = "scan-assignment-task";
+const char* PushConsumerImpl::COLLECT_STATS_TASK_NAME = "collect-stats-task";
 
 void PushConsumerImpl::shutdown() {
   State expecting = State::STARTED;
@@ -101,6 +116,11 @@ void PushConsumerImpl::shutdown() {
       SPDLOG_DEBUG("Scan assignment periodic task cancelled");
     }
 
+    if (collect_stats_handle_) {
+      client_manager_->getScheduler()->cancel(collect_stats_handle_);
+      SPDLOG_DEBUG("Collect cache stats periodic task cancelled");
+    }
+
     {
       absl::MutexLock lock(&process_queue_table_mtx_);
       process_queue_table_.clear();
@@ -558,4 +578,43 @@ void PushConsumerImpl::onVerifyMessage(MessageConstSharedPtr message, std::funct
   }
 }
 
+void PushConsumerImpl::collectCacheStats() {
+  absl::flat_hash_map<std::string, std::uint64_t> topic_count;
+  absl::flat_hash_map<std::string, std::uint64_t> topic_memory;
+
+  {
+    absl::MutexLock lk(&process_queue_table_mtx_);
+    for (const auto& entry : process_queue_table_) {
+      auto&& topic = entry.second->topic();
+      std::uint64_t cnt = entry.second->cachedMessageQuantity();
+      std::uint64_t memory = entry.second->cachedMessageMemory();
+      auto it = topic_count.find(topic);
+      if (it == topic_count.end()) {
+        topic_count.insert_or_assign(topic, cnt);
+      } else {
+        it->second += cnt;
+      }
+
+      it = topic_memory.find(topic);
+      if (it == topic_memory.end()) {
+        topic_memory.insert_or_assign(topic, memory);
+      } else {
+        it->second += memory;
+      }
+    }
+  }
+
+  for (const auto& entry : topic_count) {
+    opencensus::stats::Record({{stats_.cachedMessageQuantity(), entry.second}},
+                              {{Tag::topicTag(), entry.first}, {Tag::clientIdTag(), client_config_.client_id}});
+    SPDLOG_DEBUG("Cache on Quantity {} --> {}", entry.first, entry.second);
+  }
+
+  for (const auto& entry : topic_memory) {
+    opencensus::stats::Record({{stats_.cachedMessageBytes(), entry.second}},
+                              {{Tag::topicTag(), entry.first}, {Tag::clientIdTag(), client_config_.client_id}});
+    SPDLOG_DEBUG("Cache on Memory {} --> {}", entry.first, entry.second);
+  }
+}
+
 ROCKETMQ_NAMESPACE_END
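
collectCacheStats() snapshots every process queue under the table mutex and
then sums per topic. A design note: with absl::flat_hash_map, operator[]
value-initializes a missing key to 0, so the find/insert_or_assign branching
above could collapse to a one-line accumulation. A minimal sketch, with a
hypothetical QueueSnapshot type standing in for the process-queue entries:

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    #include "absl/container/flat_hash_map.h"

    // Hypothetical snapshot of one process queue's counters.
    struct QueueSnapshot {
      std::string topic;
      std::uint64_t quantity;
      std::uint64_t memory;
    };

    int main() {
      std::vector<QueueSnapshot> queues = {
          {"TopicA", 3, 300}, {"TopicA", 2, 200}, {"TopicB", 1, 64}};

      absl::flat_hash_map<std::string, std::uint64_t> topic_count;
      absl::flat_hash_map<std::string, std::uint64_t> topic_memory;
      for (const auto& q : queues) {
        topic_count[q.topic] += q.quantity;  // operator[] default-constructs to 0
        topic_memory[q.topic] += q.memory;
      }

      for (const auto& entry : topic_count) {
        std::cout << entry.first << " -> " << entry.second << " messages, "
                  << topic_memory[entry.first] << " bytes\n";
      }
      return 0;
    }

Note that in the hunk above the opencensus::stats::Record() calls run after the
MutexLock scope closes, so metric recording never happens while
process_queue_table_mtx_ is held.
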
diff --git a/cpp/source/rocketmq/include/ProcessQueue.h b/cpp/source/rocketmq/include/ProcessQueue.h
index ac3576d..0e8ce74 100644
--- a/cpp/source/rocketmq/include/ProcessQueue.h
+++ b/cpp/source/rocketmq/include/ProcessQueue.h
@@ -16,6 +16,7 @@
  */
 #pragma once
 
+#include <cstdint>
 #include <memory>
 
 #include "AsyncReceiveMessageCallback.h"
@@ -39,19 +40,19 @@ public:
 
   virtual void receiveMessage() = 0;
 
-  virtual bool hasPendingMessages() const = 0;
-
   virtual std::string topic() const = 0;
 
-  virtual bool take(uint32_t batch_size, std::vector<MessageConstSharedPtr>& messages) = 0;
-
   virtual std::weak_ptr<PushConsumerImpl> getConsumer() = 0;
 
   virtual const std::string& simpleName() const = 0;
 
   virtual void release(uint64_t body_size) = 0;
 
-  virtual void cacheMessages(const std::vector<MessageConstSharedPtr>& messages) = 0;
+  virtual void accountCache(const std::vector<MessageConstSharedPtr>& messages) = 0;
+
+  virtual std::uint64_t cachedMessageQuantity() const = 0;
+
+  virtual std::uint64_t cachedMessageMemory() const = 0;
 
   virtual bool shouldThrottle() const = 0;
 
@@ -61,10 +62,6 @@ public:
 
   virtual const FilterExpression& getFilterExpression() const = 0;
 
-  virtual bool bindFifoConsumeTask() = 0;
-
-  virtual bool unbindFifoConsumeTask() = 0;
-
   virtual const rmq::MessageQueue& messageQueue() const = 0;
 };
 
diff --git a/cpp/source/rocketmq/include/ProcessQueueImpl.h b/cpp/source/rocketmq/include/ProcessQueueImpl.h
index 6479718..36464fc 100644
--- a/cpp/source/rocketmq/include/ProcessQueueImpl.h
+++ b/cpp/source/rocketmq/include/ProcessQueueImpl.h
@@ -56,7 +56,7 @@ public:
 
   bool expired() const override;
 
-  bool shouldThrottle() const override LOCKS_EXCLUDED(messages_mtx_);
+  bool shouldThrottle() const override;
 
   const FilterExpression& getFilterExpression() const override;
 
@@ -74,46 +74,22 @@ public:
     return message_queue_.topic().name();
   }
 
-  bool hasPendingMessages() const override LOCKS_EXCLUDED(messages_mtx_);
+  std::uint64_t cachedMessageQuantity() const override;
+
+  std::uint64_t cachedMessageMemory() const override;
 
   /**
    * Put message fetched from broker into cache.
    *
    * @param messages
    */
-  void cacheMessages(const std::vector<MessageConstSharedPtr>& messages) override LOCKS_EXCLUDED(messages_mtx_);
-
-  /**
-   * @return Number of messages that is not yet dispatched to thread pool, likely, due to topic-rate-limiting.
-   */
-  uint32_t cachedMessagesSize() const LOCKS_EXCLUDED(messages_mtx_) {
-    absl::MutexLock lk(&messages_mtx_);
-    return cached_messages_.size();
-  }
-
-  /**
-   * Dispatch messages from cache to thread pool in form of consumeTask.
-   * @param batch_size
-   * @param messages
-   * @return true if there are more messages to consume in cache
-   */
-  bool take(uint32_t batch_size, std::vector<MessageConstSharedPtr>& messages) override LOCKS_EXCLUDED(messages_mtx_);
+  void accountCache(const std::vector<MessageConstSharedPtr>& messages) override;
 
   void syncIdleState() override {
     idle_since_ = std::chrono::steady_clock::now();
   }
 
-  void release(uint64_t body_size) override LOCKS_EXCLUDED(messages_mtx_);
-
-  bool unbindFifoConsumeTask() override {
-    bool expected = true;
-    return has_fifo_task_bound_.compare_exchange_strong(expected, false, std::memory_order_relaxed);
-  }
-
-  bool bindFifoConsumeTask() override {
-    bool expected = false;
-    return has_fifo_task_bound_.compare_exchange_strong(expected, true, std::memory_order_relaxed);
-  }
+  void release(uint64_t body_size) override;
 
   const rmq::MessageQueue& messageQueue() const override {
     return message_queue_;
@@ -140,13 +116,6 @@ private:
 
   std::shared_ptr<AsyncReceiveMessageCallback> receive_callback_;
 
-  /**
-   * Messages that are pending to be submitted to thread pool.
-   */
-  mutable std::vector<MessageConstSharedPtr> cached_messages_ GUARDED_BY(messages_mtx_);
-
-  mutable absl::Mutex messages_mtx_;
-
   /**
    * @brief Quantity of the cached messages.
    *
@@ -159,11 +128,6 @@ private:
    */
   std::atomic<uint64_t> cached_message_memory_;
 
-  /**
-   * If this process queue is used in FIFO scenario, this field marks if there is an task in thread pool.
-   */
-  std::atomic_bool has_fifo_task_bound_{false};
-
   void popMessage();
   void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& metadata,
                              rmq::ReceiveMessageRequest& request);
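
A related detail in this header: shouldThrottle() and release() drop their
LOCKS_EXCLUDED(messages_mtx_) annotations because the mutex itself is removed.
The diff does not show the new shouldThrottle() body, so the sketch below is
only a hedged guess at how a lock-free check over the two counters could look
(ThrottleGate, max_quantity, and max_memory are hypothetical names, not the
client's API):

    #include <atomic>
    #include <cstdint>

    // Hedged sketch only: the real shouldThrottle() body is not in this diff.
    class ThrottleGate {
    public:
      ThrottleGate(std::uint64_t max_quantity, std::uint64_t max_memory)
          : max_quantity_(max_quantity), max_memory_(max_memory) {}

      bool shouldThrottle() const {
        // Lock-free reads; no mutex is required for the check any more.
        return quantity_.load(std::memory_order_relaxed) >= max_quantity_ ||
               memory_.load(std::memory_order_relaxed) >= max_memory_;
      }

      void onReceived(std::uint64_t body_size) {
        quantity_.fetch_add(1, std::memory_order_relaxed);
        memory_.fetch_add(body_size, std::memory_order_relaxed);
      }

    private:
      const std::uint64_t max_quantity_;
      const std::uint64_t max_memory_;
      std::atomic<std::uint64_t> quantity_{0};
      std::atomic<std::uint64_t> memory_{0};
    };

    int main() {
      ThrottleGate gate(/*max_quantity=*/2, /*max_memory=*/1024);
      gate.onReceived(100);
      gate.onReceived(200);
      return gate.shouldThrottle() ? 0 : 1;
    }
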
diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h b/cpp/source/rocketmq/include/PushConsumerImpl.h
index 95a07f2..cfc13a2 100644
--- a/cpp/source/rocketmq/include/PushConsumerImpl.h
+++ b/cpp/source/rocketmq/include/PushConsumerImpl.h
@@ -212,11 +212,15 @@ private:
   int32_t max_delivery_attempts_{MixAll::DEFAULT_MAX_DELIVERY_ATTEMPTS};
 
   ConsumeStats stats_;
+  std::uintptr_t collect_stats_handle_{0};
+  static const char* COLLECT_STATS_TASK_NAME;
 
   void fetchRoutes() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
 
   std::chrono::milliseconds invisibleDuration(std::size_t attempt);
 
+  void collectCacheStats() LOCKS_EXCLUDED(process_queue_table_mtx_);
+
   friend class ConsumeMessageService;
   friend class ConsumeFifoMessageService;
   friend class ConsumeStandardMessageService;