You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/05/30 07:04:38 UTC

[rocketmq-client-cpp] branch main updated: Instrument with opencensus to collect various metrics (#415)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new e7d188a  Instrument with opencensus to collect various metrics (#415)
e7d188a is described below

commit e7d188a3dc3ec00e11e0c00aba45b5e135b1185b
Author: Zhanhui Li <li...@gmail.com>
AuthorDate: Mon May 30 15:04:33 2022 +0800

    Instrument with opencensus to collect various metrics (#415)
    
    * Sync with protobuf files
    
    * Add more tags
    
    * Collect send success, failure and latency statistics data
    
    * Collect metrics for tx commit/rollback success/failure
    
    * Extract tag definitions to Tag
    
    * Define measures and metrics for consumer
    
    * Collect metrics for PushConsumer
    
    * Collect more metrics
    
    * Use latest opencensus to configure exporter interval
    
    * WIP
    
    * WIP
    
    * WIP
    
    * Remove metrics of committing/rolling-back transactional messages
    
    * Fix trace
---
 .vscode/settings.json                              |   4 +
 api/rocketmq/Message.h                             |   4 +
 bazel/rocketmq_deps.bzl                            |  11 ++
 proto/apache/rocketmq/v2/definition.proto          |   9 +-
 proto/apache/rocketmq/v2/service.proto             |  25 ++-
 src/main/cpp/client/ClientManagerImpl.cpp          |   8 +-
 src/main/cpp/client/TelemetryBidiReactor.cpp       |   8 +
 src/main/cpp/client/include/ClientConfig.h         |   6 +
 .../cpp/rocketmq/ConsumeMessageServiceImpl.cpp     |  29 +++-
 src/main/cpp/rocketmq/ConsumeTask.cpp              |  15 ++
 src/main/cpp/rocketmq/ProducerImpl.cpp             |  63 ++++---
 src/main/cpp/rocketmq/SendContext.cpp              |  48 +++++-
 .../cpp/rocketmq/include/ConsumeMessageService.h   |   2 +
 .../rocketmq/include/ConsumeMessageServiceImpl.h   |   2 +
 src/main/cpp/rocketmq/include/ProducerImpl.h       |   8 +-
 src/main/cpp/rocketmq/include/PushConsumerImpl.h   |   7 +
 src/main/cpp/rocketmq/include/SendContext.h        |   2 +
 src/main/cpp/stats/BUILD.bazel                     |   2 +
 src/main/cpp/stats/ConsumeStats.cpp                | 148 ++++++++++++++++
 src/main/cpp/stats/MetricBidiReactor.cpp           | 116 +++++++++++++
 src/main/cpp/stats/OpencensusExporter.cpp          | 190 +++++++++++++++++++++
 src/main/cpp/stats/PublishStats.cpp                |  20 +--
 .../cpp/stats/{include/PublishStats.h => Tag.cpp}  |  45 ++---
 src/main/cpp/stats/include/ConsumeStats.h          |  87 ++++++++++
 .../stats/include/{PublishStats.h => Exporter.h}   |  26 +--
 src/main/cpp/stats/include/MetricBidiReactor.h     |  78 +++++++++
 src/main/cpp/stats/include/OpencensusExporter.h    |  53 ++++++
 src/main/cpp/stats/include/PublishStats.h          |   9 +-
 .../cpp/stats/include/{PublishStats.h => Tag.h}    |  25 +--
 src/main/cpp/stats/tests/PublishStatsTest.cpp      |  71 +++++++-
 30 files changed, 983 insertions(+), 138 deletions(-)

diff --git a/.vscode/settings.json b/.vscode/settings.json
index cb613f9..9d42abd 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -9,5 +9,9 @@
         "--header-insertion=never", 
         "--compile-commands-dir=${workspaceFolder}/",
         "--query-driver=/**/*",
+    ],
+    "cSpell.words": [
+        "Opencensus",
+        "SPDLOG"
     ]
 }
\ No newline at end of file
diff --git a/api/rocketmq/Message.h b/api/rocketmq/Message.h
index 0537fa4..0c2ef02 100644
--- a/api/rocketmq/Message.h
+++ b/api/rocketmq/Message.h
@@ -78,6 +78,10 @@ public:
     return absl::make_optional(trace_context_);
   }
 
+  void traceContext(std::string &&trace_context) {
+    trace_context_ = std::move(trace_context);
+  }
+
   const std::string& bornHost() const {
     return born_host_;
   }
diff --git a/bazel/rocketmq_deps.bzl b/bazel/rocketmq_deps.bzl
index 7282dd7..39f3642 100644
--- a/bazel/rocketmq_deps.bzl
+++ b/bazel/rocketmq_deps.bzl
@@ -85,6 +85,17 @@ def rocketmq_deps():
             ],
         )
 
+    maybe(
+        http_archive,
+        name = "io_opencensus_cpp",
+        sha256 = "317f2bfdaba469561c7e64b1a55282b87e677c109c9d8877097940e6d5cbca08",
+        urls = [
+            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/opencensus-cpp/opencensus-cpp-0.4.1.tar.gz",
+            "https://github.com/lizhanhui/opencensus-cpp/archive/refs/tags/v0.4.1.tar.gz",
+        ],
+        strip_prefix = "opencensus-cpp-0.4.1",
+    )
+
     if "com_google_absl" not in native.existing_rules():
         http_archive(
             name = "com_google_absl",
diff --git a/proto/apache/rocketmq/v2/definition.proto b/proto/apache/rocketmq/v2/definition.proto
index 6565a21..58bf06d 100644
--- a/proto/apache/rocketmq/v2/definition.proto
+++ b/proto/apache/rocketmq/v2/definition.proto
@@ -291,12 +291,6 @@ message Assignment {
   MessageQueue message_queue = 1;
 }
 
-message SendReceipt {
-  string message_id = 1;
-  string transaction_id = 2;
-  int64 offset = 3;
-}
-
 enum Code {
   // Success.
   OK = 0;
@@ -395,6 +389,9 @@ enum Code {
   // Client type could not be recognized.
   UNRECOGNIZED_CLIENT_TYPE = 32;
 
+  // Return different results for entries in composite request.
+  MULTIPLE_RESULTS = 33;
+
   // Code indicates that the server encountered an unexpected condition
   // that prevented it from fulfilling the request.
   // This error response is a generic "catch-all" response.
diff --git a/proto/apache/rocketmq/v2/service.proto b/proto/apache/rocketmq/v2/service.proto
index aa688b4..fa2a2f0 100644
--- a/proto/apache/rocketmq/v2/service.proto
+++ b/proto/apache/rocketmq/v2/service.proto
@@ -60,9 +60,19 @@ message SendMessageRequest {
   repeated Message messages = 1;
 }
 
+message SendResultEntry {
+  Status status = 1;
+  string message_id = 2;
+  string transaction_id = 3;
+  int64 offset = 4;
+}
+
 message SendMessageResponse {
   Status status = 1;
-  repeated SendReceipt receipts = 2;
+
+  // Some implementation may have partial failure issues. Client SDK developers are expected to inspect
+  // each entry for best certainty.
+  repeated SendResultEntry entries = 2;
 }
 
 message QueryAssignmentRequest {
@@ -169,7 +179,8 @@ message ThreadStackTrace {
 
 message VerifyMessageCommand {
   string nonce = 1;
-  Message message = 2;
+  MessageQueue message_queue = 2;
+  Message message = 3;
 }
 
 message VerifyMessageResult {
@@ -230,6 +241,14 @@ message Subscription {
   optional google.protobuf.Duration long_polling_timeout = 5;
 }
 
+message Metric {
+  // Indicates that if client should export local metrics to server.
+  bool on = 1;
+
+  // The endpoint that client metrics should be exported to, which is required if the switch is on.
+  optional Endpoints endpoints = 2;
+}
+
 message Settings {
   // Configurations for all clients.
   optional ClientType client_type = 1;
@@ -259,6 +278,8 @@ message Settings {
 
   // User agent details
   UA user_agent = 7;
+
+  Metric metric = 8;
 }
 
 message TelemetryCommand {
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp b/src/main/cpp/client/ClientManagerImpl.cpp
index a5fe96a..0c6e155 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -316,7 +316,7 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
     auto&& status = invocation_context->response.status();
     switch (invocation_context->response.status().code()) {
       case rmq::Code::OK: {
-        send_receipt.message_id = invocation_context->response.receipts().begin()->message_id();
+        send_receipt.message_id = invocation_context->response.entries().begin()->message_id();
         break;
       }
       case rmq::Code::TOPIC_NOT_FOUND: {
@@ -416,9 +416,9 @@ SendReceipt ClientManagerImpl::processSendResponse(const rmq::MessageQueue& mess
 
   switch (response.status().code()) {
     case rmq::Code::OK: {
-      assert(response.receipts_size() > 0);
-      send_receipt.message_id = response.receipts().begin()->message_id();
-      send_receipt.transaction_id = response.receipts().begin()->transaction_id();
+      assert(response.entries_size() > 0);
+      send_receipt.message_id = response.entries().begin()->message_id();
+      send_receipt.transaction_id = response.entries().begin()->transaction_id();
       return send_receipt;
     }
     case rmq::Code::ILLEGAL_TOPIC: {
diff --git a/src/main/cpp/client/TelemetryBidiReactor.cpp b/src/main/cpp/client/TelemetryBidiReactor.cpp
index f39cdec..cf251e4 100644
--- a/src/main/cpp/client/TelemetryBidiReactor.cpp
+++ b/src/main/cpp/client/TelemetryBidiReactor.cpp
@@ -194,6 +194,14 @@ void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
 
   applyBackoffPolicy(settings, ptr);
 
+  // Sync metrics collector configuration
+  if (settings.has_metric()) {
+    const auto& metric = settings.metric();
+    ptr->config().metric.on = metric.on();
+    ptr->config().metric.endpoints.set_scheme(metric.endpoints().scheme());
+    ptr->config().metric.endpoints.mutable_addresses()->CopyFrom(metric.endpoints().addresses());
+  }
+
   switch (settings.pub_sub_case()) {
     case rmq::Settings::PubSubCase::kPublishing: {
       applyPublishingConfig(settings, ptr);
diff --git a/src/main/cpp/client/include/ClientConfig.h b/src/main/cpp/client/include/ClientConfig.h
index 458ca5f..66ca230 100644
--- a/src/main/cpp/client/include/ClientConfig.h
+++ b/src/main/cpp/client/include/ClientConfig.h
@@ -44,6 +44,11 @@ struct SubscriberConfig {
   absl::Duration polling_timeout{absl::Seconds(30)};
 };
 
+struct Metric {
+  bool on{false};
+  rmq::Endpoints endpoints;
+};
+
 struct ClientConfig {
   std::string client_id;
   rmq::ClientType client_type{rmq::ClientType::CLIENT_TYPE_UNSPECIFIED};
@@ -55,6 +60,7 @@ struct ClientConfig {
   std::shared_ptr<CredentialsProvider> credentials_provider;
   PublisherConfig publisher;
   SubscriberConfig subscriber;
+  Metric metric;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp b/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
index 9c9ad49..74d8176 100644
--- a/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -16,9 +16,11 @@
  */
 #include "ConsumeMessageServiceImpl.h"
 
+#include "ConsumeStats.h"
 #include "ConsumeTask.h"
 #include "LoggerImpl.h"
 #include "PushConsumerImpl.h"
+#include "Tag.h"
 #include "ThreadPoolImpl.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
@@ -81,7 +83,24 @@ void ConsumeMessageServiceImpl::ack(const Message& message, std::function<void(c
     return;
   }
 
-  consumer->ack(message, cb);
+  std::weak_ptr<PushConsumerImpl> client(consumer_);
+  const auto& topic = message.topic();
+  // Collect metrics
+  opencensus::stats::Record({{consumer->stats().processSuccess(), 1}},
+                            {{Tag::topicTag(), topic}, {Tag::clientIdTag(), consumer->config().client_id}});
+  auto callback = [cb, client, topic](const std::error_code& ec) {
+    auto consumer = client.lock();
+    if (ec) {
+      opencensus::stats::Record({{consumer->stats().ackFailure(), 1}},
+                                {{Tag::topicTag(), topic}, {Tag::clientIdTag(), consumer->config().client_id}});
+    } else {
+      opencensus::stats::Record({{consumer->stats().ackSuccess(), 1}},
+                                {{Tag::topicTag(), topic}, {Tag::clientIdTag(), consumer->config().client_id}});
+    }
+    cb(ec);
+  };
+
+  consumer->ack(message, callback);
 }
 
 void ConsumeMessageServiceImpl::nack(const Message& message, std::function<void(const std::error_code&)> cb) {
@@ -89,7 +108,9 @@ void ConsumeMessageServiceImpl::nack(const Message& message, std::function<void(
   if (!consumer) {
     return;
   }
-
+  // Collect metrics
+  opencensus::stats::Record({{consumer->stats().processFailure(), 1}},
+                            {{Tag::topicTag(), message.topic()}, {Tag::clientIdTag(), consumer->config().client_id}});
   consumer->nack(message, cb);
 }
 
@@ -114,6 +135,10 @@ std::size_t ConsumeMessageServiceImpl::maxDeliveryAttempt() {
   return consumer->maxDeliveryAttempts();
 }
 
+std::weak_ptr<PushConsumerImpl> ConsumeMessageServiceImpl::consumer() {
+  return consumer_;
+}
+
 bool ConsumeMessageServiceImpl::preHandle(const Message& message) {
   return true;
 }
diff --git a/src/main/cpp/rocketmq/ConsumeTask.cpp b/src/main/cpp/rocketmq/ConsumeTask.cpp
index dede4a4..69d299a 100644
--- a/src/main/cpp/rocketmq/ConsumeTask.cpp
+++ b/src/main/cpp/rocketmq/ConsumeTask.cpp
@@ -17,7 +17,10 @@
 
 #include "ConsumeTask.h"
 
+#include "ConsumeStats.h"
 #include "LoggerImpl.h"
+#include "PushConsumerImpl.h"
+#include "Tag.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -115,6 +118,8 @@ void ConsumeTask::process() {
     return;
   }
 
+  std::shared_ptr<PushConsumerImpl> consumer = svc->consumer().lock();
+
   auto self = shared_from_this();
 
   switch (next_step_) {
@@ -123,7 +128,17 @@ void ConsumeTask::process() {
       auto it = messages_.begin();
       SPDLOG_DEBUG("Start to process message[message-id={}]", (*it)->id());
       svc->preHandle(**it);
+      auto await_time = std::chrono::system_clock::now() - (*it)->extension().decode_time;
+      opencensus::stats::Record(
+          {{consumer->stats().awaitTime(), MixAll::millisecondsOf(await_time)}},
+          {{Tag::topicTag(), (*it)->topic()}, {Tag::clientIdTag(), consumer->config().client_id}});
+
+      std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
       auto result = listener(**it);
+      auto duration = std::chrono::steady_clock::now() - start;
+      opencensus::stats::Record(
+          {{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
+          {{Tag::topicTag(), (*it)->topic()}, {Tag::clientIdTag(), consumer->config().client_id}});
       svc->postHandle(**it, result);
       switch (result) {
         case ConsumeResult::SUCCESS: {
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index 4c203d8..4b6c3f0 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -34,6 +34,7 @@
 #include "SendContext.h"
 #include "SendMessageContext.h"
 #include "Signature.h"
+#include "Tag.h"
 #include "TracingUtility.h"
 #include "TransactionImpl.h"
 #include "UniqueIdGenerator.h"
@@ -287,32 +288,35 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) {
     return;
   }
 
-  // {
-  // Trace Send RPC
-  // auto span_context =
-  // opencensus::trace::propagation::FromTraceParentHeader(context->message_.traceContext().value()); auto span =
-  // opencensus::trace::Span::BlankSpan(); std::string span_name = resourceNamespace() + "/" + context->message_.topic()
-  // + " " +
-  //                         MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION;
-  // if (span_context.IsValid()) {
-  //   span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {&Samplers::always()});
-  // } else {
-  //   span = opencensus::trace::Span::StartSpan(span_name, nullptr, {&Samplers::always()});
-  // }
-  // span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
-  //                   MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION);
-  // span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
-  //                   MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION);
-  // TracingUtility::addUniversalSpanAttributes(context->message_, *this, span);
-  // Note: attempt-time is 0-based
-  // span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 1 + callback->attemptTime());
-  // if (message.deliveryTimestamp() != absl::ToChronoTime(absl::UnixEpoch())) {
-  //   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP,
-  //                     absl::FormatTime(absl::FromChrono(message.deliveryTimestamp())));
-  // }
-  // callback->message().traceContext(opencensus::trace::propagation::ToTraceParentHeader(span.context()));
-  // callback->span() = span;
-  // }
+  {
+    // Trace Send RPC
+    if (context->message_->traceContext().has_value()) {
+      auto span_context =
+          opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext().value());
+      auto span = opencensus::trace::Span::BlankSpan();
+      std::string span_name = resourceNamespace() + "/" + context->message_->topic() + " " +
+                              MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION;
+      if (span_context.IsValid()) {
+        span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {traceSampler()});
+      } else {
+        span = opencensus::trace::Span::StartSpan(span_name, nullptr, {traceSampler()});
+      }
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
+                        MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION);
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
+                        MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION);
+      TracingUtility::addUniversalSpanAttributes(*context->message_, config(), span);
+      // Note: attempt-time is 0-based
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 1 + context->attempt_times_);
+      if (context->message_->deliveryTimestamp().has_value()) {
+        span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP,
+                          absl::FormatTime(absl::FromChrono(context->message_->deliveryTimestamp().value())));
+      }
+      auto ptr = const_cast<Message*>(context->message_.get());
+      ptr->traceContext(opencensus::trace::propagation::ToTraceParentHeader(span.context()));
+      context->span_ = span;
+    }
+  }
 
   SendMessageRequest request;
   wrapSendMessageRequest(*context->message_, request, context->messageQueue());
@@ -354,7 +358,8 @@ void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::ve
 
 bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionState resolution) {
   EndTransactionRequest request;
-  request.mutable_topic()->set_name(transaction.topic());
+  const std::string& topic = transaction.topic();
+  request.mutable_topic()->set_name(topic);
   request.mutable_topic()->set_resource_namespace(resourceNamespace());
   request.set_message_id(transaction.messageId());
   request.set_transaction_id(transaction.messageId());
@@ -396,7 +401,9 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt
   auto mtx = std::make_shared<absl::Mutex>();
   auto cv = std::make_shared<absl::CondVar>();
   const auto& endpoint = transaction.endpoint();
-  auto cb = [&, span, endpoint, mtx, cv](const std::error_code& ec, const EndTransactionResponse& response) {
+  std::weak_ptr<ProducerImpl> publisher(shared_from_this());
+
+  auto cb = [&, span, endpoint, mtx, cv, topic](const std::error_code& ec, const EndTransactionResponse& response) {
     if (ec) {
       {
         span.SetStatus(opencensus::trace::StatusCode::ABORTED);
diff --git a/src/main/cpp/rocketmq/SendContext.cpp b/src/main/cpp/rocketmq/SendContext.cpp
index 73db9a1..9dee0e8 100644
--- a/src/main/cpp/rocketmq/SendContext.cpp
+++ b/src/main/cpp/rocketmq/SendContext.cpp
@@ -18,14 +18,15 @@
 
 #include <system_error>
 
-#include "opencensus/trace/propagation/trace_context.h"
-#include "opencensus/trace/span.h"
-#include "spdlog/spdlog.h"
-
 #include "ProducerImpl.h"
+#include "PublishStats.h"
+#include "Tag.h"
 #include "TransactionImpl.h"
+#include "opencensus/trace/propagation/trace_context.h"
+#include "opencensus/trace/span.h"
 #include "rocketmq/Logger.h"
 #include "rocketmq/SendReceipt.h"
+#include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -35,6 +36,28 @@ void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept {
     span_.SetStatus(opencensus::trace::StatusCode::OK);
     span_.End();
   }
+
+  auto publisher = producer_.lock();
+  if (!publisher) {
+    return;
+  }
+
+  // Collect metrics
+  {
+    auto duration = std::chrono::steady_clock::now() - request_time_;
+    opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}},
+                              {
+                                  {Tag::topicTag(), message_->topic()},
+                                  {Tag::clientIdTag(), publisher->config().client_id},
+                              });
+
+    opencensus::stats::Record({{publisher->stats().success(), 1}},
+                              {
+                                  {Tag::topicTag(), message_->topic()},
+                                  {Tag::clientIdTag(), publisher->config().client_id},
+                              });
+  }
+
   // send_receipt.traceContext(opencensus::trace::propagation::ToTraceParentHeader(span_.context()));
   std::error_code ec;
   callback_(ec, send_receipt);
@@ -52,6 +75,22 @@ void SendContext::onFailure(const std::error_code& ec) noexcept {
     return;
   }
 
+  // Collect metrics
+  {
+    auto duration = std::chrono::steady_clock::now() - request_time_;
+    opencensus::stats::Record({{publisher->stats().latency(), MixAll::millisecondsOf(duration)}},
+                              {
+                                  {Tag::topicTag(), message_->topic()},
+                                  {Tag::clientIdTag(), publisher->config().client_id},
+                              });
+
+    opencensus::stats::Record({{publisher->stats().failure(), 1}},
+                              {
+                                  {Tag::topicTag(), message_->topic()},
+                                  {Tag::clientIdTag(), publisher->config().client_id},
+                              });
+  }
+
   if (++attempt_times_ >= publisher->maxAttemptTimes()) {
     SPDLOG_WARN("Retried {} times, which exceeds the limit: {}", attempt_times_, publisher->maxAttemptTimes());
     callback_(ec, {});
@@ -72,6 +111,7 @@ void SendContext::onFailure(const std::error_code& ec) noexcept {
   }
 
   auto message_queue = candidates_[attempt_times_ % candidates_.size()];
+  request_time_ = std::chrono::steady_clock::now();
   producer->sendImpl(shared_from_this());
 }
 
diff --git a/src/main/cpp/rocketmq/include/ConsumeMessageService.h b/src/main/cpp/rocketmq/include/ConsumeMessageService.h
index 39d9d8e..0f8b49a 100644
--- a/src/main/cpp/rocketmq/include/ConsumeMessageService.h
+++ b/src/main/cpp/rocketmq/include/ConsumeMessageService.h
@@ -61,6 +61,8 @@ public:
   virtual void schedule(std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds delay) = 0;
 
   virtual std::size_t maxDeliveryAttempt() = 0;
+
+  virtual std::weak_ptr<PushConsumerImpl> consumer() = 0;
 };
 
 using ConsumeMessageServiceWeakPtr = std::weak_ptr<ConsumeMessageService>;
diff --git a/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h b/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h
index 7d6c34a..632a388 100644
--- a/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h
+++ b/src/main/cpp/rocketmq/include/ConsumeMessageServiceImpl.h
@@ -72,6 +72,8 @@ public:
 
   std::size_t maxDeliveryAttempt() override;
 
+  std::weak_ptr<PushConsumerImpl> consumer() override;
+
 protected:
   std::atomic<State> state_;
 
diff --git a/src/main/cpp/rocketmq/include/ProducerImpl.h b/src/main/cpp/rocketmq/include/ProducerImpl.h
index b765fa6..a50c6ef 100644
--- a/src/main/cpp/rocketmq/include/ProducerImpl.h
+++ b/src/main/cpp/rocketmq/include/ProducerImpl.h
@@ -37,6 +37,7 @@
 #include "rocketmq/SendReceipt.h"
 #include "rocketmq/State.h"
 #include "rocketmq/TransactionChecker.h"
+#include "PublishStats.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -105,7 +106,10 @@ public:
 
   void buildClientSettings(rmq::Settings& settings) override;
 
-  void topicsOfInterest(std::vector<std::string> topics) LOCKS_EXCLUDED(topics_mtx_);
+  void topicsOfInterest(std::vector<std::string> topics)
+      LOCKS_EXCLUDED(topics_mtx_);
+
+  const PublishStats& stats() const { return stats_; }
 
 protected:
   std::shared_ptr<ClientImpl> self() override {
@@ -126,6 +130,8 @@ private:
   std::vector<std::string> topics_ GUARDED_BY(topics_mtx_);
   absl::Mutex topics_mtx_;
 
+  PublishStats stats_;
+
   /**
    * @brief Acquire PublishInfo for the given topic.
    * Generally speaking, it first checks presence of the desired info in local cache, aka, topic_publish_table_;
diff --git a/src/main/cpp/rocketmq/include/PushConsumerImpl.h b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
index cf5fe50..0067009 100644
--- a/src/main/cpp/rocketmq/include/PushConsumerImpl.h
+++ b/src/main/cpp/rocketmq/include/PushConsumerImpl.h
@@ -25,6 +25,7 @@
 #include "ClientImpl.h"
 #include "ClientManagerImpl.h"
 #include "ConsumeMessageService.h"
+#include "ConsumeStats.h"
 #include "ProcessQueue.h"
 #include "Scheduler.h"
 #include "TopicAssignmentInfo.h"
@@ -162,6 +163,10 @@ public:
     return client_config_.subscriber.group.name();
   }
 
+  const ConsumeStats& stats() const {
+    return stats_;
+  }
+
 protected:
   std::shared_ptr<ClientImpl> self() override {
     return shared_from_this();
@@ -206,6 +211,8 @@ private:
 
   int32_t max_delivery_attempts_{MixAll::DEFAULT_MAX_DELIVERY_ATTEMPTS};
 
+  ConsumeStats stats_;
+
   void fetchRoutes() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
 
   std::chrono::milliseconds invisibleDuration(std::size_t attempt);
diff --git a/src/main/cpp/rocketmq/include/SendContext.h b/src/main/cpp/rocketmq/include/SendContext.h
index bba83c8..4c05ceb 100644
--- a/src/main/cpp/rocketmq/include/SendContext.h
+++ b/src/main/cpp/rocketmq/include/SendContext.h
@@ -72,6 +72,8 @@ public:
    * @brief The on-going span. Should be terminated in the callback functions.
    */
   opencensus::trace::Span span_;
+
+  std::chrono::steady_clock::time_point request_time_{std::chrono::steady_clock::now()};
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/BUILD.bazel b/src/main/cpp/stats/BUILD.bazel
index f31a081..fbf6af2 100644
--- a/src/main/cpp/stats/BUILD.bazel
+++ b/src/main/cpp/stats/BUILD.bazel
@@ -22,7 +22,9 @@ cc_library(
     strip_include_prefix = "//src/main/cpp/stats/include",
     deps = [
         "//api:rocketmq_interface",
+        "//src/main/cpp/client:client_library",
         "@io_opencensus_cpp//opencensus/stats",
+        "@opencensus_proto//opencensus/proto/agent/metrics/v1:metrics_service_grpc_cc",
     ],
     visibility = ["//visibility:public"],
 )
\ No newline at end of file
diff --git a/src/main/cpp/stats/ConsumeStats.cpp b/src/main/cpp/stats/ConsumeStats.cpp
new file mode 100644
index 0000000..e563c8c
--- /dev/null
+++ b/src/main/cpp/stats/ConsumeStats.cpp
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeStats.h"
+
+#include "Tag.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+ConsumeStats::ConsumeStats()
+    : process_success_(
+          opencensus::stats::MeasureInt64::Register("process_success", "Number of messages processed", "1")),
+      process_failure_(opencensus::stats::MeasureInt64::Register(
+          "process_failure", "Number of failures when processing messages", "1")),
+      ack_success_(opencensus::stats::MeasureInt64::Register("ack_success", "Number of messages acknowledged", "1")),
+      ack_failure_(opencensus::stats::MeasureInt64::Register(
+          "ack_failure", "Number of failures when acknowledging messages", "1")),
+      change_invisible_time_success_(opencensus::stats::MeasureInt64::Register(
+          "change_invisible_time_success", "Number of change-invisible-time performed", "1")),
+      change_invisible_time_failure_(opencensus::stats::MeasureInt64::Register(
+          "change_invisible_time_failure", "Number of failures when changing message invisible time", "1")),
+      cached_message_quantity_(opencensus::stats::MeasureInt64::Register(
+          "cached_message_quantity", "Number of locally cached messages", "1")),
+      cached_message_bytes_(opencensus::stats::MeasureInt64::Register(
+          "cached_message_bytes", "Number of locally cached messages in bytes", "1")),
+      delivery_latency_(opencensus::stats::MeasureInt64::Register(
+          "delivery_latency", "Time spent delivering messages from servers to clients", "1")),
+      await_time_(opencensus::stats::MeasureInt64::Register(
+          "await_time", "Client side queuing time of messages before getting processed", "1")),
+      process_time_(opencensus::stats::MeasureInt64::Register("process_time", "Process message time", "1")) {
+  opencensus::stats::ViewDescriptor()
+      .set_name("rocketmq_process_success_total")
+      .set_description("Number of messages processed")
+      .set_measure("process_success")
+      .set_aggregation(opencensus::stats::Aggregation::Sum())
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
+      .RegisterForExport();
+
+  opencensus::stats::ViewDescriptor()
+      .set_name("rocketmq_process_failure_total")
+      .set_description("Number of failures on processing messages")
+      .set_measure("process_failure")
+      .set_aggregation(opencensus::stats::Aggregation::Sum())
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
+      .RegisterForExport();
+
+  opencensus::stats::ViewDescriptor()
+      .set_name("rocketmq_ack_success_total")
+      .set_description("Number of messages acknowledged")
+      .set_measure("ack_success")
+      .set_aggregation(opencensus::stats::Aggregation::Sum())
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
+      .RegisterForExport();
+
+  opencensus::stats::ViewDescriptor()
+      .set_name("rocketmq_ack_failure_total")
+      .set_description("Number of failures on acknowledging messages")
+      .set_measure("ack_failure")
+      .set_aggregation(opencensus::stats::Aggregation::Sum())
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
+      .RegisterForExport();
+
+  opencensus::stats::ViewDescriptor()
+      .set_name("rocketmq_change_invisible_time_success_total")
+      .set_description("Number of change-invisible-time operations")
+      .set_measure("change_invisible_time_success")
+      .set_aggregation(opencensus::stats::Aggregation::Sum())
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
+      .RegisterForExport();
+
+  opencensus::stats::ViewDescriptor()
+      .set_name("rocketmq_change_invisible_time_failure_total")
+      .set_description("Number of failed change-invisible-time operations")
+      .set_measure("change_invisible_time_failure")
+      .set_aggregation(opencensus::stats::Aggregation::Sum())
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
+      .RegisterForExport();
+
+  opencensus::stats::ViewDescriptor()
+      .set_name("rocketmq_consumer_cached_messages")
+      .set_description("Number of messages locally cached")
+      .set_measure("cached_message_quantity")
+      .set_aggregation(opencensus::stats::Aggregation::LastValue())
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
+      .RegisterForExport();
+
+  opencensus::stats::ViewDescriptor()
+      .set_name("rocketmq_consumer_cached_bytes")
+      .set_description("Number of locally cached messages in bytes")
+      .set_measure("cached_message_bytes")
+      .set_aggregation(opencensus::stats::Aggregation::LastValue())
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
+      .RegisterForExport();
+
+  opencensus::stats::ViewDescriptor()
+      .set_name("rocketmq_delivery_latency")
+      .set_description("Message delivery latency")
+      .set_measure("delivery_latency")
+      .set_aggregation(opencensus::stats::Aggregation::Distribution(
+          opencensus::stats::BucketBoundaries::Explicit({5, 10, 20, 50, 500})))
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
+      .RegisterForExport();
+
+  opencensus::stats::ViewDescriptor()
+      .set_name("rocketmq_await_time")
+      .set_description("Message await time")
+      .set_measure("await_time")
+      .set_aggregation(opencensus::stats::Aggregation::Distribution(
+          opencensus::stats::BucketBoundaries::Explicit({1, 1000, 60000, 900000})))
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
+      .RegisterForExport();
+
+  opencensus::stats::ViewDescriptor()
+      .set_name("rocketmq_process_time")
+      .set_description("Process time")
+      .set_measure("process_time")
+      .set_aggregation(opencensus::stats::Aggregation::Distribution(
+          opencensus::stats::BucketBoundaries::Explicit({100, 1000, 60000, 900000})))
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
+      .RegisterForExport();
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/MetricBidiReactor.cpp b/src/main/cpp/stats/MetricBidiReactor.cpp
new file mode 100644
index 0000000..62983ee
--- /dev/null
+++ b/src/main/cpp/stats/MetricBidiReactor.cpp
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MetricBidiReactor.h"
+
+#include "LoggerImpl.h"
+#include "OpencensusExporter.h"
+#include "Signature.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr<OpencensusExporter> exporter)
+    : client_(client), exporter_(exporter) {
+  grpc::ClientContext context;
+  auto ptr = client_.lock();
+
+  Metadata metadata;
+  Signature::sign(ptr->config(), metadata);
+
+  for (const auto& entry : metadata) {
+    context.AddMetadata(entry.first, entry.second);
+  }
+  context.set_deadline(std::chrono::system_clock::now() + absl::ToChronoMilliseconds(ptr->config().request_timeout));
+
+  auto exporter_ptr = exporter_.lock();
+  if (!exporter_ptr) {
+    return;
+  }
+
+  exporter_ptr->stub()->async()->Export(&context, this);
+  StartCall();
+}
+
+void MetricBidiReactor::OnReadDone(bool ok) {
+  if (!ok) {
+    SPDLOG_WARN("Failed to read response");
+    return;
+  }
+
+  StartRead(&response_);
+}
+
+void MetricBidiReactor::OnWriteDone(bool ok) {
+  if (!ok) {
+    SPDLOG_WARN("Failed to report metrics");
+    return;
+  }
+
+  bool expected = true;
+  if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
+    fireWrite();
+  }
+}
+
+void MetricBidiReactor::OnDone(const grpc::Status& s) {
+  auto client = client_.lock();
+  if (!client) {
+    return;
+  }
+
+  if (s.ok()) {
+    SPDLOG_DEBUG("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
+  } else {
+    SPDLOG_WARN("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
+  }
+}
+
+void MetricBidiReactor::write(ExportMetricsServiceRequest request) {
+  {
+    absl::MutexLock lk(&requests_mtx_);
+    requests_.emplace_back(std::move(request));
+  }
+
+  fireWrite();
+}
+
+void MetricBidiReactor::fireWrite() {
+  {
+    absl::MutexLock lk(&requests_mtx_);
+    if (requests_.empty()) {
+      SPDLOG_DEBUG("No more metric data to write");
+      return;
+    }
+  }
+
+  bool expected = false;
+  if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
+    absl::MutexLock lk(&requests_mtx_);
+    request_.CopyFrom(requests_[0]);
+    requests_.erase(requests_.begin());
+    StartWrite(&request_);
+    fireRead();
+  }
+}
+
+void MetricBidiReactor::fireRead() {
+  bool expected = false;
+  if (read_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
+    StartRead(&response_);
+  }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/OpencensusExporter.cpp b/src/main/cpp/stats/OpencensusExporter.cpp
new file mode 100644
index 0000000..0bf13a4
--- /dev/null
+++ b/src/main/cpp/stats/OpencensusExporter.cpp
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpencensusExporter.h"
+
+#include "MetricBidiReactor.h"
+#include "google/protobuf/util/time_util.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+namespace opencensus_proto = opencensus::proto::metrics::v1;
+
+OpencensusExporter::OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client) : client_(client) {
+}
+
+void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceRequest& request) {
+  auto metrics = request.mutable_metrics();
+
+  for (const auto& entry : data) {
+    const auto& view_descriptor = entry.first;
+
+    auto metric = new opencensus::proto::metrics::v1::Metric();
+    auto descriptor = metric->mutable_metric_descriptor();
+    descriptor->set_name(view_descriptor.name());
+    descriptor->set_description(view_descriptor.description());
+    descriptor->set_unit(view_descriptor.measure_descriptor().units());
+    switch (view_descriptor.aggregation().type()) {
+      case opencensus::stats::Aggregation::Type::kCount: {
+        descriptor->set_type(opencensus_proto::MetricDescriptor_Type::MetricDescriptor_Type_CUMULATIVE_INT64);
+        break;
+      }
+
+      case opencensus::stats::Aggregation::Type::kSum: {
+        descriptor->set_type(opencensus_proto::MetricDescriptor_Type::MetricDescriptor_Type_CUMULATIVE_INT64);
+        break;
+      }
+
+      case opencensus::stats::Aggregation::Type::kLastValue: {
+        descriptor->set_type(opencensus_proto::MetricDescriptor_Type::MetricDescriptor_Type_GAUGE_INT64);
+        break;
+      }
+
+      case opencensus::stats::Aggregation::Type::kDistribution: {
+        descriptor->set_type(opencensus_proto::MetricDescriptor_Type::MetricDescriptor_Type_GAUGE_DISTRIBUTION);
+        break;
+      }
+    }
+
+    auto label_keys = descriptor->mutable_label_keys();
+    for (const auto& column : view_descriptor.columns()) {
+      auto label_key = new opencensus::proto::metrics::v1::LabelKey;
+      label_key->set_key(column.name());
+      label_keys->AddAllocated(label_key);
+    }
+
+    auto time_series = metric->mutable_timeseries();
+    const auto& view_data = entry.second;
+
+    auto start_times = view_data.start_times();
+
+    switch (view_data.type()) {
+      case opencensus::stats::ViewData::Type::kInt64: {
+        for (const auto& entry : view_data.int_data()) {
+          auto time_series_element = new opencensus::proto::metrics::v1::TimeSeries();
+
+          auto search = start_times.find(entry.first);
+          if (search != start_times.end()) {
+            absl::Time time = search->second;
+            time_series_element->mutable_start_timestamp()->CopyFrom(
+                google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(time)));
+          }
+
+          auto label_values = time_series_element->mutable_label_values();
+          for (const auto& value : entry.first) {
+            auto label_value = new opencensus::proto::metrics::v1::LabelValue;
+            label_value->set_value(value);
+            label_value->set_has_value(true);
+            label_values->AddAllocated(label_value);
+          }
+
+          auto point = new opencensus::proto::metrics::v1::Point();
+          point->set_int64_value(entry.second);
+          time_series_element->mutable_points()->AddAllocated(point);
+
+          time_series->AddAllocated(time_series_element);
+        }
+        break;
+      }
+      case opencensus::stats::ViewData::Type::kDouble: {
+        for (const auto& entry : view_data.double_data()) {
+          auto time_series_element = new opencensus::proto::metrics::v1::TimeSeries();
+          auto search = start_times.find(entry.first);
+          if (search != start_times.end()) {
+            absl::Time time = search->second;
+            time_series_element->mutable_start_timestamp()->CopyFrom(
+                google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(time)));
+          }
+
+          auto label_values = time_series_element->mutable_label_values();
+          for (const auto& value : entry.first) {
+            auto label_value = new opencensus::proto::metrics::v1::LabelValue;
+            label_value->set_value(value);
+            label_value->set_has_value(true);
+            label_values->AddAllocated(label_value);
+          }
+
+          auto point = new opencensus::proto::metrics::v1::Point();
+          point->set_double_value(entry.second);
+          time_series_element->mutable_points()->AddAllocated(point);
+
+          time_series->AddAllocated(time_series_element);
+        }
+        break;
+      }
+      case opencensus::stats::ViewData::Type::kDistribution: {
+        for (const auto& entry : view_data.distribution_data()) {
+          auto time_series_element = new opencensus::proto::metrics::v1::TimeSeries();
+          auto search = start_times.find(entry.first);
+          if (search != start_times.end()) {
+            absl::Time time = search->second;
+            time_series_element->mutable_start_timestamp()->CopyFrom(
+                google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(time)));
+          }
+
+          auto label_values = time_series_element->mutable_label_values();
+          for (const auto& value : entry.first) {
+            auto label_value = new opencensus::proto::metrics::v1::LabelValue;
+            label_value->set_value(value);
+            label_value->set_has_value(true);
+            label_values->AddAllocated(label_value);
+          }
+          auto point = new opencensus::proto::metrics::v1::Point();
+          auto distribution_value = new opencensus::proto::metrics::v1::DistributionValue;
+          point->set_allocated_distribution_value(distribution_value);
+          time_series_element->mutable_points()->AddAllocated(point);
+          distribution_value->set_count(entry.second.count());
+          distribution_value->set_sum_of_squared_deviation(entry.second.sum_of_squared_deviation());
+          for (const auto& cnt : entry.second.bucket_counts()) {
+            auto bucket = new opencensus::proto::metrics::v1::DistributionValue::Bucket();
+            bucket->set_count(cnt);
+            distribution_value->mutable_buckets()->AddAllocated(bucket);
+          }
+
+          auto bucket_options = distribution_value->mutable_bucket_options();
+
+          for (const auto& boundary : entry.second.bucket_boundaries().lower_boundaries()) {
+            bucket_options->mutable_explicit_()->mutable_bounds()->Add(boundary);
+          }
+
+          time_series->AddAllocated(time_series_element);
+        }
+        break;
+      }
+    }
+
+    metrics->AddAllocated(metric);
+  }
+}
+
+void OpencensusExporter::exportMetrics(
+    const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) {
+  opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest request;
+  wrap(data, request);
+  std::weak_ptr<OpencensusExporter> exporter{shared_from_this()};
+  if (!bidi_reactor_) {
+    bidi_reactor_ = absl::make_unique<MetricBidiReactor>(client_, exporter);
+  }
+  bidi_reactor_->write(request);
+}
+
+void OpencensusExporter::resetStream() {
+  std::weak_ptr<OpencensusExporter> exporter{shared_from_this()};
+  bidi_reactor_.reset(new MetricBidiReactor(client_, exporter));
+}
+
+ROCKETMQ_NAMESPACE_END
diff --git a/src/main/cpp/stats/PublishStats.cpp b/src/main/cpp/stats/PublishStats.cpp
index 9356f98..37ce613 100644
--- a/src/main/cpp/stats/PublishStats.cpp
+++ b/src/main/cpp/stats/PublishStats.cpp
@@ -16,6 +16,8 @@
  */
 #include "PublishStats.h"
 
+#include "Tag.h"
+
 ROCKETMQ_NAMESPACE_BEGIN
 
 PublishStats::PublishStats()
@@ -27,15 +29,17 @@ PublishStats::PublishStats()
       .set_description("Number of messages published")
       .set_measure("publish_success")
       .set_aggregation(opencensus::stats::Aggregation::Sum())
-      .add_column(topicTag())
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
       .RegisterForExport();
 
   opencensus::stats::ViewDescriptor()
       .set_name("rocketmq_send_failure_total")
       .set_description("Number of publish failures")
-      .set_measure("pubish_failure")
+      .set_measure("publish_failure")
       .set_aggregation(opencensus::stats::Aggregation::Sum())
-      .add_column(topicTag())
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
       .RegisterForExport();
 
   opencensus::stats::ViewDescriptor()
@@ -43,14 +47,10 @@ PublishStats::PublishStats()
       .set_description("Publish latency")
       .set_measure("publish_latency")
       .set_aggregation(opencensus::stats::Aggregation::Distribution(
-          opencensus::stats::BucketBoundaries::Explicit({1, 10, 100, 1000})))
-      .add_column(topicTag())
+          opencensus::stats::BucketBoundaries::Explicit({5, 10, 20, 50, 500})))
+      .add_column(Tag::topicTag())
+      .add_column(Tag::clientIdTag())
       .RegisterForExport();
 }
 
-opencensus::tags::TagKey& PublishStats::topicTag() {
-  static opencensus::tags::TagKey topic_tag = opencensus::tags::TagKey::Register("topic");
-  return topic_tag;
-}
-
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/include/PublishStats.h b/src/main/cpp/stats/Tag.cpp
similarity index 54%
copy from src/main/cpp/stats/include/PublishStats.h
copy to src/main/cpp/stats/Tag.cpp
index 46305d0..9599233 100644
--- a/src/main/cpp/stats/include/PublishStats.h
+++ b/src/main/cpp/stats/Tag.cpp
@@ -14,37 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#pragma once
-
-#include <string>
-
-#include "opencensus/stats/stats.h"
-#include "rocketmq/RocketMQ.h"
+#include "Tag.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-class PublishStats {
-public:
-  PublishStats();
-
-  opencensus::stats::MeasureInt64& success() {
-    return success_;
-  }
-
-  opencensus::stats::MeasureInt64& failure() {
-    return failure_;
-  }
+opencensus::tags::TagKey& Tag::topicTag() {
+  static opencensus::tags::TagKey topic_tag = opencensus::tags::TagKey::Register("topic");
+  return topic_tag;
+}
 
-  opencensus::stats::MeasureInt64& latency() {
-    return latency_;
-  }
+opencensus::tags::TagKey& Tag::clientIdTag() {
+  static opencensus::tags::TagKey client_id_tag = opencensus::tags::TagKey::Register("client_id");
+  return client_id_tag;
+}
 
-  static opencensus::tags::TagKey& topicTag();
+opencensus::tags::TagKey& Tag::userIdTag() {
+  static opencensus::tags::TagKey uid_tag = opencensus::tags::TagKey::Register("uid");
+  return uid_tag;
+}
 
-private:
-  opencensus::stats::MeasureInt64 success_;
-  opencensus::stats::MeasureInt64 failure_;
-  opencensus::stats::MeasureInt64 latency_;
-};
+opencensus::tags::TagKey& Tag::deploymentTag() {
+  static opencensus::tags::TagKey deployment_tag = opencensus::tags::TagKey::Register("deployment");
+  return deployment_tag;
+}
 
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+ROCKETMQ_NAMESPACE_END
diff --git a/src/main/cpp/stats/include/ConsumeStats.h b/src/main/cpp/stats/include/ConsumeStats.h
new file mode 100644
index 0000000..a70f324
--- /dev/null
+++ b/src/main/cpp/stats/include/ConsumeStats.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "opencensus/stats/stats.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class ConsumeStats {
+public:
+  ConsumeStats();
+
+  const opencensus::stats::MeasureInt64& processSuccess() const {
+    return process_success_;
+  }
+
+  const opencensus::stats::MeasureInt64& processFailure() const {
+    return process_failure_;
+  }
+
+  const opencensus::stats::MeasureInt64& ackSuccess() const {
+    return ack_success_;
+  }
+
+  const opencensus::stats::MeasureInt64& ackFailure() const {
+    return ack_failure_;
+  }
+
+  const opencensus::stats::MeasureInt64& changeInvisibleTimeSuccess() const {
+    return change_invisible_time_success_;
+  }
+
+  const opencensus::stats::MeasureInt64& changeInvisibleTimeFailure() const {
+    return change_invisible_time_failure_;
+  }
+
+  const opencensus::stats::MeasureInt64& cachedMessageQuantity() const {
+    return cached_message_quantity_;
+  }
+
+  const opencensus::stats::MeasureInt64& cachedMessageBytes() const {
+    return cached_message_bytes_;
+  }
+
+  const opencensus::stats::MeasureInt64& deliveryLatency() const {
+    return delivery_latency_;
+  }
+
+  const opencensus::stats::MeasureInt64& awaitTime() const {
+    return await_time_;
+  }
+
+  const opencensus::stats::MeasureInt64& processTime() const {
+    return process_time_;
+  }
+
+private:
+  opencensus::stats::MeasureInt64 process_success_;
+  opencensus::stats::MeasureInt64 process_failure_;
+  opencensus::stats::MeasureInt64 ack_success_;
+  opencensus::stats::MeasureInt64 ack_failure_;
+  opencensus::stats::MeasureInt64 change_invisible_time_success_;
+  opencensus::stats::MeasureInt64 change_invisible_time_failure_;
+  opencensus::stats::MeasureInt64 cached_message_quantity_;
+  opencensus::stats::MeasureInt64 cached_message_bytes_;
+
+  opencensus::stats::MeasureInt64 delivery_latency_;
+  opencensus::stats::MeasureInt64 await_time_;
+  opencensus::stats::MeasureInt64 process_time_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/include/PublishStats.h b/src/main/cpp/stats/include/Exporter.h
similarity index 65%
copy from src/main/cpp/stats/include/PublishStats.h
copy to src/main/cpp/stats/include/Exporter.h
index 46305d0..7f2f24a 100644
--- a/src/main/cpp/stats/include/PublishStats.h
+++ b/src/main/cpp/stats/include/Exporter.h
@@ -16,35 +16,15 @@
  */
 #pragma once
 
-#include <string>
-
 #include "opencensus/stats/stats.h"
 #include "rocketmq/RocketMQ.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-class PublishStats {
+class Exporter {
 public:
-  PublishStats();
-
-  opencensus::stats::MeasureInt64& success() {
-    return success_;
-  }
-
-  opencensus::stats::MeasureInt64& failure() {
-    return failure_;
-  }
-
-  opencensus::stats::MeasureInt64& latency() {
-    return latency_;
-  }
-
-  static opencensus::tags::TagKey& topicTag();
-
-private:
-  opencensus::stats::MeasureInt64 success_;
-  opencensus::stats::MeasureInt64 failure_;
-  opencensus::stats::MeasureInt64 latency_;
+  virtual void exportMetrics(
+      const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) = 0;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/include/MetricBidiReactor.h b/src/main/cpp/stats/include/MetricBidiReactor.h
new file mode 100644
index 0000000..cc08c1d
--- /dev/null
+++ b/src/main/cpp/stats/include/MetricBidiReactor.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "Client.h"
+#include "grpcpp/grpcpp.h"
+#include "grpcpp/impl/codegen/client_callback.h"
+#include "opencensus/proto/agent/metrics/v1/metrics_service.grpc.pb.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class OpencensusExporter;
+
+using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
+using ExportMetricsServiceResponse = opencensus::proto::agent::metrics::v1::ExportMetricsServiceResponse;
+
+class MetricBidiReactor : public grpc::ClientBidiReactor<ExportMetricsServiceRequest, ExportMetricsServiceResponse> {
+public:
+  MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr<OpencensusExporter> exporter);
+
+  /// Notifies the application that a StartRead operation completed.
+  ///
+  /// \param[in] ok Was it successful? If false, no new read/write operation
+  ///               will succeed, and any further Start* should not be called.
+  void OnReadDone(bool /*ok*/) override;
+
+  /// Notifies the application that a StartWrite or StartWriteLast operation
+  /// completed.
+  ///
+  /// \param[in] ok Was it successful? If false, no new read/write operation
+  ///               will succeed, and any further Start* should not be called.
+  void OnWriteDone(bool /*ok*/) override;
+
+  /// Notifies the application that all operations associated with this RPC
+  /// have completed and all Holds have been removed. OnDone provides the RPC
+  /// status outcome for both successful and failed RPCs and will be called in
+  /// all cases. If it is not called, it indicates an application-level problem
+  /// (like failure to remove a hold).
+  ///
+  /// \param[in] s The status outcome of this RPC
+  void OnDone(const grpc::Status& /*s*/) override;
+
+  void write(ExportMetricsServiceRequest request) LOCKS_EXCLUDED(requests_mtx_);
+
+private:
+  std::weak_ptr<Client> client_;
+  std::weak_ptr<OpencensusExporter> exporter_;
+
+  ExportMetricsServiceRequest request_;
+
+  std::vector<ExportMetricsServiceRequest> requests_ GUARDED_BY(requests_mtx_);
+  absl::Mutex requests_mtx_;
+
+  std::atomic_bool inflight_{false};
+  std::atomic_bool read_{false};
+
+  ExportMetricsServiceResponse response_;
+
+  void fireWrite();
+
+  void fireRead();
+};
+
+ROCKETMQ_NAMESPACE_END
diff --git a/src/main/cpp/stats/include/OpencensusExporter.h b/src/main/cpp/stats/include/OpencensusExporter.h
new file mode 100644
index 0000000..7920ff5
--- /dev/null
+++ b/src/main/cpp/stats/include/OpencensusExporter.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "Client.h"
+#include "Exporter.h"
+#include "grpcpp/grpcpp.h"
+#include "opencensus/proto/agent/metrics/v1/metrics_service.grpc.pb.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class MetricBidiReactor;
+
+using Stub = opencensus::proto::agent::metrics::v1::MetricsService::Stub;
+using StubPtr = std::unique_ptr<Stub>;
+using MetricData = std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>;
+using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
+
+class OpencensusExporter : public Exporter, public std::enable_shared_from_this<OpencensusExporter> {
+public:
+  OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client);
+
+  void exportMetrics(const MetricData& data) override;
+
+  static void wrap(const MetricData& data, ExportMetricsServiceRequest& request);
+
+  void resetStream();
+
+  Stub* stub() {
+    return stub_.get();
+  }
+
+private:
+  std::weak_ptr<Client> client_;
+  StubPtr stub_;
+  std::unique_ptr<MetricBidiReactor> bidi_reactor_;
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/include/PublishStats.h b/src/main/cpp/stats/include/PublishStats.h
index 46305d0..d9d750a 100644
--- a/src/main/cpp/stats/include/PublishStats.h
+++ b/src/main/cpp/stats/include/PublishStats.h
@@ -18,6 +18,7 @@
 
 #include <string>
 
+#include "Tag.h"
 #include "opencensus/stats/stats.h"
 #include "rocketmq/RocketMQ.h"
 
@@ -27,20 +28,18 @@ class PublishStats {
 public:
   PublishStats();
 
-  opencensus::stats::MeasureInt64& success() {
+  const opencensus::stats::MeasureInt64& success() const {
     return success_;
   }
 
-  opencensus::stats::MeasureInt64& failure() {
+  const opencensus::stats::MeasureInt64& failure() const {
     return failure_;
   }
 
-  opencensus::stats::MeasureInt64& latency() {
+  const opencensus::stats::MeasureInt64& latency() const {
     return latency_;
   }
 
-  static opencensus::tags::TagKey& topicTag();
-
 private:
   opencensus::stats::MeasureInt64 success_;
   opencensus::stats::MeasureInt64 failure_;
diff --git a/src/main/cpp/stats/include/PublishStats.h b/src/main/cpp/stats/include/Tag.h
similarity index 68%
copy from src/main/cpp/stats/include/PublishStats.h
copy to src/main/cpp/stats/include/Tag.h
index 46305d0..ae69fcb 100644
--- a/src/main/cpp/stats/include/PublishStats.h
+++ b/src/main/cpp/stats/include/Tag.h
@@ -16,35 +16,20 @@
  */
 #pragma once
 
-#include <string>
-
 #include "opencensus/stats/stats.h"
 #include "rocketmq/RocketMQ.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-class PublishStats {
+class Tag {
 public:
-  PublishStats();
-
-  opencensus::stats::MeasureInt64& success() {
-    return success_;
-  }
-
-  opencensus::stats::MeasureInt64& failure() {
-    return failure_;
-  }
+  static opencensus::tags::TagKey& topicTag();
 
-  opencensus::stats::MeasureInt64& latency() {
-    return latency_;
-  }
+  static opencensus::tags::TagKey& clientIdTag();
 
-  static opencensus::tags::TagKey& topicTag();
+  static opencensus::tags::TagKey& userIdTag();
 
-private:
-  opencensus::stats::MeasureInt64 success_;
-  opencensus::stats::MeasureInt64 failure_;
-  opencensus::stats::MeasureInt64 latency_;
+  static opencensus::tags::TagKey& deploymentTag();
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/stats/tests/PublishStatsTest.cpp b/src/main/cpp/stats/tests/PublishStatsTest.cpp
index 609e634..b6fb7d3 100644
--- a/src/main/cpp/stats/tests/PublishStatsTest.cpp
+++ b/src/main/cpp/stats/tests/PublishStatsTest.cpp
@@ -17,6 +17,7 @@
 
 #include <chrono>
 #include <iostream>
+#include <mutex>
 #include <thread>
 
 #include "PublishStats.h"
@@ -28,11 +29,33 @@ class Handler : public opencensus::stats::StatsExporter::Handler {
 public:
   void ExportViewData(
       const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) override {
+    std::cout << "================================================================================" << std::endl;
     for (const auto& datum : data) {
       const auto& view_data = datum.second;
+      const auto& descriptor = datum.first;
+      auto start_times = view_data.start_times();
+      auto columns = descriptor.columns();
+
       switch (view_data.type()) {
         case opencensus::stats::ViewData::Type::kInt64: {
-          exportDatum(datum.first, view_data.start_time(), view_data.end_time(), view_data.int_data());
+          auto data_map = view_data.int_data();
+          for (const auto& entry : data_map) {
+            absl::Time time = start_times[entry.first];
+            std::string line;
+            line.append(absl::FormatTime(time)).append(" ");
+            line.append(descriptor.name());
+            line.append("{");
+            for (std::size_t i = 0; i < columns.size(); i++) {
+              line.append(columns[i].name()).append("=").append(entry.first[i]);
+              if (i < columns.size() - 1) {
+                line.append(", ");
+              } else {
+                line.append("} ==> ");
+              }
+            }
+            line.append(std::to_string(entry.second));
+            println(line);
+          }
           break;
         }
         case opencensus::stats::ViewData::Type::kDouble: {
@@ -40,7 +63,22 @@ public:
           break;
         }
         case opencensus::stats::ViewData::Type::kDistribution: {
-          exportDatum(datum.first, view_data.start_time(), view_data.end_time(), view_data.distribution_data());
+          for (const auto& entry : view_data.distribution_data()) {
+            std::string line(descriptor.name());
+            line.append("{");
+            for (std::size_t i = 0; i < columns.size(); i++) {
+              line.append(columns[i].name()).append("=").append(entry.first[i]);
+              if (i < columns.size() - 1) {
+                line.append(", ");
+              } else {
+                line.append("} ==> ");
+              }
+            }
+            line.append(entry.second.DebugString());
+            println(line);
+
+            println(absl::StrJoin(entry.second.bucket_boundaries().lower_boundaries(), ","));
+          }
           break;
         }
       }
@@ -53,7 +91,7 @@ public:
                    absl::Time end_time,
                    const opencensus::stats::ViewData::DataMap<T>& data) {
     if (data.empty()) {
-      std::cout << "No data for " << descriptor.name() << std::endl;
+      // std::cout << "No data for " << descriptor.name() << std::endl;
       return;
     }
 
@@ -65,6 +103,12 @@ public:
     }
   }
 
+  std::mutex console_mtx;
+  void println(const std::string& line) {
+    std::lock_guard<std::mutex> lk(console_mtx);
+    std::cout << line << std::endl;
+  }
+
   // Functions to format data for different aggregation types.
   std::string DataToString(double data) {
     return absl::StrCat(": ", data, "\n");
@@ -87,12 +131,27 @@ TEST(StatsTest, testBasics) {
   std::string t1("T1");
   std::string t2("T2");
   PublishStats metrics;
+  opencensus::stats::StatsExporter::SetInterval(absl::Seconds(5));
   opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<Handler>());
-  opencensus::stats::Record({{metrics.success(), 1}}, {{PublishStats::topicTag(), t1}});
-  opencensus::stats::Record({{metrics.success(), 100}}, {{PublishStats::topicTag(), t2}});
-  opencensus::stats::Record({{metrics.latency(), 100}}, {{PublishStats::topicTag(), t1}});
+  std::atomic_bool stopped{false};
+  auto generator = [&]() {
+    while (!stopped) {
+      opencensus::stats::Record({{metrics.success(), 1}}, {{Tag::topicTag(), t1}, {Tag::clientIdTag(), "client-0"}});
+      opencensus::stats::Record({{metrics.success(), 100}}, {{Tag::topicTag(), t2}, {Tag::clientIdTag(), "client-0"}});
+      for (std::size_t i = 0; i < 10; i++) {
+        opencensus::stats::Record({{metrics.latency(), i * 10}},
+                                  {{Tag::topicTag(), t1}, {Tag::clientIdTag(), "client-0"}});
+      }
+      std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+  };
+  std::thread feeder(generator);
 
   std::this_thread::sleep_for(std::chrono::seconds(10));
+  stopped.store(true);
+  if (feeder.joinable()) {
+    feeder.join();
+  }
 }
 
 ROCKETMQ_NAMESPACE_END