You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/13 06:41:19 UTC

[rocketmq-clients] branch master updated: Support client metric exporting (#39)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new c8ed73d  Support client metric exporting (#39)
c8ed73d is described below

commit c8ed73d3ef7e37e3fcd11fa6fa66126ffe1ac8ce
Author: Zhanhui Li <li...@gmail.com>
AuthorDate: Wed Jul 13 14:41:15 2022 +0800

    Support client metric exporting (#39)
---
 cpp/proto/apache/rocketmq/v2/definition.proto      |  14 ++-
 cpp/proto/apache/rocketmq/v2/service.proto         |  19 +---
 cpp/src/main/cpp/base/ErrorCategory.cpp            |  53 +++++++++-
 cpp/src/main/cpp/base/MixAll.cpp                   |  25 ++---
 cpp/src/main/cpp/base/include/MixAll.h             |   3 +-
 cpp/src/main/cpp/base/tests/MixAllTest.cpp         |  11 ++
 cpp/src/main/cpp/client/BUILD.bazel                |   1 +
 cpp/src/main/cpp/client/ClientManagerImpl.cpp      |   7 +-
 cpp/src/main/cpp/client/LogInterceptor.cpp         |   4 +-
 cpp/src/main/cpp/client/TelemetryBidiReactor.cpp   |   1 -
 cpp/src/main/cpp/client/include/ClientConfig.h     |   3 +-
 cpp/src/main/cpp/client/include/ClientManager.h    |   6 ++
 cpp/src/main/cpp/rocketmq/ClientImpl.cpp           |  47 +++++++++
 .../cpp/rocketmq/ConsumeMessageServiceImpl.cpp     |  13 ---
 cpp/src/main/cpp/rocketmq/ConsumeTask.cpp          |  32 +++++-
 cpp/src/main/cpp/rocketmq/ProducerImpl.cpp         |  16 ++-
 cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp     |   6 +-
 cpp/src/main/cpp/rocketmq/SendContext.cpp          |  14 +--
 cpp/src/main/cpp/rocketmq/SimpleConsumerImpl.cpp   |   2 +-
 cpp/src/main/cpp/rocketmq/include/ClientImpl.h     |   3 +-
 cpp/src/main/cpp/rocketmq/include/ProducerImpl.h   |   5 +-
 .../main/cpp/rocketmq/include/PushConsumerImpl.h   |   2 +-
 .../main/cpp/rocketmq/include/SimpleConsumerImpl.h |   2 +-
 cpp/src/main/cpp/stats/ConsumeStats.cpp            |  67 +-----------
 cpp/src/main/cpp/stats/MetricBidiReactor.cpp       |  25 +++--
 cpp/src/main/cpp/stats/OpencensusExporter.cpp      | 114 ++++++++++-----------
 .../stats/{include/Tag.h => OpencensusHandler.cpp} |  20 ++--
 cpp/src/main/cpp/stats/PublishStats.cpp            |  24 +----
 cpp/src/main/cpp/stats/StdoutHandler.cpp           |  81 +++++++++++++++
 cpp/src/main/cpp/stats/Tag.cpp                     |  11 +-
 cpp/src/main/cpp/stats/include/ConsumeStats.h      |  30 ------
 cpp/src/main/cpp/stats/include/MetricBidiReactor.h |   1 +
 .../main/cpp/stats/include/OpencensusExporter.h    |   7 +-
 .../include/{Exporter.h => OpencensusHandler.h}    |  12 ++-
 cpp/src/main/cpp/stats/include/PublishStats.h      |  12 +--
 cpp/src/main/cpp/stats/include/StdoutHandler.h     |  76 ++++++++++++++
 cpp/src/main/cpp/stats/include/Tag.h               |   4 +-
 cpp/src/main/cpp/stats/tests/PublishStatsTest.cpp  |   2 -
 38 files changed, 459 insertions(+), 316 deletions(-)

diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto b/cpp/proto/apache/rocketmq/v2/definition.proto
index 67520fe..86bb3dd 100644
--- a/cpp/proto/apache/rocketmq/v2/definition.proto
+++ b/cpp/proto/apache/rocketmq/v2/definition.proto
@@ -322,16 +322,20 @@ enum Code {
   ILLEGAL_MESSAGE_ID = 40009;
   // Format of filter expression is illegal.
   ILLEGAL_FILTER_EXPRESSION = 40010;
+  // The invisible time of request is invalid.
+  ILLEGAL_INVISIBLE_TIME = 40011;
+  // The delivery timestamp of message is invalid.
+  ILLEGAL_DELIVERY_TIME = 40012;
   // Receipt handle of message is invalid.
-  INVALID_RECEIPT_HANDLE = 40011;
+  INVALID_RECEIPT_HANDLE = 40013;
   // Message property conflicts with its type.
-  MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40012;
+  MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40014;
   // Client type could not be recognized.
-  UNRECOGNIZED_CLIENT_TYPE = 40013;
+  UNRECOGNIZED_CLIENT_TYPE = 40015;
   // Message is corrupted.
-  MESSAGE_CORRUPTED = 40014;
+  MESSAGE_CORRUPTED = 40016;
   // Request is rejected due to missing of x-mq-client-id header.
-  CLIENT_ID_REQUIRED = 40015;
+  CLIENT_ID_REQUIRED = 40017;
 
   // Generic code indicates that the client request lacks valid authentication
   // credentials for the requested resource.
diff --git a/cpp/proto/apache/rocketmq/v2/service.proto b/cpp/proto/apache/rocketmq/v2/service.proto
index c5d4cce..42bf332 100644
--- a/cpp/proto/apache/rocketmq/v2/service.proto
+++ b/cpp/proto/apache/rocketmq/v2/service.proto
@@ -182,8 +182,7 @@ message ThreadStackTrace {
 
 message VerifyMessageCommand {
   string nonce = 1;
-  MessageQueue message_queue = 2;
-  Message message = 3;
+  Message message = 2;
 }
 
 message VerifyMessageResult {
@@ -191,9 +190,8 @@ message VerifyMessageResult {
 }
 
 message RecoverOrphanedTransactionCommand {
-  MessageQueue message_queue = 1;
-  Message orphaned_transactional_message = 2;
-  string transaction_id = 3;
+  Message message = 1;
+  string transaction_id = 2;
 }
 
 message Publishing {
@@ -203,21 +201,14 @@ message Publishing {
   // List of topics to which messages will publish to.
   repeated Resource topics = 1;
 
-  // Publishing settings below here are from server, it is essential for
-  // server to push.
-  //
-  // Body of message will be deflated if its size in bytes exceeds the
-  // threshold.
-  int32 compress_body_threshold = 2;
-
   // If the message body size exceeds `max_body_size`, broker servers would
   // reject the request. As a result, it is advisable that Producer performs
   // client-side check validation.
-  int32 max_body_size = 3;
+  int32 max_body_size = 2;
 
   // When the `validate_message_type` flag is set to `false`, there is no need to validate the
   // message's type against the messageQueue's `accept_message_types` before publishing.
-  bool validate_message_type = 4;
+  bool validate_message_type = 3;
 }
 
 message Subscription {
diff --git a/cpp/src/main/cpp/base/ErrorCategory.cpp b/cpp/src/main/cpp/base/ErrorCategory.cpp
index 5ca2528..28390b8 100644
--- a/cpp/src/main/cpp/base/ErrorCategory.cpp
+++ b/cpp/src/main/cpp/base/ErrorCategory.cpp
@@ -68,7 +68,7 @@ std::string ErrorCategory::message(int code) const {
       return "State of dependent procedure is not right";
 
     case ErrorCode::TooManyRequests:
-      return "Quota exchausted. The user has sent too many requests in a given "
+      return "Quota exhausted. The user has sent too many requests in a given "
              "amount of time.";
 
     case ErrorCode::UnavailableForLegalReasons:
@@ -127,6 +127,57 @@ std::string ErrorCategory::message(int code) const {
     case ErrorCode::IllegalMessageTag: {
       return "Format of message tag is illegal.";
     }
+    case ErrorCode::InternalClientError: {
+      return "Internal client error";
+    }
+    case ErrorCode::IllegalMessageKey: {
+      return "Message key is not legal";
+    }
+    case ErrorCode::IllegalMessageProperty: {
+      return "One or multiple message properties is not legal";
+    }
+    case ErrorCode::IllegalMessageGroup: {
+      return "Message group is invalid";
+    }
+    case ErrorCode::InvalidTransactionId: {
+      return "Transaction ID is invalid";
+    }
+    case ErrorCode::IllegalMessageId: {
+      return "Message ID is invalid";
+    }
+    case ErrorCode::IllegalFilterExpression: {
+      return "Filter expression is malformed";
+    }
+    case ErrorCode::InvalidReceiptHandle: {
+      return "Receipt handle is invalid";
+    }
+    case ErrorCode::MessagePropertyConflictWithType: {
+      return "Message property conflicts with message type";
+    }
+    case ErrorCode::UnsupportedClientType: {
+      return "Server does not support the client type claimed";
+    }
+    case ErrorCode::MessageCorrupted: {
+      return "Message is corrupted";
+    }
+    case ErrorCode::ClientIdRequired: {
+      return "x-mq-client-id header meta-data is required";
+    }
+    case ErrorCode::MessageNotFound: {
+      return "No new messages are available at the moment";
+    }
+    case ErrorCode::MessageBodyTooLarge: {
+      return "Size of message body exceeds limits";
+    }
+    case ErrorCode::MessagePropertiesTooLarge: {
+      return "Size of message properties exceeds limits";
+    }
+    case ErrorCode::NotSupported: {
+      return "Action is not supported";
+    }
+    case ErrorCode::VerifyFifoMessageUnsupported: {
+      return "Verify FIFO message is not supported";
+    }
   }
 
   return "";
diff --git a/cpp/src/main/cpp/base/MixAll.cpp b/cpp/src/main/cpp/base/MixAll.cpp
index 4d244ce..0514c71 100644
--- a/cpp/src/main/cpp/base/MixAll.cpp
+++ b/cpp/src/main/cpp/base/MixAll.cpp
@@ -19,12 +19,15 @@
 #include <chrono>
 #include <cstdint>
 #include <cstdlib>
+#include <system_error>
 
+#include "LoggerImpl.h"
 #include "absl/random/random.h"
 #include "absl/strings/str_split.h"
 #include "fmt/format.h"
 #include "openssl/md5.h"
 #include "openssl/sha.h"
+#include "rocketmq/ErrorCode.h"
 #include "zlib.h"
 
 #ifdef _WIN32
@@ -49,7 +52,7 @@ const uint32_t MixAll::DEFAULT_CONSUME_THREAD_POOL_SIZE = 20;
 const uint32_t MixAll::DEFAULT_CONSUME_MESSAGE_BATCH_SIZE = 1;
 const int32_t MixAll::DEFAULT_MAX_DELIVERY_ATTEMPTS = 16;
 
-const RE2 MixAll::TOPIC_REGEX("[a-zA-Z0-9\\-_]{3,64}");
+const RE2 MixAll::TOPIC_REGEX("[a-zA-Z0-9\\-_%]{1,64}");
 const RE2 MixAll::IP_REGEX("\\d+\\.\\d+\\.\\d+\\.\\d+");
 
 const std::chrono::duration<long long> MixAll::DEFAULT_INVISIBLE_TIME_ = std::chrono::seconds(30);
@@ -138,26 +141,20 @@ const char* MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION = "__await_consumption";
 const char* MixAll::SPAN_ANNOTATION_MESSAGE_KEYS = "__message_keys";
 const char* MixAll::SPAN_ANNOTATION_ATTR_START_TIME = "__start_time";
 
-bool MixAll::validate(const Message& message) {
+void MixAll::validate(const Message& message, std::error_code& ec) {
   if (message.topic().empty()) {
-    return false;
+    SPDLOG_WARN("Topic of the message to publish is empty");
+    ec = ErrorCode::IllegalTopic;
+    return;
   }
+
   const std::string& topic = message.topic();
-  // Topic should not start with "CID" or "GID" which are reserved prefix
-  if (absl::StartsWith(topic, "CID") || absl::StartsWith(topic, "GID")) {
-    return false;
-  }
 
   // Legal topic characters are a-z, A-Z, 0-9, hyphen('-'), underscore('_') and percent('%'); length 1 to 64
   if (!RE2::FullMatch(topic, TOPIC_REGEX)) {
-    return false;
+    SPDLOG_WARN("Topic of the message to publish does not match [a-zA-Z0-9\\-_%]{1,64}");
+    ec = ErrorCode::IllegalTopic;
   }
-
-  uint32_t body_length = message.body().length();
-  if (!body_length || body_length > MAX_MESSAGE_BODY_SIZE) {
-    return false;
-  }
-  return true;
 }
 
 uint32_t MixAll::random(uint32_t left, uint32_t right) {
diff --git a/cpp/src/main/cpp/base/include/MixAll.h b/cpp/src/main/cpp/base/include/MixAll.h
index 1483737..f30494f 100644
--- a/cpp/src/main/cpp/base/include/MixAll.h
+++ b/cpp/src/main/cpp/base/include/MixAll.h
@@ -19,6 +19,7 @@
 #include <chrono>
 #include <cstdint>
 #include <string>
+#include <system_error>
 
 #include "absl/strings/string_view.h"
 #include "re2/re2.h"
@@ -144,7 +145,7 @@ public:
    * @param message
    * @return
    */
-  static bool validate(const Message& message);
+  static void validate(const Message& message, std::error_code& ec);
 
   static uint32_t random(uint32_t left, uint32_t right);
 
diff --git a/cpp/src/main/cpp/base/tests/MixAllTest.cpp b/cpp/src/main/cpp/base/tests/MixAllTest.cpp
index e66030e..53b019a 100644
--- a/cpp/src/main/cpp/base/tests/MixAllTest.cpp
+++ b/cpp/src/main/cpp/base/tests/MixAllTest.cpp
@@ -14,7 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include <system_error>
+
 #include "MixAll.h"
+#include "absl/strings/str_split.h"
 #include "gtest/gtest.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
@@ -24,4 +27,12 @@ TEST(MixAllTest, testOsName) {
   std::cout << os_name << std::endl;
 }
 
+TEST(MixAllTest, testValidate) {
+  auto message = Message::newBuilder().withTopic("").build();
+  std::error_code ec;
+  MixAll::validate(*message, ec);
+  ASSERT_EQ(true, (bool)ec);
+  ASSERT_TRUE(absl::StrContains(ec.message(), "illegal"));
+}
+
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/client/BUILD.bazel b/cpp/src/main/cpp/client/BUILD.bazel
index c56f279..0af283e 100644
--- a/cpp/src/main/cpp/client/BUILD.bazel
+++ b/cpp/src/main/cpp/client/BUILD.bazel
@@ -37,5 +37,6 @@ cc_library(
         "//external:gtest",
     ],
     defines = [
+        "DEBUG_METRIC_EXPORTING",
     ],
 )
\ No newline at end of file
diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
index 1f9eec6..78b13f7 100644
--- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
@@ -331,7 +331,7 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
     }
 
     if (State::STARTED != client_manager_ptr->state()) {
-      // TODO: Would this leak some memroy?
+      // TODO: Would this leak some memory?
       return;
     }
 
@@ -501,10 +501,7 @@ RpcClientSharedPtr ClientManagerImpl::getRpcClient(const std::string& target_hos
       } else if (!search->second->ok()) {
         SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one", target_host);
       }
-      std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
-      interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
-      auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
-          target_host, channel_credential_, channel_arguments_, std::move(interceptor_factories));
+      auto channel = createChannel(target_host);
       std::weak_ptr<ClientManager> client_manager(shared_from_this());
       client = std::make_shared<RpcClientImpl>(client_manager, channel, target_host, need_heartbeat);
       rpc_clients_.insert_or_assign(target_host, client);
diff --git a/cpp/src/main/cpp/client/LogInterceptor.cpp b/cpp/src/main/cpp/client/LogInterceptor.cpp
index 09dbc9f..7702864 100644
--- a/cpp/src/main/cpp/client/LogInterceptor.cpp
+++ b/cpp/src/main/cpp/client/LogInterceptor.cpp
@@ -15,13 +15,15 @@
  * limitations under the License.
  */
 #include "LogInterceptor.h"
+
+#include <cstddef>
+
 #include "InterceptorContinuation.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/strings/str_join.h"
 #include "google/protobuf/message.h"
 #include "rocketmq/Logger.h"
 #include "spdlog/spdlog.h"
-#include <cstddef>
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp b/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp
index 7479ba2..e88eb13 100644
--- a/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp
+++ b/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp
@@ -244,7 +244,6 @@ void TelemetryBidiReactor::applyBackoffPolicy(const rmq::Settings& settings, std
 
 void TelemetryBidiReactor::applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) {
   client->config().publisher.max_body_size = settings.publishing().max_body_size();
-  client->config().publisher.compress_body_threshold = settings.publishing().compress_body_threshold();
 }
 
 void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) {
diff --git a/cpp/src/main/cpp/client/include/ClientConfig.h b/cpp/src/main/cpp/client/include/ClientConfig.h
index bade9a4..58cd1fe 100644
--- a/cpp/src/main/cpp/client/include/ClientConfig.h
+++ b/cpp/src/main/cpp/client/include/ClientConfig.h
@@ -32,7 +32,6 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 struct PublisherConfig {
   std::vector<rmq::Resource> topics;
-  std::uint32_t compress_body_threshold{4096};
   std::uint32_t max_body_size{4194304};
 };
 
@@ -54,7 +53,7 @@ struct ClientConfig {
   rmq::ClientType client_type{rmq::ClientType::CLIENT_TYPE_UNSPECIFIED};
   RetryPolicy backoff_policy;
   // TODO: use std::chrono::milliseconds
-  absl::Duration request_timeout;
+  absl::Duration request_timeout{absl::Seconds(3)};
   std::string region{"cn-hangzhou"};
   std::string resource_namespace;
   std::shared_ptr<CredentialsProvider> credentials_provider;
diff --git a/cpp/src/main/cpp/client/include/ClientManager.h b/cpp/src/main/cpp/client/include/ClientManager.h
index 99ec6ad..2ec5b2e 100644
--- a/cpp/src/main/cpp/client/include/ClientManager.h
+++ b/cpp/src/main/cpp/client/include/ClientManager.h
@@ -50,6 +50,12 @@ public:
 
   virtual std::shared_ptr<RpcClient> getRpcClient(const std::string& target_host, bool need_heartbeat) = 0;
 
+  /**
+   * @brief Create a Channel object
+   *
+   * @param target_host gRPC naming targets, following https://github.com/grpc/grpc/blob/master/doc/naming.md
+   * @return std::shared_ptr<grpc::Channel>
+   */
   virtual std::shared_ptr<grpc::Channel> createChannel(const std::string& target_host) = 0;
 
   virtual void resolveRoute(const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
diff --git a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
index 8275922..61e6ed9 100644
--- a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -16,6 +16,8 @@
  */
 #include "ClientImpl.h"
 
+#include <apache/rocketmq/v2/definition.pb.h>
+
 #include <algorithm>
 #include <atomic>
 #include <chrono>
@@ -37,10 +39,12 @@
 #include "NamingScheme.h"
 #include "SessionImpl.h"
 #include "Signature.h"
+#include "StdoutHandler.h"
 #include "UtilAll.h"
 #include "absl/strings/numbers.h"
 #include "absl/strings/str_join.h"
 #include "absl/strings/str_split.h"
+#include "opencensus/stats/stats.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/MessageListener.h"
 
@@ -165,6 +169,49 @@ void ClientImpl::start() {
 
   route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
                                                                    std::chrono::seconds(10), std::chrono::seconds(30));
+
+  auto endpoints = client_config_.metric.endpoints;
+  std::string target;
+  switch (endpoints.scheme()) {
+    case rmq::AddressScheme::IPv4: {
+      target.append("ipv4:");
+      break;
+    }
+    case rmq::AddressScheme::IPv6: {
+      target.append("ipv6:");
+      break;
+    }
+    case rmq::AddressScheme::DOMAIN_NAME: {
+      target.append("dns:");
+      break;
+    }
+    default: {
+      SPDLOG_ERROR("Unknown metric address scheme");
+    }
+  }
+
+  bool first = true;
+  for (const auto& address : endpoints.addresses()) {
+    if (!first) {
+      target.push_back(',');
+    } else {
+      first = false;
+    }
+    target.append(address.host());
+    target.push_back(':');
+    target.append(std::to_string(address.port()));
+  }
+
+  std::weak_ptr<Client> client_weak_ptr(self());
+
+#ifdef DEBUG_METRIC_EXPORTING
+  opencensus::stats::StatsExporter::SetInterval(absl::Seconds(1));
+  opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<StdoutHandler>());
+#else
+  opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
+#endif
+  SPDLOG_INFO("Export client metrics to {}", target);
+  opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<OpencensusHandler>(target, client_weak_ptr));
 }
 
 void ClientImpl::shutdown() {
diff --git a/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp b/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
index 74d8176..bb56e71 100644
--- a/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -85,18 +85,8 @@ void ConsumeMessageServiceImpl::ack(const Message& message, std::function<void(c
 
   std::weak_ptr<PushConsumerImpl> client(consumer_);
   const auto& topic = message.topic();
-  // Collect metrics
-  opencensus::stats::Record({{consumer->stats().processSuccess(), 1}},
-                            {{Tag::topicTag(), topic}, {Tag::clientIdTag(), consumer->config().client_id}});
   auto callback = [cb, client, topic](const std::error_code& ec) {
     auto consumer = client.lock();
-    if (ec) {
-      opencensus::stats::Record({{consumer->stats().ackFailure(), 1}},
-                                {{Tag::topicTag(), topic}, {Tag::clientIdTag(), consumer->config().client_id}});
-    } else {
-      opencensus::stats::Record({{consumer->stats().ackSuccess(), 1}},
-                                {{Tag::topicTag(), topic}, {Tag::clientIdTag(), consumer->config().client_id}});
-    }
     cb(ec);
   };
 
@@ -108,9 +98,6 @@ void ConsumeMessageServiceImpl::nack(const Message& message, std::function<void(
   if (!consumer) {
     return;
   }
-  // Collect metrics
-  opencensus::stats::Record({{consumer->stats().processFailure(), 1}},
-                            {{Tag::topicTag(), message.topic()}, {Tag::clientIdTag(), consumer->config().client_id}});
   consumer->nack(message, cb);
 }
 
diff --git a/cpp/src/main/cpp/rocketmq/ConsumeTask.cpp b/cpp/src/main/cpp/rocketmq/ConsumeTask.cpp
index 69d299a..7af1b28 100644
--- a/cpp/src/main/cpp/rocketmq/ConsumeTask.cpp
+++ b/cpp/src/main/cpp/rocketmq/ConsumeTask.cpp
@@ -21,6 +21,7 @@
 #include "LoggerImpl.h"
 #include "PushConsumerImpl.h"
 #include "Tag.h"
+#include "rocketmq/ConsumeResult.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -128,18 +129,43 @@ void ConsumeTask::process() {
       auto it = messages_.begin();
       SPDLOG_DEBUG("Start to process message[message-id={}]", (*it)->id());
       svc->preHandle(**it);
+
+      // Collect metrics of await_time
       auto await_time = std::chrono::system_clock::now() - (*it)->extension().decode_time;
       opencensus::stats::Record(
           {{consumer->stats().awaitTime(), MixAll::millisecondsOf(await_time)}},
           {{Tag::topicTag(), (*it)->topic()}, {Tag::clientIdTag(), consumer->config().client_id}});
 
       std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
+
+      // Invoke user-defined-callback
       auto result = listener(**it);
+
+      // Collect metrics of process_time
       auto duration = std::chrono::steady_clock::now() - start;
-      opencensus::stats::Record(
-          {{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
-          {{Tag::topicTag(), (*it)->topic()}, {Tag::clientIdTag(), consumer->config().client_id}});
+      switch (result) {
+        case ConsumeResult::SUCCESS: {
+          opencensus::stats::Record({{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
+                                    {
+                                        {Tag::topicTag(), (*it)->topic()},
+                                        {Tag::clientIdTag(), consumer->config().client_id},
+                                        {Tag::invocationStatus(), "success"},
+                                    });
+          break;
+        }
+        case ConsumeResult::FAILURE: {
+          opencensus::stats::Record({{consumer->stats().processTime(), MixAll::millisecondsOf(duration)}},
+                                    {
+                                        {Tag::topicTag(), (*it)->topic()},
+                                        {Tag::clientIdTag(), consumer->config().client_id},
+                                        {Tag::invocationStatus(), "failure"},
+                                    });
+          break;
+        }
+      }
+
       svc->postHandle(**it, result);
+
       switch (result) {
         case ConsumeResult::SUCCESS: {
           auto callback = std::bind(&ConsumeTask::onAck, self, std::placeholders::_1);
diff --git a/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp b/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
index 65cc5ba..2cd4399 100644
--- a/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -101,8 +101,15 @@ void ProducerImpl::ensureRunning(std::error_code& ec) const noexcept {
   }
 }
 
-bool ProducerImpl::validate(const Message& message) {
-  return MixAll::validate(message);
+void ProducerImpl::validate(const Message& message, std::error_code& ec) {
+  MixAll::validate(message, ec);
+
+  if (!ec) {
+    if (message.body().length() > client_config_.publisher.max_body_size) {
+      SPDLOG_WARN("Body of the message to send is too large");
+      ec = ErrorCode::PayloadTooLarge;
+    }
+  }
 }
 
 void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageRequest& request,
@@ -338,9 +345,8 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) {
 void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::vector<rmq::MessageQueue> list) {
   SendReceipt send_receipt;
   std::error_code ec;
-
-  if (!validate(*message)) {
-    ec = ErrorCode::BadRequest;
+  validate(*message, ec);
+  if (ec) {
     callback(ec, send_receipt);
     return;
   }
diff --git a/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp b/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index 2acff23..a1bfe90 100644
--- a/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -44,7 +44,7 @@ PushConsumerImpl::~PushConsumerImpl() {
   shutdown();
 }
 
-void PushConsumerImpl::topicsOfInterest(std::vector<std::string>& topics) {
+void PushConsumerImpl::topicsOfInterest(std::vector<std::string> topics) {
   absl::MutexLock lk(&topic_filter_expression_table_mtx_);
   for (const auto& entry : topic_filter_expression_table_) {
     topics.push_back(entry.first);
@@ -214,7 +214,7 @@ void PushConsumerImpl::queryAssignment(
     absl::flat_hash_map<std::string, std::string> metadata;
     Signature::sign(client_config_, metadata);
     auto assignment_callback = [this, cb, topic, broker_host](const std::error_code& ec,
-                                                              const QueryAssignmentResponse& response) {
+                                                        const QueryAssignmentResponse& response) {
       if (ec) {
         SPDLOG_WARN("Failed to acquire queue assignment of topic={} from brokerAddress={}", topic, broker_host);
         cb(ec, nullptr);
@@ -559,4 +559,4 @@ void PushConsumerImpl::onVerifyMessage(MessageConstSharedPtr message, std::funct
   }
 }
 
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+ROCKETMQ_NAMESPACE_END
diff --git a/cpp/src/main/cpp/rocketmq/SendContext.cpp b/cpp/src/main/cpp/rocketmq/SendContext.cpp
index 9dee0e8..f73a30d 100644
--- a/cpp/src/main/cpp/rocketmq/SendContext.cpp
+++ b/cpp/src/main/cpp/rocketmq/SendContext.cpp
@@ -49,12 +49,7 @@ void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept {
                               {
                                   {Tag::topicTag(), message_->topic()},
                                   {Tag::clientIdTag(), publisher->config().client_id},
-                              });
-
-    opencensus::stats::Record({{publisher->stats().success(), 1}},
-                              {
-                                  {Tag::topicTag(), message_->topic()},
-                                  {Tag::clientIdTag(), publisher->config().client_id},
+                                  {Tag::invocationStatus(), "success"},
                               });
   }
 
@@ -82,12 +77,7 @@ void SendContext::onFailure(const std::error_code& ec) noexcept {
                               {
                                   {Tag::topicTag(), message_->topic()},
                                   {Tag::clientIdTag(), publisher->config().client_id},
-                              });
-
-    opencensus::stats::Record({{publisher->stats().failure(), 1}},
-                              {
-                                  {Tag::topicTag(), message_->topic()},
-                                  {Tag::clientIdTag(), publisher->config().client_id},
+                                  {Tag::invocationStatus(), "failure"},
                               });
   }
 
diff --git a/cpp/src/main/cpp/rocketmq/SimpleConsumerImpl.cpp b/cpp/src/main/cpp/rocketmq/SimpleConsumerImpl.cpp
index 82e6b42..6a2dc75 100644
--- a/cpp/src/main/cpp/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/SimpleConsumerImpl.cpp
@@ -67,7 +67,7 @@ void SimpleConsumerImpl::buildClientSettings(rmq::Settings& settings) {
   }
 }
 
-void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string>& topics) {
+void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string> topics) {
   absl::MutexLock lk(&subscriptions_mtx_);
   for (const auto& entry : subscriptions_) {
     if (std::find(topics.begin(), topics.end(), entry.first) == topics.end()) {
diff --git a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
index c136cc9..c095c56 100644
--- a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -30,6 +30,7 @@
 #include "InvocationContext.h"
 #include "MessageExt.h"
 #include "NameServerResolver.h"
+#include "OpencensusHandler.h"
 #include "RpcClient.h"
 #include "Session.h"
 #include "TelemetryBidiReactor.h"
@@ -144,7 +145,7 @@ protected:
   absl::flat_hash_map<std::string, std::unique_ptr<Session>> session_map_ GUARDED_BY(session_map_mtx_);
   absl::Mutex session_map_mtx_;
 
-  virtual void topicsOfInterest(std::vector<std::string>& topics) {
+  virtual void topicsOfInterest(std::vector<std::string> topics) {
   }
 
   void updateRouteInfo() LOCKS_EXCLUDED(topic_route_table_mtx_);
diff --git a/cpp/src/main/cpp/rocketmq/include/ProducerImpl.h b/cpp/src/main/cpp/rocketmq/include/ProducerImpl.h
index a50c6ef..d3d865c 100644
--- a/cpp/src/main/cpp/rocketmq/include/ProducerImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/ProducerImpl.h
@@ -106,8 +106,7 @@ public:
 
   void buildClientSettings(rmq::Settings& settings) override;
 
-  void topicsOfInterest(std::vector<std::string> topics)
-      LOCKS_EXCLUDED(topics_mtx_);
+  void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topics_mtx_);
 
   const PublishStats& stats() const { return stats_; }
 
@@ -152,7 +151,7 @@ private:
 
   void ensureRunning(std::error_code& ec) const noexcept;
 
-  bool validate(const Message& message);
+  void validate(const Message& message, std::error_code& ec);
 
   void send0(MessageConstPtr message, SendCallback callback, std::vector<rmq::MessageQueue> list);
 
diff --git a/cpp/src/main/cpp/rocketmq/include/PushConsumerImpl.h b/cpp/src/main/cpp/rocketmq/include/PushConsumerImpl.h
index 0067009..95a07f2 100644
--- a/cpp/src/main/cpp/rocketmq/include/PushConsumerImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/PushConsumerImpl.h
@@ -52,7 +52,7 @@ public:
 
   void prepareHeartbeatData(HeartbeatRequest& request) override;
 
-  void topicsOfInterest(std::vector<std::string>& topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
+  void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
 
   void start() override;
 
diff --git a/cpp/src/main/cpp/rocketmq/include/SimpleConsumerImpl.h b/cpp/src/main/cpp/rocketmq/include/SimpleConsumerImpl.h
index 5a5a1b9..9fac1bb 100644
--- a/cpp/src/main/cpp/rocketmq/include/SimpleConsumerImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/SimpleConsumerImpl.h
@@ -56,7 +56,7 @@ public:
                                ChangeInvisibleDurationCallback callback);
 
 protected:
-  void topicsOfInterest(std::vector<std::string>& topics) override;
+  void topicsOfInterest(std::vector<std::string> topics) override;
 
 private:
   absl::flat_hash_map<std::string, FilterExpression> subscriptions_ GUARDED_BY(subscriptions_mtx_);
diff --git a/cpp/src/main/cpp/stats/ConsumeStats.cpp b/cpp/src/main/cpp/stats/ConsumeStats.cpp
index c95ad2e..df4ee7e 100644
--- a/cpp/src/main/cpp/stats/ConsumeStats.cpp
+++ b/cpp/src/main/cpp/stats/ConsumeStats.cpp
@@ -22,18 +22,7 @@
 ROCKETMQ_NAMESPACE_BEGIN
 
 ConsumeStats::ConsumeStats()
-    : process_success_(
-          opencensus::stats::MeasureInt64::Register("process_success", "Number of messages processed", "1")),
-      process_failure_(opencensus::stats::MeasureInt64::Register(
-          "process_failure", "Number of failures when processing messages", "1")),
-      ack_success_(opencensus::stats::MeasureInt64::Register("ack_success", "Number of messages acknowledged", "1")),
-      ack_failure_(opencensus::stats::MeasureInt64::Register(
-          "ack_failure", "Number of failures when acknowledging messages", "1")),
-      change_invisible_time_success_(opencensus::stats::MeasureInt64::Register(
-          "change_invisible_time_success", "Number of change-invisible-time performed", "1")),
-      change_invisible_time_failure_(opencensus::stats::MeasureInt64::Register(
-          "change_invisible_time_failure", "Number of failures when changing message invisible time", "1")),
-      cached_message_quantity_(opencensus::stats::MeasureInt64::Register(
+    : cached_message_quantity_(opencensus::stats::MeasureInt64::Register(
           "cached_message_quantity", "Number of locally cached messages", "1")),
       cached_message_bytes_(opencensus::stats::MeasureInt64::Register(
           "cached_message_bytes", "Number of locally cached messages in bytes", "1")),
@@ -42,60 +31,6 @@ ConsumeStats::ConsumeStats()
       await_time_(opencensus::stats::MeasureInt64::Register(
           "await_time", "Client side queuing time of messages before getting processed", "1")),
       process_time_(opencensus::stats::MeasureInt64::Register("process_time", "Process message time", "1")) {
-  opencensus::stats::ViewDescriptor()
-      .set_name("rocketmq_process_success_total")
-      .set_description("Number of messages processed")
-      .set_measure("process_success")
-      .set_aggregation(opencensus::stats::Aggregation::Sum())
-      .add_column(Tag::topicTag())
-      .add_column(Tag::clientIdTag())
-      .RegisterForExport();
-
-  opencensus::stats::ViewDescriptor()
-      .set_name("rocketmq_process_failure_total")
-      .set_description("Number of failures on processing messages")
-      .set_measure("process_failure")
-      .set_aggregation(opencensus::stats::Aggregation::Sum())
-      .add_column(Tag::topicTag())
-      .add_column(Tag::clientIdTag())
-      .RegisterForExport();
-
-  opencensus::stats::ViewDescriptor()
-      .set_name("rocketmq_ack_success_total")
-      .set_description("Number of messages acknowledged")
-      .set_measure("ack_success")
-      .set_aggregation(opencensus::stats::Aggregation::Sum())
-      .add_column(Tag::topicTag())
-      .add_column(Tag::clientIdTag())
-      .RegisterForExport();
-
-  opencensus::stats::ViewDescriptor()
-      .set_name("rocketmq_ack_failure_total")
-      .set_description("Number of failures on acknowledging messages")
-      .set_measure("ack_failure")
-      .set_aggregation(opencensus::stats::Aggregation::Sum())
-      .add_column(Tag::topicTag())
-      .add_column(Tag::clientIdTag())
-      .RegisterForExport();
-
-  opencensus::stats::ViewDescriptor()
-      .set_name("rocketmq_change_invisible_time_success_total")
-      .set_description("Number of change-invisible-time operations")
-      .set_measure("change_invisible_time_success")
-      .set_aggregation(opencensus::stats::Aggregation::Sum())
-      .add_column(Tag::topicTag())
-      .add_column(Tag::clientIdTag())
-      .RegisterForExport();
-
-  opencensus::stats::ViewDescriptor()
-      .set_name("rocketmq_change_invisible_time_failure_total")
-      .set_description("Number of failed change-invisible-time operations")
-      .set_measure("change_invisible_time_failure")
-      .set_aggregation(opencensus::stats::Aggregation::Sum())
-      .add_column(Tag::topicTag())
-      .add_column(Tag::clientIdTag())
-      .RegisterForExport();
-
   opencensus::stats::ViewDescriptor()
       .set_name("rocketmq_consumer_cached_messages")
       .set_description("Number of messages locally cached")
diff --git a/cpp/src/main/cpp/stats/MetricBidiReactor.cpp b/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
index 62983ee..43e69ed 100644
--- a/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
+++ b/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
@@ -16,6 +16,8 @@
  */
 #include "MetricBidiReactor.h"
 
+#include <chrono>
+
 #include "LoggerImpl.h"
 #include "OpencensusExporter.h"
 #include "Signature.h"
@@ -24,23 +26,22 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr<OpencensusExporter> exporter)
     : client_(client), exporter_(exporter) {
-  grpc::ClientContext context;
   auto ptr = client_.lock();
 
   Metadata metadata;
   Signature::sign(ptr->config(), metadata);
 
   for (const auto& entry : metadata) {
-    context.AddMetadata(entry.first, entry.second);
+    context_.AddMetadata(entry.first, entry.second);
   }
-  context.set_deadline(std::chrono::system_clock::now() + absl::ToChronoMilliseconds(ptr->config().request_timeout));
+  context_.set_deadline(std::chrono::system_clock::now() + std::chrono::hours(24));
 
   auto exporter_ptr = exporter_.lock();
   if (!exporter_ptr) {
+    SPDLOG_WARN("Exporter has already been destructed");
     return;
   }
-
-  exporter_ptr->stub()->async()->Export(&context, this);
+  exporter_ptr->stub()->async()->Export(&context_, this);
   StartCall();
 }
 
@@ -49,7 +50,7 @@ void MetricBidiReactor::OnReadDone(bool ok) {
     SPDLOG_WARN("Failed to read response");
     return;
   }
-
+  SPDLOG_DEBUG("OnReadDone OK");
   StartRead(&response_);
 }
 
@@ -58,7 +59,8 @@ void MetricBidiReactor::OnWriteDone(bool ok) {
     SPDLOG_WARN("Failed to report metrics");
     return;
   }
-
+  SPDLOG_DEBUG("OnWriteDone OK");
+  fireRead();
   bool expected = true;
   if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
     fireWrite();
@@ -75,10 +77,15 @@ void MetricBidiReactor::OnDone(const grpc::Status& s) {
     SPDLOG_DEBUG("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
   } else {
     SPDLOG_WARN("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
+    auto exporter = exporter_.lock();
+    if (exporter) {
+      exporter->resetStream();
+    }
   }
 }
 
 void MetricBidiReactor::write(ExportMetricsServiceRequest request) {
+  SPDLOG_DEBUG("Append ExportMetricsServiceRequest to buffer");
   {
     absl::MutexLock lk(&requests_mtx_);
     requests_.emplace_back(std::move(request));
@@ -99,10 +106,10 @@ void MetricBidiReactor::fireWrite() {
   bool expected = false;
   if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
     absl::MutexLock lk(&requests_mtx_);
-    request_.CopyFrom(requests_[0]);
+    request_ = std::move(*requests_.begin());
     requests_.erase(requests_.begin());
+    SPDLOG_DEBUG("MetricBidiReactor#StartWrite");
     StartWrite(&request_);
-    fireRead();
   }
 }
 
diff --git a/cpp/src/main/cpp/stats/OpencensusExporter.cpp b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
index 0bf13a4..effe512 100644
--- a/cpp/src/main/cpp/stats/OpencensusExporter.cpp
+++ b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
@@ -17,6 +17,7 @@
 
 #include "OpencensusExporter.h"
 
+#include "ClientManager.h"
 #include "MetricBidiReactor.h"
 #include "google/protobuf/util/time_util.h"
 
@@ -25,6 +26,13 @@ ROCKETMQ_NAMESPACE_BEGIN
 namespace opencensus_proto = opencensus::proto::metrics::v1;
 
 OpencensusExporter::OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client) : client_(client) {
+  auto client_shared_ptr = client.lock();
+  if (client_shared_ptr) {
+    auto channel = client_shared_ptr->manager()->createChannel(endpoints);
+    stub_ = opencensus::proto::agent::metrics::v1::MetricsService::NewStub(channel);
+  } else {
+    SPDLOG_ERROR("Failed to initialize OpencensusExporter. weak_ptr to Client is nullptr");
+  }
 }
 
 void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceRequest& request) {
@@ -33,7 +41,7 @@ void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceReques
   for (const auto& entry : data) {
     const auto& view_descriptor = entry.first;
 
-    auto metric = new opencensus::proto::metrics::v1::Metric();
+    auto metric = absl::make_unique<opencensus::proto::metrics::v1::Metric>();
     auto descriptor = metric->mutable_metric_descriptor();
     descriptor->set_name(view_descriptor.name());
     descriptor->set_description(view_descriptor.description());
@@ -62,116 +70,100 @@ void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceReques
 
     auto label_keys = descriptor->mutable_label_keys();
     for (const auto& column : view_descriptor.columns()) {
-      auto label_key = new opencensus::proto::metrics::v1::LabelKey;
+      auto label_key = absl::make_unique<opencensus::proto::metrics::v1::LabelKey>();
       label_key->set_key(column.name());
-      label_keys->AddAllocated(label_key);
+      label_keys->AddAllocated(label_key.release());
     }
 
     auto time_series = metric->mutable_timeseries();
     const auto& view_data = entry.second;
 
-    auto start_times = view_data.start_times();
-
+    // TODO: Opencensus provides end-timestamp of the statistics conducted whilst OpenTelemetry requires
+    // start-timestamp. Let us ignore the difference for now.
+    auto stats_time = google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(view_data.end_time()));
     switch (view_data.type()) {
       case opencensus::stats::ViewData::Type::kInt64: {
         for (const auto& entry : view_data.int_data()) {
-          auto time_series_element = new opencensus::proto::metrics::v1::TimeSeries();
-
-          auto search = start_times.find(entry.first);
-          if (search != start_times.end()) {
-            absl::Time time = search->second;
-            time_series_element->mutable_start_timestamp()->CopyFrom(
-                google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(time)));
-          }
-
+          auto time_series_element = absl::make_unique<opencensus::proto::metrics::v1::TimeSeries>();
+          time_series_element->mutable_start_timestamp()->CopyFrom(stats_time);
           auto label_values = time_series_element->mutable_label_values();
           for (const auto& value : entry.first) {
-            auto label_value = new opencensus::proto::metrics::v1::LabelValue;
+            auto label_value = absl::make_unique<opencensus::proto::metrics::v1::LabelValue>();
             label_value->set_value(value);
             label_value->set_has_value(true);
-            label_values->AddAllocated(label_value);
+            label_values->AddAllocated(label_value.release());
           }
-
-          auto point = new opencensus::proto::metrics::v1::Point();
+          auto point = absl::make_unique<opencensus::proto::metrics::v1::Point>();
+          point->mutable_timestamp()->CopyFrom(stats_time);
           point->set_int64_value(entry.second);
-          time_series_element->mutable_points()->AddAllocated(point);
-
-          time_series->AddAllocated(time_series_element);
+          time_series_element->mutable_points()->AddAllocated(point.release());
+          time_series->AddAllocated(time_series_element.release());
         }
         break;
       }
       case opencensus::stats::ViewData::Type::kDouble: {
         for (const auto& entry : view_data.double_data()) {
-          auto time_series_element = new opencensus::proto::metrics::v1::TimeSeries();
-          auto search = start_times.find(entry.first);
-          if (search != start_times.end()) {
-            absl::Time time = search->second;
-            time_series_element->mutable_start_timestamp()->CopyFrom(
-                google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(time)));
-          }
-
+          auto time_series_element = absl::make_unique<opencensus::proto::metrics::v1::TimeSeries>();
+          time_series_element->mutable_start_timestamp()->CopyFrom(stats_time);
           auto label_values = time_series_element->mutable_label_values();
           for (const auto& value : entry.first) {
-            auto label_value = new opencensus::proto::metrics::v1::LabelValue;
+            auto label_value = absl::make_unique<opencensus::proto::metrics::v1::LabelValue>();
             label_value->set_value(value);
             label_value->set_has_value(true);
-            label_values->AddAllocated(label_value);
+            label_values->AddAllocated(label_value.release());
           }
-
-          auto point = new opencensus::proto::metrics::v1::Point();
+          auto point = absl::make_unique<opencensus::proto::metrics::v1::Point>();
+          point->mutable_timestamp()->CopyFrom(stats_time);
           point->set_double_value(entry.second);
-          time_series_element->mutable_points()->AddAllocated(point);
-
-          time_series->AddAllocated(time_series_element);
+          time_series_element->mutable_points()->AddAllocated(point.release());
+          time_series->AddAllocated(time_series_element.release());
         }
         break;
       }
       case opencensus::stats::ViewData::Type::kDistribution: {
         for (const auto& entry : view_data.distribution_data()) {
-          auto time_series_element = new opencensus::proto::metrics::v1::TimeSeries();
-          auto search = start_times.find(entry.first);
-          if (search != start_times.end()) {
-            absl::Time time = search->second;
-            time_series_element->mutable_start_timestamp()->CopyFrom(
-                google::protobuf::util::TimeUtil::TimeTToTimestamp(absl::ToTimeT(time)));
-          }
-
+          auto time_series_element = absl::make_unique<opencensus::proto::metrics::v1::TimeSeries>();
+          time_series_element->mutable_start_timestamp()->CopyFrom(stats_time);
           auto label_values = time_series_element->mutable_label_values();
           for (const auto& value : entry.first) {
-            auto label_value = new opencensus::proto::metrics::v1::LabelValue;
+            auto label_value = absl::make_unique<opencensus::proto::metrics::v1::LabelValue>();
             label_value->set_value(value);
             label_value->set_has_value(true);
-            label_values->AddAllocated(label_value);
+            label_values->AddAllocated(label_value.release());
           }
-          auto point = new opencensus::proto::metrics::v1::Point();
-          auto distribution_value = new opencensus::proto::metrics::v1::DistributionValue;
-          point->set_allocated_distribution_value(distribution_value);
-          time_series_element->mutable_points()->AddAllocated(point);
+          auto point = absl::make_unique<opencensus::proto::metrics::v1::Point>();
+          point->mutable_timestamp()->CopyFrom(stats_time);
+          auto distribution_value = absl::make_unique<opencensus::proto::metrics::v1::DistributionValue>();
           distribution_value->set_count(entry.second.count());
           distribution_value->set_sum_of_squared_deviation(entry.second.sum_of_squared_deviation());
+          distribution_value->set_sum(entry.second.count() * entry.second.mean());
           for (const auto& cnt : entry.second.bucket_counts()) {
-            auto bucket = new opencensus::proto::metrics::v1::DistributionValue::Bucket();
+            auto bucket = absl::make_unique<opencensus::proto::metrics::v1::DistributionValue::Bucket>();
             bucket->set_count(cnt);
-            distribution_value->mutable_buckets()->AddAllocated(bucket);
+            distribution_value->mutable_buckets()->AddAllocated(bucket.release());
           }
-
           auto bucket_options = distribution_value->mutable_bucket_options();
-
           for (const auto& boundary : entry.second.bucket_boundaries().lower_boundaries()) {
             bucket_options->mutable_explicit_()->mutable_bounds()->Add(boundary);
           }
+          point->set_allocated_distribution_value(distribution_value.release());
+          time_series_element->mutable_points()->AddAllocated(point.release());
 
-          time_series->AddAllocated(time_series_element);
+          time_series->AddAllocated(time_series_element.release());
         }
         break;
       }
     }
 
-    metrics->AddAllocated(metric);
+    if (time_series->empty()) {
+      continue;
+    }
+
+    metrics->AddAllocated(metric.release());
   }
 }
 
-void OpencensusExporter::exportMetrics(
+void OpencensusExporter::ExportViewData(
     const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) {
   opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest request;
   wrap(data, request);
@@ -179,7 +171,13 @@ void OpencensusExporter::exportMetrics(
   if (!bidi_reactor_) {
     bidi_reactor_ = absl::make_unique<MetricBidiReactor>(client_, exporter);
   }
-  bidi_reactor_->write(request);
+
+  if (request.metrics_size()) {
+    SPDLOG_DEBUG("ExportMetricRequest: {}", request.DebugString());
+    bidi_reactor_->write(request);
+  } else {
+    SPDLOG_DEBUG("ExportMetricRequest contains no valid metric");
+  }
 }
 
 void OpencensusExporter::resetStream() {
diff --git a/cpp/src/main/cpp/stats/include/Tag.h b/cpp/src/main/cpp/stats/OpencensusHandler.cpp
similarity index 69%
copy from cpp/src/main/cpp/stats/include/Tag.h
copy to cpp/src/main/cpp/stats/OpencensusHandler.cpp
index ae69fcb..422108d 100644
--- a/cpp/src/main/cpp/stats/include/Tag.h
+++ b/cpp/src/main/cpp/stats/OpencensusHandler.cpp
@@ -14,22 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#pragma once
+#include "OpencensusHandler.h"
 
-#include "opencensus/stats/stats.h"
-#include "rocketmq/RocketMQ.h"
+#include "MetricBidiReactor.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-class Tag {
-public:
-  static opencensus::tags::TagKey& topicTag();
+OpencensusHandler::OpencensusHandler(std::string endpoints, std::weak_ptr<Client> client)
+    : exporter_(std::make_shared<OpencensusExporter>(std::move(endpoints), std::move(client))) {
+}
 
-  static opencensus::tags::TagKey& clientIdTag();
-
-  static opencensus::tags::TagKey& userIdTag();
-
-  static opencensus::tags::TagKey& deploymentTag();
-};
+void OpencensusHandler::ExportViewData(const MetricData& data) {
+  exporter_->ExportViewData(data);
+}
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/PublishStats.cpp b/cpp/src/main/cpp/stats/PublishStats.cpp
index a012515..0338efe 100644
--- a/cpp/src/main/cpp/stats/PublishStats.cpp
+++ b/cpp/src/main/cpp/stats/PublishStats.cpp
@@ -21,33 +21,13 @@
 ROCKETMQ_NAMESPACE_BEGIN
 
 PublishStats::PublishStats()
-    : success_(opencensus::stats::MeasureInt64::Register("publish_success", "Number of message published", "1")),
-      failure_(opencensus::stats::MeasureInt64::Register("publish_failure", "Number of publish failures", "1")),
-      latency_(opencensus::stats::MeasureInt64::Register("publish_latency", "Publish latency in milliseconds", "ms")) {
-  opencensus::stats::ViewDescriptor()
-      .set_name("rocketmq_send_success_total")
-      .set_description("Number of messages published")
-      .set_measure("publish_success")
-      .set_aggregation(opencensus::stats::Aggregation::Sum())
-      .add_column(Tag::topicTag())
-      .add_column(Tag::clientIdTag())
-      .RegisterForExport();
-
-  opencensus::stats::ViewDescriptor()
-      .set_name("rocketmq_send_failure_total")
-      .set_description("Number of publish failures")
-      .set_measure("publish_failure")
-      .set_aggregation(opencensus::stats::Aggregation::Sum())
-      .add_column(Tag::topicTag())
-      .add_column(Tag::clientIdTag())
-      .RegisterForExport();
-
+    : latency_(opencensus::stats::MeasureInt64::Register("publish_latency", "Publish latency in milliseconds", "ms")) {
   opencensus::stats::ViewDescriptor()
       .set_name("rocketmq_send_cost_time")
       .set_description("Publish latency")
       .set_measure("publish_latency")
       .set_aggregation(opencensus::stats::Aggregation::Distribution(
-          opencensus::stats::BucketBoundaries::Explicit({5, 10, 20, 50, 500})))
+          opencensus::stats::BucketBoundaries::Explicit({1, 5, 10, 20, 50, 200, 500})))
       .add_column(Tag::topicTag())
       .add_column(Tag::clientIdTag())
       .RegisterForExport();
diff --git a/cpp/src/main/cpp/stats/StdoutHandler.cpp b/cpp/src/main/cpp/stats/StdoutHandler.cpp
new file mode 100644
index 0000000..9d18d78
--- /dev/null
+++ b/cpp/src/main/cpp/stats/StdoutHandler.cpp
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StdoutHandler.h"
+
+#include <cstddef>
+#include <string>
+
+#include "absl/strings/str_join.h"
+#include "absl/time/time.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+namespace {
+
+// Renders the tag columns of a view and the tag values of one row as
+// "{key1=value1, key2=value2} ==> ". The braces and the arrow are emitted even
+// when the view declares no columns, so every output line has the same shape.
+template <typename Columns, typename Values>
+std::string formatLabels(const Columns& columns, const Values& values) {
+  std::string labels("{");
+  for (std::size_t i = 0; i < columns.size(); i++) {
+    if (i > 0) {
+      labels.append(", ");
+    }
+    labels.append(columns[i].name()).append("=").append(values[i]);
+  }
+  labels.append("} ==> ");
+  return labels;
+}
+
+// Renders one scalar (int64 or double) row as
+// "<start-time> <view-name>{labels} ==> <value>".
+// The start time is looked up with find(), so the map is neither copied nor
+// mutated; a row without a recorded start time uses a default absl::Time.
+template <typename StartTimes, typename Row>
+std::string formatScalarRow(const opencensus::stats::ViewDescriptor& descriptor,
+                            const StartTimes& start_times,
+                            const Row& row) {
+  const auto search = start_times.find(row.first);
+  const absl::Time time = search != start_times.end() ? search->second : absl::Time();
+  std::string line(absl::FormatTime(time));
+  line.append(" ").append(descriptor.name());
+  line.append(formatLabels(descriptor.columns(), row.first));
+  line.append(std::to_string(row.second));
+  return line;
+}
+
+} // namespace
+
+// Prints one line per row of every exported view through println().
+//  - int64 / double rows: "<start-time> <view-name>{labels} ==> <value>"
+//  - distribution rows:   "<view-name>{labels} ==> <DebugString>", followed by a
+//    line holding the comma-separated lower bucket boundaries.
+void StdoutHandler::ExportViewData(
+    const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) {
+  for (const auto& datum : data) {
+    const auto& descriptor = datum.first;
+    const auto& view_data = datum.second;
+    // Bind by reference instead of copying the whole start-time map for every view.
+    const auto& start_times = view_data.start_times();
+
+    switch (view_data.type()) {
+      case opencensus::stats::ViewData::Type::kInt64: {
+        for (const auto& entry : view_data.int_data()) {
+          println(formatScalarRow(descriptor, start_times, entry));
+        }
+        break;
+      }
+      case opencensus::stats::ViewData::Type::kDouble: {
+        // Same format as int64 rows, and written through println() like them.
+        for (const auto& entry : view_data.double_data()) {
+          println(formatScalarRow(descriptor, start_times, entry));
+        }
+        break;
+      }
+      case opencensus::stats::ViewData::Type::kDistribution: {
+        for (const auto& entry : view_data.distribution_data()) {
+          std::string line(descriptor.name());
+          line.append(formatLabels(descriptor.columns(), entry.first));
+          line.append(entry.second.DebugString());
+          println(line);
+
+          println(absl::StrJoin(entry.second.bucket_boundaries().lower_boundaries(), ","));
+        }
+        break;
+      }
+    }
+  }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/Tag.cpp b/cpp/src/main/cpp/stats/Tag.cpp
index 9599233..62f2076 100644
--- a/cpp/src/main/cpp/stats/Tag.cpp
+++ b/cpp/src/main/cpp/stats/Tag.cpp
@@ -28,14 +28,9 @@ opencensus::tags::TagKey& Tag::clientIdTag() {
   return client_id_tag;
 }
 
-opencensus::tags::TagKey& Tag::userIdTag() {
-  static opencensus::tags::TagKey uid_tag = opencensus::tags::TagKey::Register("uid");
-  return uid_tag;
-}
-
-opencensus::tags::TagKey& Tag::deploymentTag() {
-  static opencensus::tags::TagKey deployment_tag = opencensus::tags::TagKey::Register("deployment");
-  return deployment_tag;
+opencensus::tags::TagKey& Tag::invocationStatus() {
+  static opencensus::tags::TagKey invocation_status = opencensus::tags::TagKey::Register("invocation_status");
+  return invocation_status;
 }
 
 ROCKETMQ_NAMESPACE_END
diff --git a/cpp/src/main/cpp/stats/include/ConsumeStats.h b/cpp/src/main/cpp/stats/include/ConsumeStats.h
index a70f324..470a12d 100644
--- a/cpp/src/main/cpp/stats/include/ConsumeStats.h
+++ b/cpp/src/main/cpp/stats/include/ConsumeStats.h
@@ -25,30 +25,6 @@ class ConsumeStats {
 public:
   ConsumeStats();
 
-  const opencensus::stats::MeasureInt64& processSuccess() const {
-    return process_success_;
-  }
-
-  const opencensus::stats::MeasureInt64& processFailure() const {
-    return process_failure_;
-  }
-
-  const opencensus::stats::MeasureInt64& ackSuccess() const {
-    return ack_success_;
-  }
-
-  const opencensus::stats::MeasureInt64& ackFailure() const {
-    return ack_failure_;
-  }
-
-  const opencensus::stats::MeasureInt64& changeInvisibleTimeSuccess() const {
-    return change_invisible_time_success_;
-  }
-
-  const opencensus::stats::MeasureInt64& changeInvisibleTimeFailure() const {
-    return change_invisible_time_failure_;
-  }
-
   const opencensus::stats::MeasureInt64& cachedMessageQuantity() const {
     return cached_message_quantity_;
   }
@@ -70,12 +46,6 @@ public:
   }
 
 private:
-  opencensus::stats::MeasureInt64 process_success_;
-  opencensus::stats::MeasureInt64 process_failure_;
-  opencensus::stats::MeasureInt64 ack_success_;
-  opencensus::stats::MeasureInt64 ack_failure_;
-  opencensus::stats::MeasureInt64 change_invisible_time_success_;
-  opencensus::stats::MeasureInt64 change_invisible_time_failure_;
   opencensus::stats::MeasureInt64 cached_message_quantity_;
   opencensus::stats::MeasureInt64 cached_message_bytes_;
 
diff --git a/cpp/src/main/cpp/stats/include/MetricBidiReactor.h b/cpp/src/main/cpp/stats/include/MetricBidiReactor.h
index cc08c1d..0a10cd8 100644
--- a/cpp/src/main/cpp/stats/include/MetricBidiReactor.h
+++ b/cpp/src/main/cpp/stats/include/MetricBidiReactor.h
@@ -59,6 +59,7 @@ public:
 private:
   std::weak_ptr<Client> client_;
   std::weak_ptr<OpencensusExporter> exporter_;
+  grpc::ClientContext context_;
 
   ExportMetricsServiceRequest request_;
 
diff --git a/cpp/src/main/cpp/stats/include/OpencensusExporter.h b/cpp/src/main/cpp/stats/include/OpencensusExporter.h
index 7920ff5..161843c 100644
--- a/cpp/src/main/cpp/stats/include/OpencensusExporter.h
+++ b/cpp/src/main/cpp/stats/include/OpencensusExporter.h
@@ -17,9 +17,9 @@
 #pragma once
 
 #include "Client.h"
-#include "Exporter.h"
 #include "grpcpp/grpcpp.h"
 #include "opencensus/proto/agent/metrics/v1/metrics_service.grpc.pb.h"
+#include "opencensus/stats/stats.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -30,11 +30,12 @@ using StubPtr = std::unique_ptr<Stub>;
 using MetricData = std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>;
 using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
 
-class OpencensusExporter : public Exporter, public std::enable_shared_from_this<OpencensusExporter> {
+class OpencensusExporter : public opencensus::stats::StatsExporter::Handler,
+                           public std::enable_shared_from_this<OpencensusExporter> {
 public:
   OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client);
 
-  void exportMetrics(const MetricData& data) override;
+  void ExportViewData(const MetricData& data) override;
 
   static void wrap(const MetricData& data, ExportMetricsServiceRequest& request);
 
diff --git a/cpp/src/main/cpp/stats/include/Exporter.h b/cpp/src/main/cpp/stats/include/OpencensusHandler.h
similarity index 73%
rename from cpp/src/main/cpp/stats/include/Exporter.h
rename to cpp/src/main/cpp/stats/include/OpencensusHandler.h
index 7f2f24a..5c6ec1f 100644
--- a/cpp/src/main/cpp/stats/include/Exporter.h
+++ b/cpp/src/main/cpp/stats/include/OpencensusHandler.h
@@ -16,15 +16,17 @@
  */
 #pragma once
 
-#include "opencensus/stats/stats.h"
-#include "rocketmq/RocketMQ.h"
+#include "OpencensusExporter.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-class Exporter {
+class OpencensusHandler : public opencensus::stats::StatsExporter::Handler {
 public:
-  virtual void exportMetrics(
-      const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) = 0;
+  OpencensusHandler(std::string endpoints, std::weak_ptr<Client> client);
+
+  void ExportViewData(const MetricData& data) override;
+
+  std::shared_ptr<OpencensusExporter> exporter_;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/include/PublishStats.h b/cpp/src/main/cpp/stats/include/PublishStats.h
index d9d750a..3872808 100644
--- a/cpp/src/main/cpp/stats/include/PublishStats.h
+++ b/cpp/src/main/cpp/stats/include/PublishStats.h
@@ -27,22 +27,12 @@ ROCKETMQ_NAMESPACE_BEGIN
 class PublishStats {
 public:
   PublishStats();
-
-  const opencensus::stats::MeasureInt64& success() const {
-    return success_;
-  }
-
-  const opencensus::stats::MeasureInt64& failure() const {
-    return failure_;
-  }
-
+  
   const opencensus::stats::MeasureInt64& latency() const {
     return latency_;
   }
 
 private:
-  opencensus::stats::MeasureInt64 success_;
-  opencensus::stats::MeasureInt64 failure_;
   opencensus::stats::MeasureInt64 latency_;
 };
 
diff --git a/cpp/src/main/cpp/stats/include/StdoutHandler.h b/cpp/src/main/cpp/stats/include/StdoutHandler.h
new file mode 100644
index 0000000..1bfc3dc
--- /dev/null
+++ b/cpp/src/main/cpp/stats/include/StdoutHandler.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <iostream>
+#include <mutex>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/strings/str_cat.h"
+#include "absl/strings/str_split.h"
+#include "absl/time/time.h"
+#include "opencensus/stats/stats.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+// Stats exporter handler that prints exported view data to standard output.
+// Writes issued through println() are serialized by console_mtx_.
+class StdoutHandler : public opencensus::stats::StatsExporter::Handler {
+public:
+  // Prints the rows of every (descriptor, data) pair in `data`.
+  void ExportViewData(
+      const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) override;
+
+private:
+  // Prints one line per (row, tag column) pair of `data` in the form
+  // "<view-name>[<column>=<tag-value>]<formatted-value>". Lines go through
+  // println() so they are written under console_mtx_ instead of racing on
+  // std::cout. `start_time` and `end_time` are currently unused. An empty
+  // `data` prints nothing.
+  template <typename T>
+  void exportDatum(const opencensus::stats::ViewDescriptor& descriptor,
+                   absl::Time start_time,
+                   absl::Time end_time,
+                   const opencensus::stats::ViewData::DataMap<T>& data) {
+    const auto& columns = descriptor.columns();
+    for (const auto& row : data) {
+      for (std::size_t column = 0; column < columns.size(); column++) {
+        println(absl::StrCat(descriptor.name(), "[", columns[column].name(), "=", row.first[column], "]",
+                             dataToString(row.second)));
+      }
+    }
+  }
+
+  // Held by println() while it writes to std::cout.
+  std::mutex console_mtx_;
+
+  // Writes `line` plus a newline to std::cout and flushes, under console_mtx_.
+  void println(const std::string& line) {
+    std::lock_guard<std::mutex> lk(console_mtx_);
+    std::cout << line << std::endl;
+  }
+
+  // Functions to format data for different aggregation types.
+  // Scalars render as ": <value>\n".
+  std::string dataToString(double data) {
+    return absl::StrCat(": ", data, "\n");
+  }
+  std::string dataToString(int64_t data) {
+    return absl::StrCat(": ", data, "\n");
+  }
+  // Distributions render as "\n" followed by every DebugString() line, indented.
+  std::string dataToString(const opencensus::stats::Distribution& data) {
+    std::string output = "\n";
+    std::vector<std::string> lines = absl::StrSplit(data.DebugString(), '\n');
+    // Add indent.
+    for (const auto& line : lines) {
+      absl::StrAppend(&output, "    ", line, "\n");
+    }
+    return output;
+  }
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/include/Tag.h b/cpp/src/main/cpp/stats/include/Tag.h
index ae69fcb..df78650 100644
--- a/cpp/src/main/cpp/stats/include/Tag.h
+++ b/cpp/src/main/cpp/stats/include/Tag.h
@@ -27,9 +27,7 @@ public:
 
   static opencensus::tags::TagKey& clientIdTag();
 
-  static opencensus::tags::TagKey& userIdTag();
-
-  static opencensus::tags::TagKey& deploymentTag();
+  static opencensus::tags::TagKey& invocationStatus();
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/tests/PublishStatsTest.cpp b/cpp/src/main/cpp/stats/tests/PublishStatsTest.cpp
index b6fb7d3..7c4d997 100644
--- a/cpp/src/main/cpp/stats/tests/PublishStatsTest.cpp
+++ b/cpp/src/main/cpp/stats/tests/PublishStatsTest.cpp
@@ -136,8 +136,6 @@ TEST(StatsTest, testBasics) {
   std::atomic_bool stopped{false};
   auto generator = [&]() {
     while (!stopped) {
-      opencensus::stats::Record({{metrics.success(), 1}}, {{Tag::topicTag(), t1}, {Tag::clientIdTag(), "client-0"}});
-      opencensus::stats::Record({{metrics.success(), 100}}, {{Tag::topicTag(), t2}, {Tag::clientIdTag(), "client-0"}});
       for (std::size_t i = 0; i < 10; i++) {
         opencensus::stats::Record({{metrics.latency(), i * 10}},
                                   {{Tag::topicTag(), t1}, {Tag::clientIdTag(), "client-0"}});