You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/04 05:43:10 UTC

[rocketmq-clients] branch master updated: Make the CPP codebase compiles on popular platforms Mac/Windows/Linux (#24)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new bd3f083  Make the CPP codebase compiles on popular platforms Mac/Windows/Linux (#24)
bd3f083 is described below

commit bd3f08307ef69d77db59cbccdf43f746a69f6b8a
Author: Zhanhui Li <li...@gmail.com>
AuthorDate: Mon Jul 4 13:43:07 2022 +0800

    Make the CPP codebase compiles on popular platforms Mac/Windows/Linux (#24)
    
    * Sync and update protobuf files to v2.0-alpha
    
    * Complete error handling of QueryRoute
    
    * Complete error handling for Heartbeat
    
    * Finish error handling of SendMessage
    
    * Complete error handling for QueryAssignment
    
    * Add error handling for ReceiveMessage
    
    * Add error handling for Ack
    
    * Add error handling for EndTransaction
    
    * Add error handling for NotifyClientTermination
    
    * Add error handling
    
    * Translate ClientIdRequired to InternalClientError
    
    * Add dependent CDN through OSS
    
    * Add io_bazel_rules_go mirror
    
    * Add mirror for rules_proto
    
    * Fix url of bazel_proto mirror
    
    * Upgrade grpc to v1.46.3.1
    
    * Upgrade grpc to v1.46.3.2
    
    * Use official gRPC release v1.46.3
    
    * Try to build on Linux, MacOS, Windows
    
    * Use matrix
    
    * Fix clang check
    
    * Upgrade bazel version
    
    * Add sha256 for grpc
    
    * Fix to make it compile on Windows
    
    * Fix typo
---
 .github/workflows/cpp_build.yml                    |  10 +-
 cpp/.bazelrc                                       |   4 +-
 cpp/.bazelversion                                  |   2 +-
 cpp/api/rocketmq/ErrorCode.h                       | 107 ++++-
 cpp/api/rocketmq/Tracing.h                         |  10 +-
 cpp/bazel/rocketmq_deps.bzl                        |  55 ++-
 cpp/proto/apache/rocketmq/v2/definition.proto      | 178 +++----
 cpp/proto/apache/rocketmq/v2/service.proto         |   7 +
 cpp/src/main/cpp/base/ErrorCategory.cpp            |  29 +-
 cpp/src/main/cpp/base/Tracing.cpp                  |  26 --
 cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp    |  14 +-
 cpp/src/main/cpp/client/ClientManagerImpl.cpp      | 515 ++++++++++++++++++---
 .../main/cpp/client/ReceiveMessageStreamReader.cpp |  54 ++-
 cpp/src/main/cpp/client/RpcClientImpl.cpp          |   4 +-
 cpp/src/main/cpp/client/TelemetryBidiReactor.cpp   |   4 +-
 cpp/src/main/cpp/client/include/ClientConfig.h     |   7 +-
 cpp/src/main/cpp/client/include/RpcClient.h        |   7 +-
 cpp/src/main/cpp/client/include/RpcClientImpl.h    |   7 +-
 cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp     |   4 +-
 cpp/src/main/cpp/rocketmq/ProducerImpl.cpp         |  13 +-
 cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp     |   2 +-
 cpp/src/main/cpp/rocketmq/include/ClientImpl.h     |   6 +
 cpp/src/main/cpp/stats/PublishStats.cpp            |   2 +-
 23 files changed, 804 insertions(+), 263 deletions(-)

diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml
index 4d02855..a09c782 100644
--- a/.github/workflows/cpp_build.yml
+++ b/.github/workflows/cpp_build.yml
@@ -1,11 +1,13 @@
 name: CPP Build
 on: [push, pull_request]
 jobs:
-  cpp_ubuntu_18_04:
-    name: Ubuntu 18.04
-    runs-on: ubuntu-18.04
+  cpp:
+    strategy:
+      matrix:
+        os: [ubuntu-18.04, ubuntu-20.04, ubuntu-22.04, macos-12,  macos-11, windows-2019]
+    runs-on: ${{ matrix.os }}
     steps:
       - uses: actions/checkout@v2
       - name: Compile All Targets
         working-directory: ./cpp
-        run: bazel build //...
+        run: bazel build //...
\ No newline at end of file
diff --git a/cpp/.bazelrc b/cpp/.bazelrc
index d5ec2a9..6b7b1da 100644
--- a/cpp/.bazelrc
+++ b/cpp/.bazelrc
@@ -13,8 +13,6 @@ run --color=yes
 build --color=yes
 
 build --host_force_python=PY3
-build --host_javabase=@bazel_tools//tools/jdk:remote_jdk11
-build --javabase=@bazel_tools//tools/jdk:remote_jdk11
 
 # https://docs.bazel.build/versions/main/command-line-reference.html#flag--enable_platform_specific_config
 # If true, Bazel picks up host-OS-specific config lines from bazelrc files. For example, if the host OS is Linux and
@@ -48,7 +46,7 @@ build --action_env=LD_LIBRARY_PATH
 build --action_env=LLVM_CONFIG
 build --action_env=PATH
 
-build --copt=-maes
+build:linux --copt=-maes
 
 # Common flags for sanitizers
 build:sanitizer --define tcmalloc=disabled
diff --git a/cpp/.bazelversion b/cpp/.bazelversion
index af8c8ec..91ff572 100644
--- a/cpp/.bazelversion
+++ b/cpp/.bazelversion
@@ -1 +1 @@
-4.2.2
+5.2.0
diff --git a/cpp/api/rocketmq/ErrorCode.h b/cpp/api/rocketmq/ErrorCode.h
index 5068c83..4f05ded 100644
--- a/cpp/api/rocketmq/ErrorCode.h
+++ b/cpp/api/rocketmq/ErrorCode.h
@@ -31,67 +31,117 @@ enum class ErrorCode : int {
    * @brief Client state not as expected. Call Producer#start() first.
    *
    */
-  IllegalState = 1,
+  IllegalState = 10000,
+
+  /**
+   * @brief To publish FIFO messages, only synchronous API is supported.
+   */
+  BadRequestAsyncPubFifoMessage = 10100,
+
+  /**
+   * @brief 102XX is used for client side error.
+   *
+   */
+  InternalClientError = 10200,
 
   /**
    * @brief Broker has processed the request but is not going to return any content.
    */
-  NoContent = 204,
+  NoContent = 20400,
 
   /**
    * @brief Bad configuration. For example, negative max-attempt-times.
    *
    */
-  BadConfiguration = 300,
+  BadConfiguration = 30000,
 
   /**
-   * @brief The server cannot process the request due to apprent client-side
+   * @brief Generic code, representing multiple return results.
+   *
+   */
+  MultipleResults = 30100,
+
+  /**
+   * @brief The server cannot process the request due to apparent client-side
    * error. For example, topic contains invalid character or is excessively
    * long.
    *
    */
-  BadRequest = 400,
+  BadRequest = 40000,
 
   /**
-   * @brief To publish FIFO messages, only synchronous API is supported.
+   * @brief The access point is illegal
+   *
    */
-  BadRequestAsyncPubFifoMessage = 40001,
+  IllegalAccessPoint = 40001,
+
+  IllegalTopic = 40002,
+  IllegalConsumerGroup = 40003,
+  IllegalMessageTag = 40004,
+  IllegalMessageKey = 40005,
+  IllegalMessageGroup = 40006,
+  IllegalMessageProperty = 40007,
+  InvalidTransactionId = 40008,
+  IllegalMessageId = 40009,
+  IllegalFilterExpression = 40010,
+  InvalidReceiptHandle = 40011,
+
+  /**
+   * @brief Message property conflicts with its type.
+   */
+  MessagePropertyConflictWithType = 40012,
+
+  // Client type is not recognized by server
+  UnsupportedClientType = 40013,
+
+  // Received message is corrupted, generally failing to pass integrity checksum validation.
+  MessageCorrupted = 40014,
+
+  // Request is rejected due to missing of x-mq-client-id HTTP header in the metadata frame.
+  ClientIdRequired = 40015,
 
   /**
    * @brief Authentication failed. Possibly caused by invalid credentials.
    *
    */
-  Unauthorized = 401,
+  Unauthorized = 40100,
 
   /**
    * @brief Credentials are understood by server but authenticated user does not
    * have privilege to perform the requested action.
    *
    */
-  Forbidden = 403,
+  Forbidden = 40300,
 
   /**
    * @brief Topic not found, which should be created through console or
    * administration API before hand.
    *
    */
-  NotFound = 404,
+  NotFound = 40400,
 
-  TopicNotFound = 404001,
+  MessageNotFound = 40401,
 
-  GroupNotFound = 404002,
+  TopicNotFound = 404002,
+
+  ConsumerGroupNotFound = 404003,
 
   /**
    * @brief Timeout when connecting, reading from or writing to brokers.
    *
    */
-  RequestTimeout = 408,
+  RequestTimeout = 40800,
 
   /**
    * @brief Message body is too large.
    *
    */
-  PayloadTooLarge = 413,
+  PayloadTooLarge = 41300,
+
+  /**
+   * @brief Message body size exceeds limited allowed by server.
+   */
+  MessageBodyTooLarge = 41301,
 
   /**
    * @brief When trying to perform an action whose dependent procedure state is
@@ -100,14 +150,14 @@ enum class ErrorCode : int {
    * 2. Commit/Rollback a transactional message that does not exist;
    * 3. Commit an offset which is greater than maximum of partition;
    */
-  PreconditionRequired = 428,
+  PreconditionRequired = 42800,
 
   /**
    * @brief Quota exchausted. The user has sent too many requests in a given
    * amount of time.
    *
    */
-  TooManyRequest = 429,
+  TooManyRequests = 42900,
 
   /**
    * @brief The server is unwilling to process the request because either an
@@ -115,62 +165,69 @@ enum class ErrorCode : int {
    * large
    *
    */
-  HeaderFieldsTooLarge = 431,
+  HeaderFieldsTooLarge = 43100,
+
+  // Message properties total size exceeds the threshold.
+  MessagePropertiesTooLarge = 43101,
 
   /**
    * @brief A server operator has received a legal demand to deny access to a
    * resource or to a set of resources that includes the requested resource.
    *
    */
-  UnavailableForLegalReasons = 451,
+  UnavailableForLegalReasons = 45100,
 
   /**
    * @brief Server side interval error
    *
    */
-  InternalServerError = 500,
+  InternalServerError = 50000,
 
   /**
    * @brief The server either does not recognize the request method, or it lacks
    * the ability to fulfil the request.
    *
    */
-  NotImplemented = 501,
+  NotImplemented = 50100,
 
   /**
    * @brief The server was acting as a gateway or proxy and received an invalid
    * response from the upstream server.
    *
    */
-  BadGateway = 502,
+  BadGateway = 50200,
 
   /**
    * @brief The server cannot handle the request (because it is overloaded or
    * down for maintenance). Generally, this is a temporary state.
    *
    */
-  ServiceUnavailable = 503,
+  ServiceUnavailable = 50300,
 
   /**
    * @brief The server was acting as a gateway or proxy and did not receive a
    * timely response from the upstream server.
    *
    */
-  GatewayTimeout = 504,
+  GatewayTimeout = 50400,
 
   /**
    * @brief The server does not support the protocol version used in the
    * request.
    *
    */
-  ProtocolVersionNotSupported = 505,
+  NotSupported = 50500,
+
+  ProtocolUnsupported = 50501,
+
+  VerifyFifoMessageUnsupported = 50502,
 
   /**
    * @brief The server is unable to store the representation needed to complete
    * the request.
    *
    */
-  InsufficientStorage = 507,
+  InsufficientStorage = 50700,
 };
 
 std::error_code make_error_code(ErrorCode code);
diff --git a/cpp/api/rocketmq/Tracing.h b/cpp/api/rocketmq/Tracing.h
index bc12362..cdec455 100644
--- a/cpp/api/rocketmq/Tracing.h
+++ b/cpp/api/rocketmq/Tracing.h
@@ -16,12 +16,18 @@
  */
 #pragma once
 
-#include "opencensus/trace/sampler.h"
+#include <memory>
 
 #include "RocketMQ.h"
+#include "opencensus/trace/sampler.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-opencensus::trace::Sampler* traceSampler() __attribute__((weak));
+class TracingSamplerProvider {
+public:
+  virtual ~TracingSamplerProvider() = default;
+
+  virtual std::unique_ptr<opencensus::trace::Sampler> tracingSampler() = 0;
+};
 
 ROCKETMQ_NAMESPACE_END
diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl
index 39f3642..983a476 100644
--- a/cpp/bazel/rocketmq_deps.bzl
+++ b/cpp/bazel/rocketmq_deps.bzl
@@ -26,7 +26,8 @@ def rocketmq_deps():
              sha256 = "b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
              strip_prefix = "googletest-release-1.11.0",
              urls = [
-                 "https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz",
+                "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz",
+                "https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz",
              ],
          )
 
@@ -36,6 +37,7 @@ def rocketmq_deps():
             strip_prefix = "filesystem-1.5.0",
             sha256 = "eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb",
             urls = [
+                "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz",
                 "https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz",
             ],
             build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD",
@@ -47,6 +49,7 @@ def rocketmq_deps():
             strip_prefix = "spdlog-1.9.2",
             sha256 = "6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38",
             urls = [
+                "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz",
                 "https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz",
             ],
             build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD",
@@ -58,6 +61,7 @@ def rocketmq_deps():
             strip_prefix = "fmt-8.0.1",
             sha256 = "b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01",
             urls = [
+                "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz",
                 "https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz",
             ],
             build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD",
@@ -107,16 +111,16 @@ def rocketmq_deps():
             ],
         )
 
-    if "com_github_grpc_grpc" not in native.existing_rules():
-        http_archive(
-            name = "com_github_grpc_grpc",
-            strip_prefix = "grpc-1.46.0",
-            sha256 = "67423a4cd706ce16a88d1549297023f0f9f0d695a96dd684adc21e67b021f9bc",
-            urls = [
-                "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.0.tar.gz",
-                "https://github.com/grpc/grpc/archive/refs/tags/v1.46.0.tar.gz",
-            ],
-        )
+    maybe(
+        http_archive,
+        name = "com_github_grpc_grpc",
+        strip_prefix = "grpc-1.46.3",
+        sha256 = "d6cbf22cb5007af71b61c6be316a79397469c58c82a942552a62e708bce60964",
+        urls = [
+            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.3.tar.gz",
+            "https://github.com/grpc/grpc/archive/refs/tags/v1.46.3.tar.gz",
+        ],
+    )
 
     maybe(
         http_archive,
@@ -125,6 +129,7 @@ def rocketmq_deps():
         build_file = "@org_apache_rocketmq//third_party:asio.BUILD",
         strip_prefix = "asio-1.18.2",
         urls = [
+            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/asio/asio-1.18.2.tar.gz",
             "https://github.com/lizhanhui/asio/archive/refs/tags/v1.18.2.tar.gz",
         ],
     )
@@ -135,7 +140,10 @@ def rocketmq_deps():
         sha256 = "0ff62e28eb0f6e563178d44b77c94dddb8702141d83dd34b83cb046399c2b1d5",
         build_file = "@org_apache_rocketmq//third_party:cpp_httplib.BUILD",
         strip_prefix = "cpp-httplib-0.9.4",
-        urls = ["https://github.com/yhirose/cpp-httplib/archive/refs/tags/v0.9.4.tar.gz"],
+        urls = [
+            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/cpp-httplib/cpp-httplib-0.9.4.tar.gz",
+            "https://github.com/yhirose/cpp-httplib/archive/refs/tags/v0.9.4.tar.gz",
+        ],
     )
 
     maybe(
@@ -143,6 +151,7 @@ def rocketmq_deps():
         name = "com_google_googleapis",
         sha256 = "e89f15d54b0ddab0cd41d18cb2299e5447db704e2b05ff141cb1769170671466",
         urls = [
+            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googleapis/googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip",
             "https://github.com/googleapis/googleapis/archive/af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip"
         ],
         strip_prefix = "googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95",
@@ -167,4 +176,26 @@ def rocketmq_deps():
             "https://github.com/bazelbuild/rules_swift/archive/refs/tags/0.27.0.tar.gz",
         ],
         strip_prefix = "rules_swift-0.27.0",
+    )
+
+    maybe(
+        http_archive,
+        name = "io_bazel_rules_go",
+        sha256 = "685052b498b6ddfe562ca7a97736741d87916fe536623afb7da2824c0211c369",
+        urls = [
+            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-go/rules_go-v0.33.0.zip",
+            "https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip",
+            "https://github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip",
+        ],
+    )
+
+    maybe(
+        http_archive,
+        name = "rules_proto",
+        sha256 = "e017528fd1c91c5a33f15493e3a398181a9e821a804eb7ff5acdd1d2d6c2b18d",
+        strip_prefix = "rules_proto-4.0.0-3.20.0",
+        urls = [
+            "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto/rules_proto-4.0.0-3.20.0.tar.gz",
+            "https://github.com/bazelbuild/rules_proto/archive/refs/tags/4.0.0-3.20.0.tar.gz",
+        ],
     )
\ No newline at end of file
diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto b/cpp/proto/apache/rocketmq/v2/definition.proto
index 58bf06d..67520fe 100644
--- a/cpp/proto/apache/rocketmq/v2/definition.proto
+++ b/cpp/proto/apache/rocketmq/v2/definition.proto
@@ -292,106 +292,92 @@ message Assignment {
 }
 
 enum Code {
-  // Success.
-  OK = 0;
+  CODE_UNSPECIFIED = 0;
+
+  // Generic code for success.
+  OK = 20000;
+
+  // Generic code for multiple return results.
+  MULTIPLE_RESULTS = 30000;
+
+  // Generic code for bad request, indicating that required fields or headers are missing.
+  BAD_REQUEST = 40000;
   // Format of access point is illegal.
-  ILLEGAL_ACCESS_POINT = 1;
+  ILLEGAL_ACCESS_POINT = 40001;
   // Format of topic is illegal.
-  ILLEGAL_TOPIC = 2;
+  ILLEGAL_TOPIC = 40002;
   // Format of consumer group is illegal.
-  ILLEGAL_CONSUMER_GROUP = 3;
+  ILLEGAL_CONSUMER_GROUP = 40003;
   // Format of message tag is illegal.
-  ILLEGAL_MESSAGE_TAG = 4;
+  ILLEGAL_MESSAGE_TAG = 40004;
   // Format of message key is illegal.
-  ILLEGAL_MESSAGE_KEY = 5;
-  // Size of message keys exceeds the threshold.
-  MESSAGE_KEYS_TOO_LARGE = 6;
+  ILLEGAL_MESSAGE_KEY = 40005;
   // Format of message group is illegal.
-  ILLEGAL_MESSAGE_GROUP = 7;
+  ILLEGAL_MESSAGE_GROUP = 40006;
   // Format of message property key is illegal.
-  ILLEGAL_MESSAGE_PROPERTY_KEY = 8;
-  // Message properties total size exceeds the threshold.
-  MESSAGE_PROPERTIES_TOO_LARGE = 9;
-  // Message body size exceeds the threshold.
-  MESSAGE_BODY_TOO_LARGE = 10;
-
-  // User does not have the permission to operate.
-  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/403
-  FORBIDDEN = 403;
-
-  // Code indicates that the client request has not been completed
-  // because it lacks valid authentication credentials for the
-  // requested resource.
-  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401
-  UNAUTHORIZED = 401;
-
-  // Topic resource does not exist.
-  TOPIC_NOT_FOUND = 13;
-
-  // Consumer group resource does not exist.
-  CONSUMER_GROUP_NOT_FOUND = 14;
-
-  // Not allowed to verify message. Chances are that you are verifying
-  // a FIFO message, as is violating FIFO semantics.
-  VERIFY_MESSAGE_FORBIDDEN = 15;
-
-  // Failed to consume message.
-  FAILED_TO_CONSUME_MESSAGE = 16;
-
-  // Message is corrupted.
-  MESSAGE_CORRUPTED = 17;
-
-  // Too many requests are made in short period of duration.
-  // Requests are throttled.
-  TOO_MANY_REQUESTS = 18;
-
-  // Expired receipt-handle is used when trying to acknowledge or change
-  // invisible duration of a message
-  RECEIPT_HANDLE_EXPIRED = 19;
-
-  // Message property is not match the message type.
-  MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 20;
-
-  // Format of message id is illegal.
-  ILLEGAL_MESSAGE_ID = 21;
-
+  ILLEGAL_MESSAGE_PROPERTY_KEY = 40007;
   // Transaction id is invalid.
-  INVALID_TRANSACTION_ID = 22;
-
+  INVALID_TRANSACTION_ID = 40008;
+  // Format of message id is illegal.
+  ILLEGAL_MESSAGE_ID = 40009;
   // Format of filter expression is illegal.
-  ILLEGAL_FILTER_EXPRESSION = 23;
-
+  ILLEGAL_FILTER_EXPRESSION = 40010;
   // Receipt handle of message is invalid.
-  INVALID_RECEIPT_HANDLE = 24;
-
-  // Message persistence timeout.
-  MASTER_PERSISTENCE_TIMEOUT = 25;
+  INVALID_RECEIPT_HANDLE = 40011;
+  // Message property conflicts with its type.
+  MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40012;
+  // Client type could not be recognized.
+  UNRECOGNIZED_CLIENT_TYPE = 40013;
+  // Message is corrupted.
+  MESSAGE_CORRUPTED = 40014;
+  // Request is rejected due to missing of x-mq-client-id header.
+  CLIENT_ID_REQUIRED = 40015;
 
-  // Slave persistence timeout.
-  SLAVE_PERSISTENCE_TIMEOUT = 26;
+  // Generic code indicates that the client request lacks valid authentication
+  // credentials for the requested resource.
+  UNAUTHORIZED = 40100;
 
-  // The HA-mechanism is not working now.
-  HA_NOT_AVAILABLE = 27;
+  // Generic code indicates that the account is suspended due to overdue of payment.
+  PAYMENT_REQUIRED = 40200;
 
-  // Operation is not allowed in current version.
-  VERSION_UNSUPPORTED = 28;
+  // Generic code for the case that user does not have the permission to operate.
+  FORBIDDEN = 40300;
 
+  // Generic code for resource not found.
+  NOT_FOUND = 40400;
   // Message not found from server.
-  MESSAGE_NOT_FOUND = 29;
+  MESSAGE_NOT_FOUND = 40401;
+  // Topic resource does not exist.
+  TOPIC_NOT_FOUND = 40402;
+  // Consumer group resource does not exist.
+  CONSUMER_GROUP_NOT_FOUND = 40403;
+
+  // Generic code representing client side timeout when connecting to, reading data from, or write data to server.
+  REQUEST_TIMEOUT = 40800;
 
-  // Message offset is illegal.
-  ILLEGAL_MESSAGE_OFFSET = 30;
+  // Generic code represents that the request entity is larger than limits defined by server.
+  PAYLOAD_TOO_LARGE = 41300;
+  // Message body size exceeds the threshold.
+  MESSAGE_BODY_TOO_LARGE = 41301;
 
-  // Illegal message is for the sake of backward compatibility. In most case,
-  // more definitive code is better, e.g. `ILLEGAL_MESSAGE_TAG`.
-  ILLEGAL_MESSAGE = 31;
+  // Generic code for use cases where pre-conditions are not met.
+  // For example, if a producer instance is used to publish messages without prior start() invocation,
+  // this error code will be raised.
+  PRECONDITION_FAILED = 42800;
 
-  // Client type could not be recognized.
-  UNRECOGNIZED_CLIENT_TYPE = 32;
+  // Generic code indicates that too many requests are made in short period of duration.
+  // Requests are throttled.
+  TOO_MANY_REQUESTS = 42900;
 
-  // Return different results for entries in composite request.
-  MULTIPLE_RESULTS = 33;
+  // Generic code for the case that the server is unwilling to process the request because its header fields are too large.
+  // The request may be resubmitted after reducing the size of the request header fields.
+  REQUEST_HEADER_FIELDS_TOO_LARGE = 43100;
+  // Message properties total size exceeds the threshold.
+  MESSAGE_PROPERTIES_TOO_LARGE = 43101;
 
+  // Generic code indicates that server/client encountered an unexpected
+  // condition that prevented it from fulfilling the request.
+  INTERNAL_ERROR = 50000;
   // Code indicates that the server encountered an unexpected condition
   // that prevented it from fulfilling the request.
   // This error response is a generic "catch-all" response.
@@ -401,17 +387,33 @@ enum Code {
   // to prevent the error from happening again in the future.
   //
   // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
-  INTERNAL_SERVER_ERROR = 500;
+  INTERNAL_SERVER_ERROR = 50001;
+  // The HA-mechanism is not working now.
+  HA_NOT_AVAILABLE = 50002;
 
-  // Code means that the server or client does not support the functionality
-  // required to fulfill the request.
-  NOT_IMPLEMENTED = 501;
+  // Generic code means that the server or client does not support the
+  // functionality required to fulfill the request.
+  NOT_IMPLEMENTED = 50100;
 
-  // Code indicates that the server, while acting as a gateway or proxy,
-  // did not get a response in time from the upstream server that
-  // it needed in order to complete the request.
+  // Generic code represents that the server, which acts as a gateway or proxy,
+  // does not get an satisfied response in time from its upstream servers.
   // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
-  GATEWAY_TIMEOUT = 504;
+  PROXY_TIMEOUT = 50400;
+  // Message persistence timeout.
+  MASTER_PERSISTENCE_TIMEOUT = 50401;
+  // Slave persistence timeout.
+  SLAVE_PERSISTENCE_TIMEOUT = 50402;
+
+  // Generic code for unsupported operation.
+  UNSUPPORTED = 50500;
+  // Operation is not allowed in current version.
+  VERSION_UNSUPPORTED = 50501;
+  // Not allowed to verify message. Chances are that you are verifying
+  // a FIFO message, as is violating FIFO semantics.
+  VERIFY_FIFO_MESSAGE_UNSUPPORTED = 50502;
+
+  // Generic code for failed message consumption.
+  FAILED_TO_CONSUME_MESSAGE = 60000;
 }
 
 message Status {
diff --git a/cpp/proto/apache/rocketmq/v2/service.proto b/cpp/proto/apache/rocketmq/v2/service.proto
index fa2a2f0..c5d4cce 100644
--- a/cpp/proto/apache/rocketmq/v2/service.proto
+++ b/cpp/proto/apache/rocketmq/v2/service.proto
@@ -16,6 +16,7 @@
 syntax = "proto3";
 
 import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
 
 import "apache/rocketmq/v2/definition.proto";
 
@@ -101,6 +102,8 @@ message ReceiveMessageResponse {
   oneof content {
     Status status = 1;
     Message message = 2;
+    // The timestamp that brokers start to deliver status line or message.
+    google.protobuf.Timestamp delivery_timestamp = 3;
   }
 }
 
@@ -211,6 +214,10 @@ message Publishing {
   // reject the request. As a result, it is advisable that Producer performs
   // client-side check validation.
   int32 max_body_size = 3;
+
+  // When `validate_message_type` flag set `false`, no need to validate message's type
+  // with messageQueue's `accept_message_types` before publising.
+  bool validate_message_type = 4;
 }
 
 message Subscription {
diff --git a/cpp/src/main/cpp/base/ErrorCategory.cpp b/cpp/src/main/cpp/base/ErrorCategory.cpp
index 4fe2ea0..5ca2528 100644
--- a/cpp/src/main/cpp/base/ErrorCategory.cpp
+++ b/cpp/src/main/cpp/base/ErrorCategory.cpp
@@ -16,6 +16,8 @@
  */
 #include "rocketmq/ErrorCategory.h"
 
+#include "rocketmq/ErrorCode.h"
+
 ROCKETMQ_NAMESPACE_BEGIN
 
 std::string ErrorCategory::message(int code) const {
@@ -53,7 +55,7 @@ std::string ErrorCategory::message(int code) const {
     case ErrorCode::TopicNotFound:
       return "Topic is not found. Verify the request topic has already been created through console or management API";
 
-    case ErrorCode::GroupNotFound:
+    case ErrorCode::ConsumerGroupNotFound:
       return "Group is not found. Verify the request group has already been created through console or management API";
 
     case ErrorCode::RequestTimeout:
@@ -65,7 +67,7 @@ std::string ErrorCategory::message(int code) const {
     case ErrorCode::PreconditionRequired:
       return "State of dependent procedure is not right";
 
-    case ErrorCode::TooManyRequest:
+    case ErrorCode::TooManyRequests:
       return "Quota exchausted. The user has sent too many requests in a given "
              "amount of time.";
 
@@ -100,7 +102,7 @@ std::string ErrorCategory::message(int code) const {
              "a timely response from the upstream "
              "server.";
 
-    case ErrorCode::ProtocolVersionNotSupported:
+    case ErrorCode::ProtocolUnsupported:
       return "The server does not support the protocol version used in the "
              "request.";
 
@@ -108,9 +110,26 @@ std::string ErrorCategory::message(int code) const {
       return "The server is unable to store the representation needed to "
              "complete the request.";
 
-    default:
-      return "Not-Implemented";
+    case ErrorCode::MultipleResults:
+      return "Multiple results are available";
+
+    case ErrorCode::IllegalAccessPoint:
+      return "Access point is either malformed or invalid";
+
+    case ErrorCode::IllegalTopic: {
+      return "Topic is illegal either due to length is too long or invalid character is included";
+    }
+
+    case ErrorCode::IllegalConsumerGroup: {
+      return "ConsumerGroup is illegal due to its length is too long or invalid character is included";
+    }
+
+    case ErrorCode::IllegalMessageTag: {
+      return "Format of message tag is illegal.";
+    }
   }
+
+  return "";
 }
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/base/Tracing.cpp b/cpp/src/main/cpp/base/Tracing.cpp
deleted file mode 100644
index 9eb7335..0000000
--- a/cpp/src/main/cpp/base/Tracing.cpp
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/Tracing.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-opencensus::trace::Sampler* traceSampler() {
-  static opencensus::trace::NeverSampler sampler;
-  return &sampler;
-}
-
-ROCKETMQ_NAMESPACE_END
diff --git a/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp b/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp
index 168b87a..1e3c7bf 100644
--- a/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp
+++ b/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp
@@ -20,12 +20,14 @@
 ROCKETMQ_NAMESPACE_BEGIN
 
 TEST(RetryPolicyTest, testBackoff) {
-  RetryPolicy policy{.max_attempt = 3,
-                     .strategy = BackoffStrategy::Customized,
-                     .initial = absl::Milliseconds(0),
-                     .max = absl::Milliseconds(0),
-                     .multiplier = 0.0f,
-                     .next = {absl::Milliseconds(10), absl::Milliseconds(100), absl::Milliseconds(500)}};
+  RetryPolicy policy;
+  policy.max_attempt = 3;
+  policy.strategy = BackoffStrategy::Customized;
+  policy.initial = absl::Milliseconds(0);
+  policy.max = absl::Milliseconds(0);
+  policy.multiplier = 0.0f;
+  policy.next = {absl::Milliseconds(10), absl::Milliseconds(100), absl::Milliseconds(500)};
+
   ASSERT_EQ(policy.backoff(1), 10);
   ASSERT_EQ(policy.backoff(2), 100);
   ASSERT_EQ(policy.backoff(3), 500);
diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
index 0c6e155..1f9eec6 100644
--- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
@@ -16,6 +16,8 @@
  */
 #include "ClientManagerImpl.h"
 
+#include <apache/rocketmq/v2/definition.pb.h>
+
 #include <atomic>
 #include <cassert>
 #include <chrono>
@@ -229,26 +231,57 @@ void ClientManagerImpl::heartbeat(const std::string& target_host, const Metadata
     switch (status.code()) {
       case rmq::Code::OK: {
         cb(ec, invocation_context->response);
-      } break;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+        SPDLOG_ERROR("IllegalConsumerGroup: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalConsumerGroup;
+        break;
+      }
+
+      case rmq::Code::TOO_MANY_REQUESTS: {
+        SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::TooManyRequests;
+        cb(ec, invocation_context->response);
+        break;
+      }
+
       case rmq::Code::UNAUTHORIZED: {
-        SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
+        SPDLOG_WARN("Unauthorized: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Unauthorized;
         cb(ec, invocation_context->response);
-      } break;
-      case rmq::Code::FORBIDDEN: {
-        SPDLOG_WARN("Forbidden: {}, host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::Forbidden;
+        break;
+      }
+
+      case rmq::Code::UNRECOGNIZED_CLIENT_TYPE: {
+        SPDLOG_ERROR("UnsupportedClientType: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::UnsupportedClientType;
+        cb(ec, invocation_context->response);
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_ERROR("ClientIdRequired: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::InternalClientError;
         cb(ec, invocation_context->response);
-      } break;
+        break;
+      }
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::InternalServerError;
         cb(ec, invocation_context->response);
-      } break;
+        break;
+      }
+
       default: {
-        SPDLOG_WARN("NotImplemented: Please upgrade SDK to latest release. Message={}, host={}", status.message(),
+        SPDLOG_WARN("NotSupported: Please upgrade SDK to latest release. Message={}, host={}", status.message(),
                     invocation_context->remote_address);
-      } break;
+        ec = ErrorCode::NotSupported;
+        cb(ec, invocation_context->response);
+        break;
+      }
     }
   };
 
@@ -319,33 +352,122 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
         send_receipt.message_id = invocation_context->response.entries().begin()->message_id();
         break;
       }
+
+      case rmq::Code::ILLEGAL_TOPIC: {
+        SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalTopic;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_MESSAGE_TAG: {
+        SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalMessageTag;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_MESSAGE_KEY: {
+        SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalMessageKey;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_MESSAGE_GROUP: {
+        SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalMessageGroup;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: {
+        SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalMessageProperty;
+        break;
+      }
+
+      case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: {
+        SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::MessagePropertiesTooLarge;
+        break;
+      }
+
+      case rmq::Code::MESSAGE_BODY_TOO_LARGE: {
+        SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::MessageBodyTooLarge;
+        break;
+      }
+
       case rmq::Code::TOPIC_NOT_FOUND: {
-        SPDLOG_WARN("TopicNotFound: {}", status.message());
+        SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::TopicNotFound;
+        break;
+      }
+
+      case rmq::Code::NOT_FOUND: {
+        SPDLOG_WARN("NotFound: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::NotFound;
         break;
       }
+
       case rmq::Code::UNAUTHORIZED: {
-        SPDLOG_WARN("Unauthenticated: {}", status.message());
+        SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Unauthorized;
         break;
       }
+        
       case rmq::Code::FORBIDDEN: {
-        SPDLOG_WARN("Forbidden: {}", status.message());
+        SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Forbidden;
-        cb(ec, send_receipt);
         break;
       }
+
+      case rmq::Code::MESSAGE_CORRUPTED: {
+        SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::MessageCorrupted;
+        break;
+      }
+
+      case rmq::Code::TOO_MANY_REQUESTS: {
+        SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::TooManyRequests;
+        break;
+      }
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
-        SPDLOG_WARN("InternalServerError: {}", status.message());
+        SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::InternalServerError;
+        break;
+      }
+
+      case rmq::Code::HA_NOT_AVAILABLE: {
+        SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::InternalServerError;
         break;
       }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
+      case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
+      case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
       default: {
-        SPDLOG_WARN("Unsupported status code. Check and upgrade SDK to the latest");
-        ec = ErrorCode::NotImplemented;
+        SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. Host={}", invocation_context->remote_address);
+        ec = ErrorCode::NotSupported;
         break;
       }
     }
+
     cb(ec, send_receipt);
   };
 
@@ -478,32 +600,64 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metad
         }
         auto ptr = std::make_shared<TopicRouteData>(std::move(message_queues));
         cb(ec, ptr);
-      } break;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_ACCESS_POINT: {
+        SPDLOG_WARN("IllegalAccessPoint: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalAccessPoint;
+        cb(ec, nullptr);
+        break;
+      }
+
       case rmq::Code::UNAUTHORIZED: {
         SPDLOG_WARN("Unauthorized: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Unauthorized;
         cb(ec, nullptr);
-      } break;
-      case rmq::Code::FORBIDDEN: {
-        SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::Forbidden;
-        cb(ec, nullptr);
-      } break;
+        break;
+      }
+
       case rmq::Code::TOPIC_NOT_FOUND: {
         SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::NotFound;
         cb(ec, nullptr);
-      } break;
+        break;
+      }
+
+      case rmq::Code::TOO_MANY_REQUESTS: {
+        SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::TooManyRequests;
+        cb(ec, nullptr);
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_ERROR("ClientIdRequired: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::InternalClientError;
+        cb(ec, nullptr);
+        break;
+      }
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::InternalServerError;
         cb(ec, nullptr);
-      } break;
+        break;
+      }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::GatewayTimeout;
+        cb(ec, nullptr);
+        break;
+      }
+
       default: {
         SPDLOG_WARN("NotImplement: Please upgrade to latest SDK release. Host={}", invocation_context->remote_address);
         ec = ErrorCode::NotImplemented;
         cb(ec, nullptr);
-      } break;
+        break;
+      }
     }
   };
   invocation_context->callback = callback;
@@ -529,25 +683,75 @@ void ClientManagerImpl::queryAssignment(
     std::error_code ec;
     switch (status.code()) {
       case rmq::Code::OK: {
-        SPDLOG_DEBUG("Query assignment OK");
-      } break;
+        SPDLOG_DEBUG("Query assignment OK. Host={}", invocation_context->remote_address);
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_ACCESS_POINT: {
+        SPDLOG_WARN("IllegalAccessPoint: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalAccessPoint;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_TOPIC: {
+        SPDLOG_WARN("IllegalAccessPoint: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalTopic;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+        SPDLOG_WARN("IllegalConsumerGroup: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalConsumerGroup;
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_WARN("ClientIdRequired: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::InternalClientError;
+        break;
+      }
+
       case rmq::Code::UNAUTHORIZED: {
         SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Unauthorized;
-      } break;
+        break;
+      }
+
       case rmq::Code::FORBIDDEN: {
         SPDLOG_WARN("Forbidden: {}, host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Forbidden;
-      } break;
+        break;
+      }
+
+      case rmq::Code::TOPIC_NOT_FOUND: {
+        SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::TopicNotFound;
+        break;
+      }
+
+      case rmq::Code::CONSUMER_GROUP_NOT_FOUND: {
+        SPDLOG_WARN("ConsumerGroupNotFound: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::ConsumerGroupNotFound;
+        break;
+      }
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::InternalServerError;
-      } break;
+        break;
+      }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
       default: {
-        SPDLOG_WARN("NotImplemented: please upgrade SDK to latest release. Host={}",
-                    invocation_context->remote_address);
-        ec = ErrorCode::NotImplemented;
-      } break;
+        SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. Host={}", invocation_context->remote_address);
+        ec = ErrorCode::NotSupported;
+        break;
+      }
     }
     cb(ec, invocation_context->response);
   };
@@ -806,23 +1010,80 @@ void ClientManagerImpl::ack(const std::string& target, const Metadata& metadata,
     switch (status.code()) {
       case rmq::Code::OK: {
         SPDLOG_DEBUG("Ack OK. host={}", invocation_context->remote_address);
-      } break;
+        break;
+      }
+
+      case rmq::Code::MULTIPLE_RESULTS: {
+        SPDLOG_DEBUG("Server returns multiple results. host={}", invocation_context->remote_address);
+        // Treat it as successful, allowing top tier processing according to response entries.
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_TOPIC: {
+        SPDLOG_WARN("IllegalTopic: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalTopic;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+        SPDLOG_WARN("IllegalConsumerGroup: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalConsumerGroup;
+        break;
+      }
+
+      case rmq::Code::INVALID_RECEIPT_HANDLE: {
+        SPDLOG_WARN("InvalidReceiptHandle: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::InvalidReceiptHandle;
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_WARN("ClientIdRequired: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::InternalClientError;
+        break;
+      }
+
+      case rmq::Code::TOPIC_NOT_FOUND: {
+        SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::TopicNotFound;
+        break;
+      }
+
       case rmq::Code::UNAUTHORIZED: {
         SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Unauthorized;
-      } break;
+        break;
+      }
+
       case rmq::Code::FORBIDDEN: {
         SPDLOG_WARN("PermissionDenied: {}, host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Forbidden;
-      } break;
+        break;
+      }
+
+      case rmq::Code::TOO_MANY_REQUESTS: {
+        SPDLOG_WARN("TooManyRequests: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::TooManyRequests;
+        break;
+      }
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::InternalServerError;
-      } break;
+        break;
+      }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
       default: {
-        SPDLOG_WARN("NotImplement: please upgrade SDK to latest release. host={}", invocation_context->remote_address);
-        ec = ErrorCode::NotImplemented;
-      } break;
+        SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. host={}", invocation_context->remote_address);
+        ec = ErrorCode::NotSupported;
+        break;
+      }
     }
     cb(ec);
   };
@@ -856,29 +1117,64 @@ void ClientManagerImpl::changeInvisibleDuration(
 
     std::error_code ec;
     auto&& status = invocation_context->response.status();
+    auto&& peer_address = invocation_context->remote_address;
     switch (status.code()) {
       case rmq::Code::OK: {
-        SPDLOG_DEBUG("Nack to {} OK", invocation_context->remote_address);
+        SPDLOG_DEBUG("ChangeInvisibleDuration to {} OK", peer_address);
         break;
       };
+
+      case rmq::Code::ILLEGAL_TOPIC: {
+        SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), peer_address);
+        ec = ErrorCode::IllegalTopic;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+        SPDLOG_WARN("IllegalConsumerGroup: {}. Host={}", status.message(), peer_address);
+        ec = ErrorCode::IllegalConsumerGroup;
+        break;
+      }
+
       case rmq::Code::UNAUTHORIZED: {
         SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Unauthorized;
         break;
       }
+
+      case rmq::Code::INVALID_RECEIPT_HANDLE: {
+        SPDLOG_WARN("InvalidReceiptHandle: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::InvalidReceiptHandle;
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_WARN("ClientIdRequired: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::InternalClientError;
+        break;
+      }
+
       case rmq::Code::FORBIDDEN: {
         SPDLOG_WARN("Forbidden: {}, host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Forbidden;
         break;
       }
+        
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::InternalServerError;
         break;
       }
+
+      case rmq::Code::TOO_MANY_REQUESTS: {
+        SPDLOG_WARN("TooManyRequests: {}, host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::TooManyRequests;
+        break;
+      }
+
       default: {
-        SPDLOG_WARN("NotImplemented: Please upgrade to latest SDK, host={}", invocation_context->remote_address);
-        ec = ErrorCode::NotImplemented;
+        SPDLOG_WARN("NotSupported: Please upgrade to latest SDK, host={}", invocation_context->remote_address);
+        ec = ErrorCode::NotSupported;
         break;
       }
     }
@@ -927,27 +1223,72 @@ void ClientManagerImpl::endTransaction(
     }
 
     auto&& status = invocation_context->response.status();
+    auto&& peer_address = invocation_context->remote_address;
     switch (status.code()) {
       case rmq::Code::OK: {
         SPDLOG_DEBUG("endTransaction completed OK. Response: {}, host={}", invocation_context->response.DebugString(),
-                     invocation_context->remote_address);
-      } break;
+                     peer_address);
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_TOPIC: {
+        SPDLOG_WARN("IllegalTopic: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::IllegalTopic;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+        SPDLOG_WARN("IllegalConsumerGroup: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::IllegalConsumerGroup;
+        break;
+      }
+
+      case rmq::Code::INVALID_TRANSACTION_ID: {
+        SPDLOG_WARN("InvalidTransactionId: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::InvalidTransactionId;
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_WARN("ClientIdRequired: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::InternalClientError;
+        break;
+      }
+
+      case rmq::Code::TOPIC_NOT_FOUND: {
+        SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::TopicNotFound;
+        break;
+      }
+
       case rmq::Code::UNAUTHORIZED: {
-        SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
+        SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), peer_address);
         ec = ErrorCode::Unauthorized;
-      } break;
+        break;
+      }
+
       case rmq::Code::FORBIDDEN: {
-        SPDLOG_WARN("Forbidden: {}, host={}", status.message(), invocation_context->remote_address);
+        SPDLOG_WARN("Forbidden: {}, host={}", status.message(), peer_address);
         ec = ErrorCode::Forbidden;
-      } break;
+        break;
+      }
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
-        SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
+        SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), peer_address);
         ec = ErrorCode::InternalServerError;
-      } break;
+        break;
+      }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
       default: {
-        SPDLOG_WARN("NotImplemented: please upgrade SDK to latest release. {}, host={}", status.message(),
-                    invocation_context->remote_address);
-        ec = ErrorCode::NotImplemented;
+        SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::NotSupported;
+        break;
       }
     }
     cb(ec, invocation_context->response);
@@ -985,22 +1326,64 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe
 
     SPDLOG_DEBUG("Received forwardToDeadLetterQueue response from server[host={}]", invocation_context->remote_address);
     std::error_code ec;
-    switch (invocation_context->response.status().code()) {
+    auto&& status = invocation_context->response.status();
+    auto&& peer_address = invocation_context->remote_address;
+    switch (status.code()) {
       case rmq::Code::OK: {
         break;
       }
+      case rmq::Code::ILLEGAL_TOPIC: {
+        SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), peer_address);
+        ec = ErrorCode::IllegalTopic;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+        SPDLOG_WARN("IllegalConsumerGroup: {}. Host={}", status.message(), peer_address);
+        ec = ErrorCode::IllegalConsumerGroup;
+        break;
+      }
+
+      case rmq::Code::INVALID_RECEIPT_HANDLE: {
+        SPDLOG_WARN("IllegalReceiptHandle: {}. Host={}", status.message(), peer_address);
+        ec = ErrorCode::InvalidReceiptHandle;
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), peer_address);
+        ec = ErrorCode::InternalClientError;
+        break;
+      }
+
+      case rmq::Code::TOPIC_NOT_FOUND: {
+        ec = ErrorCode::TopicNotFound;
+        break;
+      }
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         ec = ErrorCode::ServiceUnavailable;
+        break;
       }
+        
       case rmq::Code::TOO_MANY_REQUESTS: {
-        ec = ErrorCode::TooManyRequest;
+        ec = ErrorCode::TooManyRequests;
+        break;
       }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
       default: {
         ec = ErrorCode::NotImplemented;
+        break;
       }
     }
     cb(ec);
   };
+
   invocation_context->callback = callback;
   client->asyncForwardMessageToDeadLetterQueue(request, invocation_context);
 }
@@ -1042,26 +1425,36 @@ std::error_code ClientManagerImpl::notifyClientTermination(const std::string& ta
       SPDLOG_DEBUG("NotifyClientTermination OK. host={}", target_host);
       break;
     }
+
+    case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+      SPDLOG_ERROR("IllegalConsumerGroup: {}. Host={}", status.message(), target_host);
+      ec = ErrorCode::IllegalConsumerGroup;
+      break;
+    }
+
     case rmq::Code::INTERNAL_SERVER_ERROR: {
       SPDLOG_WARN("InternalServerError: Cause={}, host={}", status.message(), target_host);
       ec = ErrorCode::InternalServerError;
       break;
     }
+
     case rmq::Code::UNAUTHORIZED: {
       SPDLOG_WARN("Unauthorized due to lack of valid authentication credentials: Cause={}, host={}", status.message(),
                   target_host);
       ec = ErrorCode::Unauthorized;
       break;
     }
+
     case rmq::Code::FORBIDDEN: {
       SPDLOG_WARN("Forbidden due to insufficient permission to the resource: Cause={}, host={}", status.message(),
                   target_host);
       ec = ErrorCode::Forbidden;
       break;
     }
+
     default: {
-      SPDLOG_WARN("NotImplemented. Please upgrade to latest SDK release. host={}", target_host);
-      ec = ErrorCode::NotImplemented;
+      SPDLOG_WARN("NotSupported. Please upgrade to latest SDK release. host={}", target_host);
+      ec = ErrorCode::NotSupported;
       break;
     }
   }
diff --git a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
index 223c879..af99874 100644
--- a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
@@ -17,7 +17,10 @@
 
 #include "ReceiveMessageStreamReader.h"
 
+#include <apache/rocketmq/v2/definition.pb.h>
+
 #include "LoggerImpl.h"
+#include "rocketmq/ErrorCode.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -63,26 +66,71 @@ void ReceiveMessageStreamReader::OnReadDone(bool ok) {
         case rmq::Code::OK: {
           break;
         }
+        case rmq::Code::ILLEGAL_TOPIC: {
+          ec_ = ErrorCode::IllegalTopic;
+          break;
+        }
+
+        case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+          ec_ = ErrorCode::IllegalConsumerGroup;
+          break;
+        }
+
+        case rmq::Code::ILLEGAL_FILTER_EXPRESSION: {
+          ec_ = ErrorCode::IllegalFilterExpression;
+          break;
+        }
+
+        case rmq::Code::CLIENT_ID_REQUIRED: {
+          ec_ = ErrorCode::InternalClientError;
+          break;
+        }
+
         case rmq::Code::TOPIC_NOT_FOUND: {
           ec_ = ErrorCode::TopicNotFound;
           break;
         }
 
         case rmq::Code::CONSUMER_GROUP_NOT_FOUND: {
-          ec_ = ErrorCode::GroupNotFound;
+          ec_ = ErrorCode::ConsumerGroupNotFound;
           break;
         }
+
         case rmq::Code::TOO_MANY_REQUESTS: {
-          ec_ = ErrorCode::TooManyRequest;
+          ec_ = ErrorCode::TooManyRequests;
           break;
         }
+
         case rmq::Code::MESSAGE_NOT_FOUND: {
           ec_ = ErrorCode::NoContent;
           break;
         }
-        default:
+
+        case rmq::Code::UNAUTHORIZED: {
+          ec_ = ErrorCode::Unauthorized;
+          break;
+        }
+
+        case rmq::Code::FORBIDDEN: {
+          ec_ = ErrorCode::Forbidden;
+          break;
+        }
+
+        case rmq::Code::INTERNAL_SERVER_ERROR: {
+          ec_ = ErrorCode::InternalServerError;
+          break;
+        }
+
+        case rmq::Code::PROXY_TIMEOUT: {
+          ec_ = ErrorCode::GatewayTimeout;
+          break;
+        }
+
+        default: {
+          ec_ = ErrorCode::NotSupported;
           SPDLOG_WARN("Unsupported code={}", response_.status().code());
           break;
+        }
       }
       break;
     }
diff --git a/cpp/src/main/cpp/client/RpcClientImpl.cpp b/cpp/src/main/cpp/client/RpcClientImpl.cpp
index 623547e..35016c3 100644
--- a/cpp/src/main/cpp/client/RpcClientImpl.cpp
+++ b/cpp/src/main/cpp/client/RpcClientImpl.cpp
@@ -21,14 +21,12 @@
 #include <sstream>
 #include <thread>
 
-#include "absl/time/time.h"
-
 #include "ClientManager.h"
 #include "ReceiveMessageStreamReader.h"
 #include "RpcClient.h"
 #include "TelemetryBidiReactor.h"
 #include "TlsHelper.h"
-#include "include/ReceiveMessageContext.h"
+#include "absl/time/time.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp b/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp
index cf251e4..7479ba2 100644
--- a/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp
+++ b/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp
@@ -17,6 +17,7 @@
 #include "TelemetryBidiReactor.h"
 
 #include <atomic>
+#include <cstdint>
 #include <memory>
 #include <utility>
 
@@ -47,7 +48,8 @@ TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
 }
 
 TelemetryBidiReactor::~TelemetryBidiReactor() {
-  SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", peer_address_, stream_state_);
+  SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", peer_address_,
+              static_cast<std::uint8_t>(stream_state_));
 }
 
 bool TelemetryBidiReactor::await() {
diff --git a/cpp/src/main/cpp/client/include/ClientConfig.h b/cpp/src/main/cpp/client/include/ClientConfig.h
index 66ca230..bade9a4 100644
--- a/cpp/src/main/cpp/client/include/ClientConfig.h
+++ b/cpp/src/main/cpp/client/include/ClientConfig.h
@@ -21,12 +21,12 @@
 #include <string>
 #include <vector>
 
-#include "absl/container/flat_hash_map.h"
-#include "absl/time/time.h"
-
 #include "Protocol.h"
 #include "RetryPolicy.h"
+#include "absl/container/flat_hash_map.h"
+#include "absl/time/time.h"
 #include "rocketmq/CredentialsProvider.h"
+#include "rocketmq/Tracing.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -61,6 +61,7 @@ struct ClientConfig {
   PublisherConfig publisher;
   SubscriberConfig subscriber;
   Metric metric;
+  std::unique_ptr<opencensus::trace::Sampler> sampler_;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/client/include/RpcClient.h b/cpp/src/main/cpp/client/include/RpcClient.h
index 2fc8448..fbb3017 100644
--- a/cpp/src/main/cpp/client/include/RpcClient.h
+++ b/cpp/src/main/cpp/client/include/RpcClient.h
@@ -21,15 +21,12 @@
 #include <memory>
 #include <string>
 
-#include "ReceiveMessageResult.h"
+#include "Protocol.h"
+#include "ReceiveMessageContext.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/strings/string_view.h"
 #include "grpcpp/grpcpp.h"
 
-#include "InvocationContext.h"
-#include "Protocol.h"
-#include "ReceiveMessageContext.h"
-
 ROCKETMQ_NAMESPACE_BEGIN
 
 using Channel = grpc::Channel;
diff --git a/cpp/src/main/cpp/client/include/RpcClientImpl.h b/cpp/src/main/cpp/client/include/RpcClientImpl.h
index 6406c74..35316ec 100644
--- a/cpp/src/main/cpp/client/include/RpcClientImpl.h
+++ b/cpp/src/main/cpp/client/include/RpcClientImpl.h
@@ -18,14 +18,11 @@
 
 #include <memory>
 
-#include "InvocationContext.h"
-#include "ReceiveMessageCallback.h"
-#include "ReceiveMessageContext.h"
-#include "absl/container/flat_hash_map.h"
-
 #include "Client.h"
 #include "ClientManager.h"
+#include "ReceiveMessageContext.h"
 #include "RpcClient.h"
+#include "absl/container/flat_hash_map.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp b/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp
index 0eeb08e..70ab77b 100644
--- a/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp
@@ -22,13 +22,13 @@
 #include <system_error>
 #include <utility>
 
+#include "AsyncReceiveMessageCallback.h"
 #include "ClientManagerImpl.h"
 #include "MetadataConstants.h"
 #include "Protocol.h"
 #include "PushConsumerImpl.h"
 #include "ReceiveMessageResult.h"
 #include "Signature.h"
-#include "include/AsyncReceiveMessageCallback.h"
 #include "rocketmq/MessageListener.h"
 
 using namespace std::chrono;
@@ -113,7 +113,7 @@ void ProcessQueueImpl::popMessage() {
 
   std::weak_ptr<AsyncReceiveMessageCallback> cb{receive_callback_};
   auto callback = [cb](const std::error_code& ec, const ReceiveMessageResult& result) {
-    auto recv_cb = cb.lock();
+    std::shared_ptr<AsyncReceiveMessageCallback> recv_cb = cb.lock();
     if (recv_cb) {
       recv_cb->onCompletion(ec, result);
     }
diff --git a/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp b/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
index 4b6c3f0..65cc5ba 100644
--- a/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -290,16 +290,17 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) {
 
   {
     // Trace Send RPC
-    if (context->message_->traceContext().has_value()) {
+    if (context->message_->traceContext().has_value() && client_config_.sampler_) {
       auto span_context =
           opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext().value());
       auto span = opencensus::trace::Span::BlankSpan();
       std::string span_name = resourceNamespace() + "/" + context->message_->topic() + " " +
                               MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION;
       if (span_context.IsValid()) {
-        span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {traceSampler()});
+        span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context,
+                                                                  {client_config_.sampler_.get()});
       } else {
-        span = opencensus::trace::Span::StartSpan(span_name, nullptr, {traceSampler()});
+        span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()});
       }
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
                         MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION);
@@ -380,7 +381,7 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt
   bool completed = false;
   bool success = false;
   auto span = opencensus::trace::Span::BlankSpan();
-  if (!transaction.traceContext().empty()) {
+  if (!transaction.traceContext().empty() && client_config_.sampler_) {
     // Trace transactional message
     opencensus::trace::SpanContext span_context =
         opencensus::trace::propagation::FromTraceParentHeader(transaction.traceContext());
@@ -389,9 +390,9 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt
                                            : MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
     std::string span_name = resourceNamespace() + "/" + transaction.topic() + " " + trace_operation_name;
     if (span_context.IsValid()) {
-      span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {traceSampler()});
+      span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {client_config_.sampler_.get()});
     } else {
-      span = opencensus::trace::Span::StartSpan(span_name, nullptr, {traceSampler()});
+      span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()});
     }
     span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, trace_operation_name);
     span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, trace_operation_name);
diff --git a/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp b/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index 89dc112..2acff23 100644
--- a/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -550,7 +550,7 @@ void PushConsumerImpl::onVerifyMessage(MessageConstSharedPtr message, std::funct
         cmd.mutable_status()->set_message("Unexpected exception raised");
       }
     } else {
-      cmd.mutable_status()->set_code(rmq::Code::VERIFY_MESSAGE_FORBIDDEN);
+      cmd.mutable_status()->set_code(rmq::Code::VERIFY_FIFO_MESSAGE_UNSUPPORTED);
       cmd.mutable_status()->set_message("Unsupported Operation For FIFO Message");
     }
   } else {
diff --git a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
index ff61af3..c136cc9 100644
--- a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -110,6 +110,12 @@ public:
   virtual void buildClientSettings(rmq::Settings& settings) {
   }
 
+  void registerTracingSampler(TracingSamplerProvider *provider) {
+    if (provider) {
+      client_config_.sampler_ = provider->tracingSampler();
+    }
+  }
+
 protected:
   ClientConfig client_config_;
 
diff --git a/cpp/src/main/cpp/stats/PublishStats.cpp b/cpp/src/main/cpp/stats/PublishStats.cpp
index 37ce613..a012515 100644
--- a/cpp/src/main/cpp/stats/PublishStats.cpp
+++ b/cpp/src/main/cpp/stats/PublishStats.cpp
@@ -22,7 +22,7 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 PublishStats::PublishStats()
     : success_(opencensus::stats::MeasureInt64::Register("publish_success", "Number of message published", "1")),
-      failure_(opencensus::stats::MeasureInt64::Register("pubish_failure", "Number of publish failures", "1")),
+      failure_(opencensus::stats::MeasureInt64::Register("publish_failure", "Number of publish failures", "1")),
       latency_(opencensus::stats::MeasureInt64::Register("publish_latency", "Publish latency in milliseconds", "ms")) {
   opencensus::stats::ViewDescriptor()
       .set_name("rocketmq_send_success_total")