You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/04 05:43:10 UTC
[rocketmq-clients] branch master updated: Make the CPP codebase compiles on popular platforms Mac/Windows/Linux (#24)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new bd3f083 Make the CPP codebase compiles on popular platforms Mac/Windows/Linux (#24)
bd3f083 is described below
commit bd3f08307ef69d77db59cbccdf43f746a69f6b8a
Author: Zhanhui Li <li...@gmail.com>
AuthorDate: Mon Jul 4 13:43:07 2022 +0800
Make the CPP codebase compiles on popular platforms Mac/Windows/Linux (#24)
* Sync and update protobuf files to v2.0-alpha
* Complete error handling of QueryRoute
* Complete error handling for Heartbeat
* Finish error handling of SendMessage
* Complete error handling for QueryAssignment
* Add error handling for ReceiveMessage
* Add error handling for Ack
* Add error handling for EndTransaction
* Add error handling for NotifyClientTermination
* Add error handling
* Translate ClientIdRequired to InternalClientError
* Add dependent CDN through OSS
* Add io_bazel_rules_go mirror
* Add mirror for rules_proto
* Fix url of bazel_proto mirror
* Upgrade grpc to v1.46.3.1
* Upgrade grpc to v1.46.3.2
* Use official gRPC release v1.46.3
* Try to build on Linux, MacOS, Windows
* Use matrix
* Fix clang check
* Upgrade bazel version
* Add sha256 for grpc
* Fix to make it compile on Windows
* Fix typo
---
.github/workflows/cpp_build.yml | 10 +-
cpp/.bazelrc | 4 +-
cpp/.bazelversion | 2 +-
cpp/api/rocketmq/ErrorCode.h | 107 ++++-
cpp/api/rocketmq/Tracing.h | 10 +-
cpp/bazel/rocketmq_deps.bzl | 55 ++-
cpp/proto/apache/rocketmq/v2/definition.proto | 178 +++----
cpp/proto/apache/rocketmq/v2/service.proto | 7 +
cpp/src/main/cpp/base/ErrorCategory.cpp | 29 +-
cpp/src/main/cpp/base/Tracing.cpp | 26 --
cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp | 14 +-
cpp/src/main/cpp/client/ClientManagerImpl.cpp | 515 ++++++++++++++++++---
.../main/cpp/client/ReceiveMessageStreamReader.cpp | 54 ++-
cpp/src/main/cpp/client/RpcClientImpl.cpp | 4 +-
cpp/src/main/cpp/client/TelemetryBidiReactor.cpp | 4 +-
cpp/src/main/cpp/client/include/ClientConfig.h | 7 +-
cpp/src/main/cpp/client/include/RpcClient.h | 7 +-
cpp/src/main/cpp/client/include/RpcClientImpl.h | 7 +-
cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp | 4 +-
cpp/src/main/cpp/rocketmq/ProducerImpl.cpp | 13 +-
cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp | 2 +-
cpp/src/main/cpp/rocketmq/include/ClientImpl.h | 6 +
cpp/src/main/cpp/stats/PublishStats.cpp | 2 +-
23 files changed, 804 insertions(+), 263 deletions(-)
diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml
index 4d02855..a09c782 100644
--- a/.github/workflows/cpp_build.yml
+++ b/.github/workflows/cpp_build.yml
@@ -1,11 +1,13 @@
name: CPP Build
on: [push, pull_request]
jobs:
- cpp_ubuntu_18_04:
- name: Ubuntu 18.04
- runs-on: ubuntu-18.04
+ cpp:
+ strategy:
+ matrix:
+ os: [ubuntu-18.04, ubuntu-20.04, ubuntu-22.04, macos-12, macos-11, windows-2019]
+ runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- name: Compile All Targets
working-directory: ./cpp
- run: bazel build //...
+ run: bazel build //...
\ No newline at end of file
diff --git a/cpp/.bazelrc b/cpp/.bazelrc
index d5ec2a9..6b7b1da 100644
--- a/cpp/.bazelrc
+++ b/cpp/.bazelrc
@@ -13,8 +13,6 @@ run --color=yes
build --color=yes
build --host_force_python=PY3
-build --host_javabase=@bazel_tools//tools/jdk:remote_jdk11
-build --javabase=@bazel_tools//tools/jdk:remote_jdk11
# https://docs.bazel.build/versions/main/command-line-reference.html#flag--enable_platform_specific_config
# If true, Bazel picks up host-OS-specific config lines from bazelrc files. For example, if the host OS is Linux and
@@ -48,7 +46,7 @@ build --action_env=LD_LIBRARY_PATH
build --action_env=LLVM_CONFIG
build --action_env=PATH
-build --copt=-maes
+build:linux --copt=-maes
# Common flags for sanitizers
build:sanitizer --define tcmalloc=disabled
diff --git a/cpp/.bazelversion b/cpp/.bazelversion
index af8c8ec..91ff572 100644
--- a/cpp/.bazelversion
+++ b/cpp/.bazelversion
@@ -1 +1 @@
-4.2.2
+5.2.0
diff --git a/cpp/api/rocketmq/ErrorCode.h b/cpp/api/rocketmq/ErrorCode.h
index 5068c83..4f05ded 100644
--- a/cpp/api/rocketmq/ErrorCode.h
+++ b/cpp/api/rocketmq/ErrorCode.h
@@ -31,67 +31,117 @@ enum class ErrorCode : int {
* @brief Client state not as expected. Call Producer#start() first.
*
*/
- IllegalState = 1,
+ IllegalState = 10000,
+
+ /**
+ * @brief To publish FIFO messages, only synchronous API is supported.
+ */
+ BadRequestAsyncPubFifoMessage = 10100,
+
+ /**
+ * @brief 102XX is used for client side error.
+ *
+ */
+ InternalClientError = 10200,
/**
* @brief Broker has processed the request but is not going to return any content.
*/
- NoContent = 204,
+ NoContent = 20400,
/**
* @brief Bad configuration. For example, negative max-attempt-times.
*
*/
- BadConfiguration = 300,
+ BadConfiguration = 30000,
/**
- * @brief The server cannot process the request due to apprent client-side
+ * @brief Generic code, representing multiple return results.
+ *
+ */
+ MultipleResults = 30100,
+
+ /**
+ * @brief The server cannot process the request due to apparent client-side
* error. For example, topic contains invalid character or is excessively
* long.
*
*/
- BadRequest = 400,
+ BadRequest = 40000,
/**
- * @brief To publish FIFO messages, only synchronous API is supported.
+ * @brief The access point is illegal
+ *
*/
- BadRequestAsyncPubFifoMessage = 40001,
+ IllegalAccessPoint = 40001,
+
+ IllegalTopic = 40002,
+ IllegalConsumerGroup = 40003,
+ IllegalMessageTag = 40004,
+ IllegalMessageKey = 40005,
+ IllegalMessageGroup = 40006,
+ IllegalMessageProperty = 40007,
+ InvalidTransactionId = 40008,
+ IllegalMessageId = 40009,
+ IllegalFilterExpression = 40010,
+ InvalidReceiptHandle = 40011,
+
+ /**
+ * @brief Message property conflicts with its type.
+ */
+ MessagePropertyConflictWithType = 40012,
+
+ // Client type is not recognized by server
+ UnsupportedClientType = 40013,
+
+ // Received message is corrupted, generally failing to pass integrity checksum validation.
+ MessageCorrupted = 40014,
+
+ // Request is rejected due to missing of x-mq-client-id HTTP header in the metadata frame.
+ ClientIdRequired = 40015,
/**
* @brief Authentication failed. Possibly caused by invalid credentials.
*
*/
- Unauthorized = 401,
+ Unauthorized = 40100,
/**
* @brief Credentials are understood by server but authenticated user does not
* have privilege to perform the requested action.
*
*/
- Forbidden = 403,
+ Forbidden = 40300,
/**
* @brief Topic not found, which should be created through console or
* administration API before hand.
*
*/
- NotFound = 404,
+ NotFound = 40400,
- TopicNotFound = 404001,
+ MessageNotFound = 40401,
- GroupNotFound = 404002,
+ TopicNotFound = 404002,
+
+ ConsumerGroupNotFound = 404003,
/**
* @brief Timeout when connecting, reading from or writing to brokers.
*
*/
- RequestTimeout = 408,
+ RequestTimeout = 40800,
/**
* @brief Message body is too large.
*
*/
- PayloadTooLarge = 413,
+ PayloadTooLarge = 41300,
+
+ /**
+ * @brief Message body size exceeds limited allowed by server.
+ */
+ MessageBodyTooLarge = 41301,
/**
* @brief When trying to perform an action whose dependent procedure state is
@@ -100,14 +150,14 @@ enum class ErrorCode : int {
* 2. Commit/Rollback a transactional message that does not exist;
* 3. Commit an offset which is greater than maximum of partition;
*/
- PreconditionRequired = 428,
+ PreconditionRequired = 42800,
/**
* @brief Quota exchausted. The user has sent too many requests in a given
* amount of time.
*
*/
- TooManyRequest = 429,
+ TooManyRequests = 42900,
/**
* @brief The server is unwilling to process the request because either an
@@ -115,62 +165,69 @@ enum class ErrorCode : int {
* large
*
*/
- HeaderFieldsTooLarge = 431,
+ HeaderFieldsTooLarge = 43100,
+
+ // Message properties total size exceeds the threshold.
+ MessagePropertiesTooLarge = 43101,
/**
* @brief A server operator has received a legal demand to deny access to a
* resource or to a set of resources that includes the requested resource.
*
*/
- UnavailableForLegalReasons = 451,
+ UnavailableForLegalReasons = 45100,
/**
* @brief Server side interval error
*
*/
- InternalServerError = 500,
+ InternalServerError = 50000,
/**
* @brief The server either does not recognize the request method, or it lacks
* the ability to fulfil the request.
*
*/
- NotImplemented = 501,
+ NotImplemented = 50100,
/**
* @brief The server was acting as a gateway or proxy and received an invalid
* response from the upstream server.
*
*/
- BadGateway = 502,
+ BadGateway = 50200,
/**
* @brief The server cannot handle the request (because it is overloaded or
* down for maintenance). Generally, this is a temporary state.
*
*/
- ServiceUnavailable = 503,
+ ServiceUnavailable = 50300,
/**
* @brief The server was acting as a gateway or proxy and did not receive a
* timely response from the upstream server.
*
*/
- GatewayTimeout = 504,
+ GatewayTimeout = 50400,
/**
* @brief The server does not support the protocol version used in the
* request.
*
*/
- ProtocolVersionNotSupported = 505,
+ NotSupported = 50500,
+
+ ProtocolUnsupported = 50501,
+
+ VerifyFifoMessageUnsupported = 50502,
/**
* @brief The server is unable to store the representation needed to complete
* the request.
*
*/
- InsufficientStorage = 507,
+ InsufficientStorage = 50700,
};
std::error_code make_error_code(ErrorCode code);
diff --git a/cpp/api/rocketmq/Tracing.h b/cpp/api/rocketmq/Tracing.h
index bc12362..cdec455 100644
--- a/cpp/api/rocketmq/Tracing.h
+++ b/cpp/api/rocketmq/Tracing.h
@@ -16,12 +16,18 @@
*/
#pragma once
-#include "opencensus/trace/sampler.h"
+#include <memory>
#include "RocketMQ.h"
+#include "opencensus/trace/sampler.h"
ROCKETMQ_NAMESPACE_BEGIN
-opencensus::trace::Sampler* traceSampler() __attribute__((weak));
+class TracingSamplerProvider {
+public:
+ virtual ~TracingSamplerProvider() = default;
+
+ virtual std::unique_ptr<opencensus::trace::Sampler> tracingSampler() = 0;
+};
ROCKETMQ_NAMESPACE_END
diff --git a/cpp/bazel/rocketmq_deps.bzl b/cpp/bazel/rocketmq_deps.bzl
index 39f3642..983a476 100644
--- a/cpp/bazel/rocketmq_deps.bzl
+++ b/cpp/bazel/rocketmq_deps.bzl
@@ -26,7 +26,8 @@ def rocketmq_deps():
sha256 = "b4870bf121ff7795ba20d20bcdd8627b8e088f2d1dab299a031c1034eddc93d5",
strip_prefix = "googletest-release-1.11.0",
urls = [
- "https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz",
+ "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googletest/googletest-release-1.11.0.tar.gz",
+ "https://github.com/google/googletest/archive/refs/tags/release-1.11.0.tar.gz",
],
)
@@ -36,6 +37,7 @@ def rocketmq_deps():
strip_prefix = "filesystem-1.5.0",
sha256 = "eb6f3b0739908ad839cde68885d70e7324db191b9fad63d9915beaa40444d9cb",
urls = [
+ "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/filesystem/filesystem-1.5.0.tar.gz",
"https://github.com/gulrak/filesystem/archive/v1.5.0.tar.gz",
],
build_file = "@org_apache_rocketmq//third_party:filesystem.BUILD",
@@ -47,6 +49,7 @@ def rocketmq_deps():
strip_prefix = "spdlog-1.9.2",
sha256 = "6fff9215f5cb81760be4cc16d033526d1080427d236e86d70bb02994f85e3d38",
urls = [
+ "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/spdlog/spdlog-1.9.2.tar.gz",
"https://github.com/gabime/spdlog/archive/refs/tags/v1.9.2.tar.gz",
],
build_file = "@org_apache_rocketmq//third_party:spdlog.BUILD",
@@ -58,6 +61,7 @@ def rocketmq_deps():
strip_prefix = "fmt-8.0.1",
sha256 = "b06ca3130158c625848f3fb7418f235155a4d389b2abc3a6245fb01cb0eb1e01",
urls = [
+ "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/fmt/fmt-8.0.1.tar.gz",
"https://github.com/fmtlib/fmt/archive/refs/tags/8.0.1.tar.gz",
],
build_file = "@org_apache_rocketmq//third_party:fmtlib.BUILD",
@@ -107,16 +111,16 @@ def rocketmq_deps():
],
)
- if "com_github_grpc_grpc" not in native.existing_rules():
- http_archive(
- name = "com_github_grpc_grpc",
- strip_prefix = "grpc-1.46.0",
- sha256 = "67423a4cd706ce16a88d1549297023f0f9f0d695a96dd684adc21e67b021f9bc",
- urls = [
- "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.0.tar.gz",
- "https://github.com/grpc/grpc/archive/refs/tags/v1.46.0.tar.gz",
- ],
- )
+ maybe(
+ http_archive,
+ name = "com_github_grpc_grpc",
+ strip_prefix = "grpc-1.46.3",
+ sha256 = "d6cbf22cb5007af71b61c6be316a79397469c58c82a942552a62e708bce60964",
+ urls = [
+ "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/grpc/grpc-1.46.3.tar.gz",
+ "https://github.com/grpc/grpc/archive/refs/tags/v1.46.3.tar.gz",
+ ],
+ )
maybe(
http_archive,
@@ -125,6 +129,7 @@ def rocketmq_deps():
build_file = "@org_apache_rocketmq//third_party:asio.BUILD",
strip_prefix = "asio-1.18.2",
urls = [
+ "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/asio/asio-1.18.2.tar.gz",
"https://github.com/lizhanhui/asio/archive/refs/tags/v1.18.2.tar.gz",
],
)
@@ -135,7 +140,10 @@ def rocketmq_deps():
sha256 = "0ff62e28eb0f6e563178d44b77c94dddb8702141d83dd34b83cb046399c2b1d5",
build_file = "@org_apache_rocketmq//third_party:cpp_httplib.BUILD",
strip_prefix = "cpp-httplib-0.9.4",
- urls = ["https://github.com/yhirose/cpp-httplib/archive/refs/tags/v0.9.4.tar.gz"],
+ urls = [
+ "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/cpp-httplib/cpp-httplib-0.9.4.tar.gz",
+ "https://github.com/yhirose/cpp-httplib/archive/refs/tags/v0.9.4.tar.gz",
+ ],
)
maybe(
@@ -143,6 +151,7 @@ def rocketmq_deps():
name = "com_google_googleapis",
sha256 = "e89f15d54b0ddab0cd41d18cb2299e5447db704e2b05ff141cb1769170671466",
urls = [
+ "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/googleapis/googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip",
"https://github.com/googleapis/googleapis/archive/af7fb72df59a814221b123a4d1acb3f6c3e6cc95.zip"
],
strip_prefix = "googleapis-af7fb72df59a814221b123a4d1acb3f6c3e6cc95",
@@ -167,4 +176,26 @@ def rocketmq_deps():
"https://github.com/bazelbuild/rules_swift/archive/refs/tags/0.27.0.tar.gz",
],
strip_prefix = "rules_swift-0.27.0",
+ )
+
+ maybe(
+ http_archive,
+ name = "io_bazel_rules_go",
+ sha256 = "685052b498b6ddfe562ca7a97736741d87916fe536623afb7da2824c0211c369",
+ urls = [
+ "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules-go/rules_go-v0.33.0.zip",
+ "https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip",
+ "https://github.com/bazelbuild/rules_go/releases/download/v0.33.0/rules_go-v0.33.0.zip",
+ ],
+ )
+
+ maybe(
+ http_archive,
+ name = "rules_proto",
+ sha256 = "e017528fd1c91c5a33f15493e3a398181a9e821a804eb7ff5acdd1d2d6c2b18d",
+ strip_prefix = "rules_proto-4.0.0-3.20.0",
+ urls = [
+ "https://shutian.oss-cn-hangzhou.aliyuncs.com/cdn/rules_proto/rules_proto-4.0.0-3.20.0.tar.gz",
+ "https://github.com/bazelbuild/rules_proto/archive/refs/tags/4.0.0-3.20.0.tar.gz",
+ ],
)
\ No newline at end of file
diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto b/cpp/proto/apache/rocketmq/v2/definition.proto
index 58bf06d..67520fe 100644
--- a/cpp/proto/apache/rocketmq/v2/definition.proto
+++ b/cpp/proto/apache/rocketmq/v2/definition.proto
@@ -292,106 +292,92 @@ message Assignment {
}
enum Code {
- // Success.
- OK = 0;
+ CODE_UNSPECIFIED = 0;
+
+ // Generic code for success.
+ OK = 20000;
+
+ // Generic code for multiple return results.
+ MULTIPLE_RESULTS = 30000;
+
+ // Generic code for bad request, indicating that required fields or headers are missing.
+ BAD_REQUEST = 40000;
// Format of access point is illegal.
- ILLEGAL_ACCESS_POINT = 1;
+ ILLEGAL_ACCESS_POINT = 40001;
// Format of topic is illegal.
- ILLEGAL_TOPIC = 2;
+ ILLEGAL_TOPIC = 40002;
// Format of consumer group is illegal.
- ILLEGAL_CONSUMER_GROUP = 3;
+ ILLEGAL_CONSUMER_GROUP = 40003;
// Format of message tag is illegal.
- ILLEGAL_MESSAGE_TAG = 4;
+ ILLEGAL_MESSAGE_TAG = 40004;
// Format of message key is illegal.
- ILLEGAL_MESSAGE_KEY = 5;
- // Size of message keys exceeds the threshold.
- MESSAGE_KEYS_TOO_LARGE = 6;
+ ILLEGAL_MESSAGE_KEY = 40005;
// Format of message group is illegal.
- ILLEGAL_MESSAGE_GROUP = 7;
+ ILLEGAL_MESSAGE_GROUP = 40006;
// Format of message property key is illegal.
- ILLEGAL_MESSAGE_PROPERTY_KEY = 8;
- // Message properties total size exceeds the threshold.
- MESSAGE_PROPERTIES_TOO_LARGE = 9;
- // Message body size exceeds the threshold.
- MESSAGE_BODY_TOO_LARGE = 10;
-
- // User does not have the permission to operate.
- // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/403
- FORBIDDEN = 403;
-
- // Code indicates that the client request has not been completed
- // because it lacks valid authentication credentials for the
- // requested resource.
- // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401
- UNAUTHORIZED = 401;
-
- // Topic resource does not exist.
- TOPIC_NOT_FOUND = 13;
-
- // Consumer group resource does not exist.
- CONSUMER_GROUP_NOT_FOUND = 14;
-
- // Not allowed to verify message. Chances are that you are verifying
- // a FIFO message, as is violating FIFO semantics.
- VERIFY_MESSAGE_FORBIDDEN = 15;
-
- // Failed to consume message.
- FAILED_TO_CONSUME_MESSAGE = 16;
-
- // Message is corrupted.
- MESSAGE_CORRUPTED = 17;
-
- // Too many requests are made in short period of duration.
- // Requests are throttled.
- TOO_MANY_REQUESTS = 18;
-
- // Expired receipt-handle is used when trying to acknowledge or change
- // invisible duration of a message
- RECEIPT_HANDLE_EXPIRED = 19;
-
- // Message property is not match the message type.
- MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 20;
-
- // Format of message id is illegal.
- ILLEGAL_MESSAGE_ID = 21;
-
+ ILLEGAL_MESSAGE_PROPERTY_KEY = 40007;
// Transaction id is invalid.
- INVALID_TRANSACTION_ID = 22;
-
+ INVALID_TRANSACTION_ID = 40008;
+ // Format of message id is illegal.
+ ILLEGAL_MESSAGE_ID = 40009;
// Format of filter expression is illegal.
- ILLEGAL_FILTER_EXPRESSION = 23;
-
+ ILLEGAL_FILTER_EXPRESSION = 40010;
// Receipt handle of message is invalid.
- INVALID_RECEIPT_HANDLE = 24;
-
- // Message persistence timeout.
- MASTER_PERSISTENCE_TIMEOUT = 25;
+ INVALID_RECEIPT_HANDLE = 40011;
+ // Message property conflicts with its type.
+ MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40012;
+ // Client type could not be recognized.
+ UNRECOGNIZED_CLIENT_TYPE = 40013;
+ // Message is corrupted.
+ MESSAGE_CORRUPTED = 40014;
+ // Request is rejected due to missing of x-mq-client-id header.
+ CLIENT_ID_REQUIRED = 40015;
- // Slave persistence timeout.
- SLAVE_PERSISTENCE_TIMEOUT = 26;
+ // Generic code indicates that the client request lacks valid authentication
+ // credentials for the requested resource.
+ UNAUTHORIZED = 40100;
- // The HA-mechanism is not working now.
- HA_NOT_AVAILABLE = 27;
+ // Generic code indicates that the account is suspended due to overdue of payment.
+ PAYMENT_REQUIRED = 40200;
- // Operation is not allowed in current version.
- VERSION_UNSUPPORTED = 28;
+ // Generic code for the case that user does not have the permission to operate.
+ FORBIDDEN = 40300;
+ // Generic code for resource not found.
+ NOT_FOUND = 40400;
// Message not found from server.
- MESSAGE_NOT_FOUND = 29;
+ MESSAGE_NOT_FOUND = 40401;
+ // Topic resource does not exist.
+ TOPIC_NOT_FOUND = 40402;
+ // Consumer group resource does not exist.
+ CONSUMER_GROUP_NOT_FOUND = 40403;
+
+ // Generic code representing client side timeout when connecting to, reading data from, or write data to server.
+ REQUEST_TIMEOUT = 40800;
- // Message offset is illegal.
- ILLEGAL_MESSAGE_OFFSET = 30;
+ // Generic code represents that the request entity is larger than limits defined by server.
+ PAYLOAD_TOO_LARGE = 41300;
+ // Message body size exceeds the threshold.
+ MESSAGE_BODY_TOO_LARGE = 41301;
- // Illegal message is for the sake of backward compatibility. In most case,
- // more definitive code is better, e.g. `ILLEGAL_MESSAGE_TAG`.
- ILLEGAL_MESSAGE = 31;
+ // Generic code for use cases where pre-conditions are not met.
+ // For example, if a producer instance is used to publish messages without prior start() invocation,
+ // this error code will be raised.
+ PRECONDITION_FAILED = 42800;
- // Client type could not be recognized.
- UNRECOGNIZED_CLIENT_TYPE = 32;
+ // Generic code indicates that too many requests are made in short period of duration.
+ // Requests are throttled.
+ TOO_MANY_REQUESTS = 42900;
- // Return different results for entries in composite request.
- MULTIPLE_RESULTS = 33;
+ // Generic code for the case that the server is unwilling to process the request because its header fields are too large.
+ // The request may be resubmitted after reducing the size of the request header fields.
+ REQUEST_HEADER_FIELDS_TOO_LARGE = 43100;
+ // Message properties total size exceeds the threshold.
+ MESSAGE_PROPERTIES_TOO_LARGE = 43101;
+ // Generic code indicates that server/client encountered an unexpected
+ // condition that prevented it from fulfilling the request.
+ INTERNAL_ERROR = 50000;
// Code indicates that the server encountered an unexpected condition
// that prevented it from fulfilling the request.
// This error response is a generic "catch-all" response.
@@ -401,17 +387,33 @@ enum Code {
// to prevent the error from happening again in the future.
//
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
- INTERNAL_SERVER_ERROR = 500;
+ INTERNAL_SERVER_ERROR = 50001;
+ // The HA-mechanism is not working now.
+ HA_NOT_AVAILABLE = 50002;
- // Code means that the server or client does not support the functionality
- // required to fulfill the request.
- NOT_IMPLEMENTED = 501;
+ // Generic code means that the server or client does not support the
+ // functionality required to fulfill the request.
+ NOT_IMPLEMENTED = 50100;
- // Code indicates that the server, while acting as a gateway or proxy,
- // did not get a response in time from the upstream server that
- // it needed in order to complete the request.
+ // Generic code represents that the server, which acts as a gateway or proxy,
+ // does not get an satisfied response in time from its upstream servers.
// See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
- GATEWAY_TIMEOUT = 504;
+ PROXY_TIMEOUT = 50400;
+ // Message persistence timeout.
+ MASTER_PERSISTENCE_TIMEOUT = 50401;
+ // Slave persistence timeout.
+ SLAVE_PERSISTENCE_TIMEOUT = 50402;
+
+ // Generic code for unsupported operation.
+ UNSUPPORTED = 50500;
+ // Operation is not allowed in current version.
+ VERSION_UNSUPPORTED = 50501;
+ // Not allowed to verify message. Chances are that you are verifying
+ // a FIFO message, as is violating FIFO semantics.
+ VERIFY_FIFO_MESSAGE_UNSUPPORTED = 50502;
+
+ // Generic code for failed message consumption.
+ FAILED_TO_CONSUME_MESSAGE = 60000;
}
message Status {
diff --git a/cpp/proto/apache/rocketmq/v2/service.proto b/cpp/proto/apache/rocketmq/v2/service.proto
index fa2a2f0..c5d4cce 100644
--- a/cpp/proto/apache/rocketmq/v2/service.proto
+++ b/cpp/proto/apache/rocketmq/v2/service.proto
@@ -16,6 +16,7 @@
syntax = "proto3";
import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
import "apache/rocketmq/v2/definition.proto";
@@ -101,6 +102,8 @@ message ReceiveMessageResponse {
oneof content {
Status status = 1;
Message message = 2;
+ // The timestamp that brokers start to deliver status line or message.
+ google.protobuf.Timestamp delivery_timestamp = 3;
}
}
@@ -211,6 +214,10 @@ message Publishing {
// reject the request. As a result, it is advisable that Producer performs
// client-side check validation.
int32 max_body_size = 3;
+
+ // When `validate_message_type` flag set `false`, no need to validate message's type
+ // with messageQueue's `accept_message_types` before publising.
+ bool validate_message_type = 4;
}
message Subscription {
diff --git a/cpp/src/main/cpp/base/ErrorCategory.cpp b/cpp/src/main/cpp/base/ErrorCategory.cpp
index 4fe2ea0..5ca2528 100644
--- a/cpp/src/main/cpp/base/ErrorCategory.cpp
+++ b/cpp/src/main/cpp/base/ErrorCategory.cpp
@@ -16,6 +16,8 @@
*/
#include "rocketmq/ErrorCategory.h"
+#include "rocketmq/ErrorCode.h"
+
ROCKETMQ_NAMESPACE_BEGIN
std::string ErrorCategory::message(int code) const {
@@ -53,7 +55,7 @@ std::string ErrorCategory::message(int code) const {
case ErrorCode::TopicNotFound:
return "Topic is not found. Verify the request topic has already been created through console or management API";
- case ErrorCode::GroupNotFound:
+ case ErrorCode::ConsumerGroupNotFound:
return "Group is not found. Verify the request group has already been created through console or management API";
case ErrorCode::RequestTimeout:
@@ -65,7 +67,7 @@ std::string ErrorCategory::message(int code) const {
case ErrorCode::PreconditionRequired:
return "State of dependent procedure is not right";
- case ErrorCode::TooManyRequest:
+ case ErrorCode::TooManyRequests:
return "Quota exchausted. The user has sent too many requests in a given "
"amount of time.";
@@ -100,7 +102,7 @@ std::string ErrorCategory::message(int code) const {
"a timely response from the upstream "
"server.";
- case ErrorCode::ProtocolVersionNotSupported:
+ case ErrorCode::ProtocolUnsupported:
return "The server does not support the protocol version used in the "
"request.";
@@ -108,9 +110,26 @@ std::string ErrorCategory::message(int code) const {
return "The server is unable to store the representation needed to "
"complete the request.";
- default:
- return "Not-Implemented";
+ case ErrorCode::MultipleResults:
+ return "Multiple results are available";
+
+ case ErrorCode::IllegalAccessPoint:
+ return "Access point is either malformed or invalid";
+
+ case ErrorCode::IllegalTopic: {
+ return "Topic is illegal either due to length is too long or invalid character is included";
+ }
+
+ case ErrorCode::IllegalConsumerGroup: {
+ return "ConsumerGroup is illegal due to its length is too long or invalid character is included";
+ }
+
+ case ErrorCode::IllegalMessageTag: {
+ return "Format of message tag is illegal.";
+ }
}
+
+ return "";
}
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/base/Tracing.cpp b/cpp/src/main/cpp/base/Tracing.cpp
deleted file mode 100644
index 9eb7335..0000000
--- a/cpp/src/main/cpp/base/Tracing.cpp
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "rocketmq/Tracing.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-opencensus::trace::Sampler* traceSampler() {
- static opencensus::trace::NeverSampler sampler;
- return &sampler;
-}
-
-ROCKETMQ_NAMESPACE_END
diff --git a/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp b/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp
index 168b87a..1e3c7bf 100644
--- a/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp
+++ b/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp
@@ -20,12 +20,14 @@
ROCKETMQ_NAMESPACE_BEGIN
TEST(RetryPolicyTest, testBackoff) {
- RetryPolicy policy{.max_attempt = 3,
- .strategy = BackoffStrategy::Customized,
- .initial = absl::Milliseconds(0),
- .max = absl::Milliseconds(0),
- .multiplier = 0.0f,
- .next = {absl::Milliseconds(10), absl::Milliseconds(100), absl::Milliseconds(500)}};
+ RetryPolicy policy;
+ policy.max_attempt = 3;
+ policy.strategy = BackoffStrategy::Customized;
+ policy.initial = absl::Milliseconds(0);
+ policy.max = absl::Milliseconds(0);
+ policy.multiplier = 0.0f;
+ policy.next = {absl::Milliseconds(10), absl::Milliseconds(100), absl::Milliseconds(500)};
+
ASSERT_EQ(policy.backoff(1), 10);
ASSERT_EQ(policy.backoff(2), 100);
ASSERT_EQ(policy.backoff(3), 500);
diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
index 0c6e155..1f9eec6 100644
--- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
@@ -16,6 +16,8 @@
*/
#include "ClientManagerImpl.h"
+#include <apache/rocketmq/v2/definition.pb.h>
+
#include <atomic>
#include <cassert>
#include <chrono>
@@ -229,26 +231,57 @@ void ClientManagerImpl::heartbeat(const std::string& target_host, const Metadata
switch (status.code()) {
case rmq::Code::OK: {
cb(ec, invocation_context->response);
- } break;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+ SPDLOG_ERROR("IllegalConsumerGroup: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalConsumerGroup;
+ break;
+ }
+
+ case rmq::Code::TOO_MANY_REQUESTS: {
+ SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::TooManyRequests;
+ cb(ec, invocation_context->response);
+ break;
+ }
+
case rmq::Code::UNAUTHORIZED: {
- SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
+ SPDLOG_WARN("Unauthorized: {}. Host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::Unauthorized;
cb(ec, invocation_context->response);
- } break;
- case rmq::Code::FORBIDDEN: {
- SPDLOG_WARN("Forbidden: {}, host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::Forbidden;
+ break;
+ }
+
+ case rmq::Code::UNRECOGNIZED_CLIENT_TYPE: {
+ SPDLOG_ERROR("UnsupportedClientType: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::UnsupportedClientType;
+ cb(ec, invocation_context->response);
+ break;
+ }
+
+ case rmq::Code::CLIENT_ID_REQUIRED: {
+ SPDLOG_ERROR("ClientIdRequired: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::InternalClientError;
cb(ec, invocation_context->response);
- } break;
+ break;
+ }
+
case rmq::Code::INTERNAL_SERVER_ERROR: {
SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::InternalServerError;
cb(ec, invocation_context->response);
- } break;
+ break;
+ }
+
default: {
- SPDLOG_WARN("NotImplemented: Please upgrade SDK to latest release. Message={}, host={}", status.message(),
+ SPDLOG_WARN("NotSupported: Please upgrade SDK to latest release. Message={}, host={}", status.message(),
invocation_context->remote_address);
- } break;
+ ec = ErrorCode::NotSupported;
+ cb(ec, invocation_context->response);
+ break;
+ }
}
};
@@ -319,33 +352,122 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
send_receipt.message_id = invocation_context->response.entries().begin()->message_id();
break;
}
+
+ case rmq::Code::ILLEGAL_TOPIC: {
+ SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalTopic;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_MESSAGE_TAG: {
+ SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalMessageTag;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_MESSAGE_KEY: {
+ SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalMessageKey;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_MESSAGE_GROUP: {
+ SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalMessageGroup;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: {
+ SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalMessageProperty;
+ break;
+ }
+
+ case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: {
+ SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::MessagePropertiesTooLarge;
+ break;
+ }
+
+ case rmq::Code::MESSAGE_BODY_TOO_LARGE: {
+ SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::MessageBodyTooLarge;
+ break;
+ }
+
case rmq::Code::TOPIC_NOT_FOUND: {
- SPDLOG_WARN("TopicNotFound: {}", status.message());
+ SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::TopicNotFound;
+ break;
+ }
+
+ case rmq::Code::NOT_FOUND: {
+ SPDLOG_WARN("NotFound: {}. Host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::NotFound;
break;
}
+
case rmq::Code::UNAUTHORIZED: {
- SPDLOG_WARN("Unauthenticated: {}", status.message());
+ SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::Unauthorized;
break;
}
+
case rmq::Code::FORBIDDEN: {
- SPDLOG_WARN("Forbidden: {}", status.message());
+ SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::Forbidden;
- cb(ec, send_receipt);
break;
}
+
+ case rmq::Code::MESSAGE_CORRUPTED: {
+ SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::MessageCorrupted;
+ break;
+ }
+
+ case rmq::Code::TOO_MANY_REQUESTS: {
+ SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::TooManyRequests;
+ break;
+ }
+
case rmq::Code::INTERNAL_SERVER_ERROR: {
- SPDLOG_WARN("InternalServerError: {}", status.message());
+ SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::InternalServerError;
+ break;
+ }
+
+ case rmq::Code::HA_NOT_AVAILABLE: {
+ SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::InternalServerError;
break;
}
+
+ case rmq::Code::PROXY_TIMEOUT: {
+ SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::GatewayTimeout;
+ break;
+ }
+
+ case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: {
+ SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::GatewayTimeout;
+ break;
+ }
+
+ case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: {
+ SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::GatewayTimeout;
+ break;
+ }
+
default: {
- SPDLOG_WARN("Unsupported status code. Check and upgrade SDK to the latest");
- ec = ErrorCode::NotImplemented;
+ SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. Host={}", invocation_context->remote_address);
+ ec = ErrorCode::NotSupported;
break;
}
}
+
cb(ec, send_receipt);
};
@@ -478,32 +600,64 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metad
}
auto ptr = std::make_shared<TopicRouteData>(std::move(message_queues));
cb(ec, ptr);
- } break;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_ACCESS_POINT: {
+ SPDLOG_WARN("IllegalAccessPoint: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalAccessPoint;
+ cb(ec, nullptr);
+ break;
+ }
+
case rmq::Code::UNAUTHORIZED: {
SPDLOG_WARN("Unauthorized: {}. Host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::Unauthorized;
cb(ec, nullptr);
- } break;
- case rmq::Code::FORBIDDEN: {
- SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::Forbidden;
- cb(ec, nullptr);
- } break;
+ break;
+ }
+
case rmq::Code::TOPIC_NOT_FOUND: {
SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::NotFound;
cb(ec, nullptr);
- } break;
+ break;
+ }
+
+ case rmq::Code::TOO_MANY_REQUESTS: {
+ SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::TooManyRequests;
+ cb(ec, nullptr);
+ break;
+ }
+
+ case rmq::Code::CLIENT_ID_REQUIRED: {
+ SPDLOG_ERROR("ClientIdRequired: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::InternalClientError;
+ cb(ec, nullptr);
+ break;
+ }
+
case rmq::Code::INTERNAL_SERVER_ERROR: {
SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::InternalServerError;
cb(ec, nullptr);
- } break;
+ break;
+ }
+
+ case rmq::Code::PROXY_TIMEOUT: {
+ SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::GatewayTimeout;
+ cb(ec, nullptr);
+ break;
+ }
+
default: {
SPDLOG_WARN("NotImplement: Please upgrade to latest SDK release. Host={}", invocation_context->remote_address);
ec = ErrorCode::NotImplemented;
cb(ec, nullptr);
- } break;
+ break;
+ }
}
};
invocation_context->callback = callback;
@@ -529,25 +683,75 @@ void ClientManagerImpl::queryAssignment(
std::error_code ec;
switch (status.code()) {
case rmq::Code::OK: {
- SPDLOG_DEBUG("Query assignment OK");
- } break;
+ SPDLOG_DEBUG("Query assignment OK. Host={}", invocation_context->remote_address);
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_ACCESS_POINT: {
+ SPDLOG_WARN("IllegalAccessPoint: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalAccessPoint;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_TOPIC: {
+ SPDLOG_WARN("IllegalAccessPoint: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalTopic;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+ SPDLOG_WARN("IllegalConsumerGroup: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalConsumerGroup;
+ break;
+ }
+
+ case rmq::Code::CLIENT_ID_REQUIRED: {
+ SPDLOG_WARN("ClientIdRequired: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::InternalClientError;
+ break;
+ }
+
case rmq::Code::UNAUTHORIZED: {
SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::Unauthorized;
- } break;
+ break;
+ }
+
case rmq::Code::FORBIDDEN: {
SPDLOG_WARN("Forbidden: {}, host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::Forbidden;
- } break;
+ break;
+ }
+
+ case rmq::Code::TOPIC_NOT_FOUND: {
+ SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::TopicNotFound;
+ break;
+ }
+
+ case rmq::Code::CONSUMER_GROUP_NOT_FOUND: {
+ SPDLOG_WARN("ConsumerGroupNotFound: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::ConsumerGroupNotFound;
+ break;
+ }
+
case rmq::Code::INTERNAL_SERVER_ERROR: {
SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::InternalServerError;
- } break;
+ break;
+ }
+
+ case rmq::Code::PROXY_TIMEOUT: {
+ SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::GatewayTimeout;
+ break;
+ }
+
default: {
- SPDLOG_WARN("NotImplemented: please upgrade SDK to latest release. Host={}",
- invocation_context->remote_address);
- ec = ErrorCode::NotImplemented;
- } break;
+ SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. Host={}", invocation_context->remote_address);
+ ec = ErrorCode::NotSupported;
+ break;
+ }
}
cb(ec, invocation_context->response);
};
@@ -806,23 +1010,80 @@ void ClientManagerImpl::ack(const std::string& target, const Metadata& metadata,
switch (status.code()) {
case rmq::Code::OK: {
SPDLOG_DEBUG("Ack OK. host={}", invocation_context->remote_address);
- } break;
+ break;
+ }
+
+ case rmq::Code::MULTIPLE_RESULTS: {
+ SPDLOG_DEBUG("Server returns multiple results. host={}", invocation_context->remote_address);
+ // Treat it as successful, allowing top tier processing according to response entries.
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_TOPIC: {
+ SPDLOG_WARN("IllegalTopic: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalTopic;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+ SPDLOG_WARN("IllegalConsumerGroup: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalConsumerGroup;
+ break;
+ }
+
+ case rmq::Code::INVALID_RECEIPT_HANDLE: {
+ SPDLOG_WARN("InvalidReceiptHandle: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::InvalidReceiptHandle;
+ break;
+ }
+
+ case rmq::Code::CLIENT_ID_REQUIRED: {
+ SPDLOG_WARN("ClientIdRequired: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::InternalClientError;
+ break;
+ }
+
+ case rmq::Code::TOPIC_NOT_FOUND: {
+ SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::TopicNotFound;
+ break;
+ }
+
case rmq::Code::UNAUTHORIZED: {
SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::Unauthorized;
- } break;
+ break;
+ }
+
case rmq::Code::FORBIDDEN: {
SPDLOG_WARN("PermissionDenied: {}, host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::Forbidden;
- } break;
+ break;
+ }
+
+ case rmq::Code::TOO_MANY_REQUESTS: {
+ SPDLOG_WARN("TooManyRequests: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::TooManyRequests;
+ break;
+ }
+
case rmq::Code::INTERNAL_SERVER_ERROR: {
SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::InternalServerError;
- } break;
+ break;
+ }
+
+ case rmq::Code::PROXY_TIMEOUT: {
+ SPDLOG_WARN("GatewayTimeout: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::GatewayTimeout;
+ break;
+ }
+
default: {
- SPDLOG_WARN("NotImplement: please upgrade SDK to latest release. host={}", invocation_context->remote_address);
- ec = ErrorCode::NotImplemented;
- } break;
+ SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. host={}", invocation_context->remote_address);
+ ec = ErrorCode::NotSupported;
+ break;
+ }
}
cb(ec);
};
@@ -856,29 +1117,64 @@ void ClientManagerImpl::changeInvisibleDuration(
std::error_code ec;
auto&& status = invocation_context->response.status();
+ auto&& peer_address = invocation_context->remote_address;
switch (status.code()) {
case rmq::Code::OK: {
- SPDLOG_DEBUG("Nack to {} OK", invocation_context->remote_address);
+ SPDLOG_DEBUG("ChangeInvisibleDuration to {} OK", peer_address);
break;
};
+
+ case rmq::Code::ILLEGAL_TOPIC: {
+ SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), peer_address);
+ ec = ErrorCode::IllegalTopic;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+ SPDLOG_WARN("IllegalConsumerGroup: {}. Host={}", status.message(), peer_address);
+ ec = ErrorCode::IllegalConsumerGroup;
+ break;
+ }
+
case rmq::Code::UNAUTHORIZED: {
SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::Unauthorized;
break;
}
+
+ case rmq::Code::INVALID_RECEIPT_HANDLE: {
+ SPDLOG_WARN("InvalidReceiptHandle: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::InvalidReceiptHandle;
+ break;
+ }
+
+ case rmq::Code::CLIENT_ID_REQUIRED: {
+ SPDLOG_WARN("ClientIdRequired: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::InternalClientError;
+ break;
+ }
+
case rmq::Code::FORBIDDEN: {
SPDLOG_WARN("Forbidden: {}, host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::Forbidden;
break;
}
+
case rmq::Code::INTERNAL_SERVER_ERROR: {
SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::InternalServerError;
break;
}
+
+ case rmq::Code::TOO_MANY_REQUESTS: {
+ SPDLOG_WARN("TooManyRequests: {}, host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::TooManyRequests;
+ break;
+ }
+
default: {
- SPDLOG_WARN("NotImplemented: Please upgrade to latest SDK, host={}", invocation_context->remote_address);
- ec = ErrorCode::NotImplemented;
+ SPDLOG_WARN("NotSupported: Please upgrade to latest SDK, host={}", invocation_context->remote_address);
+ ec = ErrorCode::NotSupported;
break;
}
}
@@ -927,27 +1223,72 @@ void ClientManagerImpl::endTransaction(
}
auto&& status = invocation_context->response.status();
+ auto&& peer_address = invocation_context->remote_address;
switch (status.code()) {
case rmq::Code::OK: {
SPDLOG_DEBUG("endTransaction completed OK. Response: {}, host={}", invocation_context->response.DebugString(),
- invocation_context->remote_address);
- } break;
+ peer_address);
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_TOPIC: {
+ SPDLOG_WARN("IllegalTopic: {}, host={}", status.message(), peer_address);
+ ec = ErrorCode::IllegalTopic;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+ SPDLOG_WARN("IllegalConsumerGroup: {}, host={}", status.message(), peer_address);
+ ec = ErrorCode::IllegalConsumerGroup;
+ break;
+ }
+
+ case rmq::Code::INVALID_TRANSACTION_ID: {
+ SPDLOG_WARN("InvalidTransactionId: {}, host={}", status.message(), peer_address);
+ ec = ErrorCode::InvalidTransactionId;
+ break;
+ }
+
+ case rmq::Code::CLIENT_ID_REQUIRED: {
+ SPDLOG_WARN("ClientIdRequired: {}, host={}", status.message(), peer_address);
+ ec = ErrorCode::InternalClientError;
+ break;
+ }
+
+ case rmq::Code::TOPIC_NOT_FOUND: {
+ SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), peer_address);
+ ec = ErrorCode::TopicNotFound;
+ break;
+ }
+
case rmq::Code::UNAUTHORIZED: {
- SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
+ SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), peer_address);
ec = ErrorCode::Unauthorized;
- } break;
+ break;
+ }
+
case rmq::Code::FORBIDDEN: {
- SPDLOG_WARN("Forbidden: {}, host={}", status.message(), invocation_context->remote_address);
+ SPDLOG_WARN("Forbidden: {}, host={}", status.message(), peer_address);
ec = ErrorCode::Forbidden;
- } break;
+ break;
+ }
+
case rmq::Code::INTERNAL_SERVER_ERROR: {
- SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
+ SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), peer_address);
ec = ErrorCode::InternalServerError;
- } break;
+ break;
+ }
+
+ case rmq::Code::PROXY_TIMEOUT: {
+ SPDLOG_WARN("GatewayTimeout: {}, host={}", status.message(), peer_address);
+ ec = ErrorCode::GatewayTimeout;
+ break;
+ }
+
default: {
- SPDLOG_WARN("NotImplemented: please upgrade SDK to latest release. {}, host={}", status.message(),
- invocation_context->remote_address);
- ec = ErrorCode::NotImplemented;
+ SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. {}, host={}", status.message(), peer_address);
+ ec = ErrorCode::NotSupported;
+ break;
}
}
cb(ec, invocation_context->response);
@@ -985,22 +1326,64 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe
SPDLOG_DEBUG("Received forwardToDeadLetterQueue response from server[host={}]", invocation_context->remote_address);
std::error_code ec;
- switch (invocation_context->response.status().code()) {
+ auto&& status = invocation_context->response.status();
+ auto&& peer_address = invocation_context->remote_address;
+ switch (status.code()) {
case rmq::Code::OK: {
break;
}
+ case rmq::Code::ILLEGAL_TOPIC: {
+ SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), peer_address);
+ ec = ErrorCode::IllegalTopic;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+ SPDLOG_WARN("IllegalConsumerGroup: {}. Host={}", status.message(), peer_address);
+ ec = ErrorCode::IllegalConsumerGroup;
+ break;
+ }
+
+ case rmq::Code::INVALID_RECEIPT_HANDLE: {
+ SPDLOG_WARN("IllegalReceiptHandle: {}. Host={}", status.message(), peer_address);
+ ec = ErrorCode::InvalidReceiptHandle;
+ break;
+ }
+
+ case rmq::Code::CLIENT_ID_REQUIRED: {
+ SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), peer_address);
+ ec = ErrorCode::InternalClientError;
+ break;
+ }
+
+ case rmq::Code::TOPIC_NOT_FOUND: {
+ ec = ErrorCode::TopicNotFound;
+ break;
+ }
+
case rmq::Code::INTERNAL_SERVER_ERROR: {
ec = ErrorCode::ServiceUnavailable;
+ break;
}
+
case rmq::Code::TOO_MANY_REQUESTS: {
- ec = ErrorCode::TooManyRequest;
+ ec = ErrorCode::TooManyRequests;
+ break;
}
+
+ case rmq::Code::PROXY_TIMEOUT: {
+ ec = ErrorCode::GatewayTimeout;
+ break;
+ }
+
default: {
ec = ErrorCode::NotImplemented;
+ break;
}
}
cb(ec);
};
+
invocation_context->callback = callback;
client->asyncForwardMessageToDeadLetterQueue(request, invocation_context);
}
@@ -1042,26 +1425,36 @@ std::error_code ClientManagerImpl::notifyClientTermination(const std::string& ta
SPDLOG_DEBUG("NotifyClientTermination OK. host={}", target_host);
break;
}
+
+ case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+ SPDLOG_ERROR("IllegalConsumerGroup: {}. Host={}", status.message(), target_host);
+ ec = ErrorCode::IllegalConsumerGroup;
+ break;
+ }
+
case rmq::Code::INTERNAL_SERVER_ERROR: {
SPDLOG_WARN("InternalServerError: Cause={}, host={}", status.message(), target_host);
ec = ErrorCode::InternalServerError;
break;
}
+
case rmq::Code::UNAUTHORIZED: {
SPDLOG_WARN("Unauthorized due to lack of valid authentication credentials: Cause={}, host={}", status.message(),
target_host);
ec = ErrorCode::Unauthorized;
break;
}
+
case rmq::Code::FORBIDDEN: {
SPDLOG_WARN("Forbidden due to insufficient permission to the resource: Cause={}, host={}", status.message(),
target_host);
ec = ErrorCode::Forbidden;
break;
}
+
default: {
- SPDLOG_WARN("NotImplemented. Please upgrade to latest SDK release. host={}", target_host);
- ec = ErrorCode::NotImplemented;
+ SPDLOG_WARN("NotSupported. Please upgrade to latest SDK release. host={}", target_host);
+ ec = ErrorCode::NotSupported;
break;
}
}
diff --git a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
index 223c879..af99874 100644
--- a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
@@ -17,7 +17,10 @@
#include "ReceiveMessageStreamReader.h"
+#include <apache/rocketmq/v2/definition.pb.h>
+
#include "LoggerImpl.h"
+#include "rocketmq/ErrorCode.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -63,26 +66,71 @@ void ReceiveMessageStreamReader::OnReadDone(bool ok) {
case rmq::Code::OK: {
break;
}
+ case rmq::Code::ILLEGAL_TOPIC: {
+ ec_ = ErrorCode::IllegalTopic;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+ ec_ = ErrorCode::IllegalConsumerGroup;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_FILTER_EXPRESSION: {
+ ec_ = ErrorCode::IllegalFilterExpression;
+ break;
+ }
+
+ case rmq::Code::CLIENT_ID_REQUIRED: {
+ ec_ = ErrorCode::InternalClientError;
+ break;
+ }
+
case rmq::Code::TOPIC_NOT_FOUND: {
ec_ = ErrorCode::TopicNotFound;
break;
}
case rmq::Code::CONSUMER_GROUP_NOT_FOUND: {
- ec_ = ErrorCode::GroupNotFound;
+ ec_ = ErrorCode::ConsumerGroupNotFound;
break;
}
+
case rmq::Code::TOO_MANY_REQUESTS: {
- ec_ = ErrorCode::TooManyRequest;
+ ec_ = ErrorCode::TooManyRequests;
break;
}
+
case rmq::Code::MESSAGE_NOT_FOUND: {
ec_ = ErrorCode::NoContent;
break;
}
- default:
+
+ case rmq::Code::UNAUTHORIZED: {
+ ec_ = ErrorCode::Unauthorized;
+ break;
+ }
+
+ case rmq::Code::FORBIDDEN: {
+ ec_ = ErrorCode::Forbidden;
+ break;
+ }
+
+ case rmq::Code::INTERNAL_SERVER_ERROR: {
+ ec_ = ErrorCode::InternalServerError;
+ break;
+ }
+
+ case rmq::Code::PROXY_TIMEOUT: {
+ ec_ = ErrorCode::GatewayTimeout;
+ break;
+ }
+
+ default: {
+ ec_ = ErrorCode::NotSupported;
SPDLOG_WARN("Unsupported code={}", response_.status().code());
break;
+ }
}
break;
}
diff --git a/cpp/src/main/cpp/client/RpcClientImpl.cpp b/cpp/src/main/cpp/client/RpcClientImpl.cpp
index 623547e..35016c3 100644
--- a/cpp/src/main/cpp/client/RpcClientImpl.cpp
+++ b/cpp/src/main/cpp/client/RpcClientImpl.cpp
@@ -21,14 +21,12 @@
#include <sstream>
#include <thread>
-#include "absl/time/time.h"
-
#include "ClientManager.h"
#include "ReceiveMessageStreamReader.h"
#include "RpcClient.h"
#include "TelemetryBidiReactor.h"
#include "TlsHelper.h"
-#include "include/ReceiveMessageContext.h"
+#include "absl/time/time.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp b/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp
index cf251e4..7479ba2 100644
--- a/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp
+++ b/cpp/src/main/cpp/client/TelemetryBidiReactor.cpp
@@ -17,6 +17,7 @@
#include "TelemetryBidiReactor.h"
#include <atomic>
+#include <cstdint>
#include <memory>
#include <utility>
@@ -47,7 +48,8 @@ TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
}
TelemetryBidiReactor::~TelemetryBidiReactor() {
- SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", peer_address_, stream_state_);
+ SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", peer_address_,
+ static_cast<std::uint8_t>(stream_state_));
}
bool TelemetryBidiReactor::await() {
diff --git a/cpp/src/main/cpp/client/include/ClientConfig.h b/cpp/src/main/cpp/client/include/ClientConfig.h
index 66ca230..bade9a4 100644
--- a/cpp/src/main/cpp/client/include/ClientConfig.h
+++ b/cpp/src/main/cpp/client/include/ClientConfig.h
@@ -21,12 +21,12 @@
#include <string>
#include <vector>
-#include "absl/container/flat_hash_map.h"
-#include "absl/time/time.h"
-
#include "Protocol.h"
#include "RetryPolicy.h"
+#include "absl/container/flat_hash_map.h"
+#include "absl/time/time.h"
#include "rocketmq/CredentialsProvider.h"
+#include "rocketmq/Tracing.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -61,6 +61,7 @@ struct ClientConfig {
PublisherConfig publisher;
SubscriberConfig subscriber;
Metric metric;
+ std::unique_ptr<opencensus::trace::Sampler> sampler_;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/client/include/RpcClient.h b/cpp/src/main/cpp/client/include/RpcClient.h
index 2fc8448..fbb3017 100644
--- a/cpp/src/main/cpp/client/include/RpcClient.h
+++ b/cpp/src/main/cpp/client/include/RpcClient.h
@@ -21,15 +21,12 @@
#include <memory>
#include <string>
-#include "ReceiveMessageResult.h"
+#include "Protocol.h"
+#include "ReceiveMessageContext.h"
#include "absl/container/flat_hash_map.h"
#include "absl/strings/string_view.h"
#include "grpcpp/grpcpp.h"
-#include "InvocationContext.h"
-#include "Protocol.h"
-#include "ReceiveMessageContext.h"
-
ROCKETMQ_NAMESPACE_BEGIN
using Channel = grpc::Channel;
diff --git a/cpp/src/main/cpp/client/include/RpcClientImpl.h b/cpp/src/main/cpp/client/include/RpcClientImpl.h
index 6406c74..35316ec 100644
--- a/cpp/src/main/cpp/client/include/RpcClientImpl.h
+++ b/cpp/src/main/cpp/client/include/RpcClientImpl.h
@@ -18,14 +18,11 @@
#include <memory>
-#include "InvocationContext.h"
-#include "ReceiveMessageCallback.h"
-#include "ReceiveMessageContext.h"
-#include "absl/container/flat_hash_map.h"
-
#include "Client.h"
#include "ClientManager.h"
+#include "ReceiveMessageContext.h"
#include "RpcClient.h"
+#include "absl/container/flat_hash_map.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp b/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp
index 0eeb08e..70ab77b 100644
--- a/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp
@@ -22,13 +22,13 @@
#include <system_error>
#include <utility>
+#include "AsyncReceiveMessageCallback.h"
#include "ClientManagerImpl.h"
#include "MetadataConstants.h"
#include "Protocol.h"
#include "PushConsumerImpl.h"
#include "ReceiveMessageResult.h"
#include "Signature.h"
-#include "include/AsyncReceiveMessageCallback.h"
#include "rocketmq/MessageListener.h"
using namespace std::chrono;
@@ -113,7 +113,7 @@ void ProcessQueueImpl::popMessage() {
std::weak_ptr<AsyncReceiveMessageCallback> cb{receive_callback_};
auto callback = [cb](const std::error_code& ec, const ReceiveMessageResult& result) {
- auto recv_cb = cb.lock();
+ std::shared_ptr<AsyncReceiveMessageCallback> recv_cb = cb.lock();
if (recv_cb) {
recv_cb->onCompletion(ec, result);
}
diff --git a/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp b/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
index 4b6c3f0..65cc5ba 100644
--- a/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -290,16 +290,17 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) {
{
// Trace Send RPC
- if (context->message_->traceContext().has_value()) {
+ if (context->message_->traceContext().has_value() && client_config_.sampler_) {
auto span_context =
opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext().value());
auto span = opencensus::trace::Span::BlankSpan();
std::string span_name = resourceNamespace() + "/" + context->message_->topic() + " " +
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION;
if (span_context.IsValid()) {
- span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {traceSampler()});
+ span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context,
+ {client_config_.sampler_.get()});
} else {
- span = opencensus::trace::Span::StartSpan(span_name, nullptr, {traceSampler()});
+ span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()});
}
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION);
@@ -380,7 +381,7 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt
bool completed = false;
bool success = false;
auto span = opencensus::trace::Span::BlankSpan();
- if (!transaction.traceContext().empty()) {
+ if (!transaction.traceContext().empty() && client_config_.sampler_) {
// Trace transactional message
opencensus::trace::SpanContext span_context =
opencensus::trace::propagation::FromTraceParentHeader(transaction.traceContext());
@@ -389,9 +390,9 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt
: MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
std::string span_name = resourceNamespace() + "/" + transaction.topic() + " " + trace_operation_name;
if (span_context.IsValid()) {
- span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {traceSampler()});
+ span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {client_config_.sampler_.get()});
} else {
- span = opencensus::trace::Span::StartSpan(span_name, nullptr, {traceSampler()});
+ span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()});
}
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, trace_operation_name);
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, trace_operation_name);
diff --git a/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp b/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index 89dc112..2acff23 100644
--- a/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -550,7 +550,7 @@ void PushConsumerImpl::onVerifyMessage(MessageConstSharedPtr message, std::funct
cmd.mutable_status()->set_message("Unexpected exception raised");
}
} else {
- cmd.mutable_status()->set_code(rmq::Code::VERIFY_MESSAGE_FORBIDDEN);
+ cmd.mutable_status()->set_code(rmq::Code::VERIFY_FIFO_MESSAGE_UNSUPPORTED);
cmd.mutable_status()->set_message("Unsupported Operation For FIFO Message");
}
} else {
diff --git a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
index ff61af3..c136cc9 100644
--- a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -110,6 +110,12 @@ public:
virtual void buildClientSettings(rmq::Settings& settings) {
}
+ void registerTracingSampler(TracingSamplerProvider *provider) {
+ if (provider) {
+ client_config_.sampler_ = provider->tracingSampler();
+ }
+ }
+
protected:
ClientConfig client_config_;
diff --git a/cpp/src/main/cpp/stats/PublishStats.cpp b/cpp/src/main/cpp/stats/PublishStats.cpp
index 37ce613..a012515 100644
--- a/cpp/src/main/cpp/stats/PublishStats.cpp
+++ b/cpp/src/main/cpp/stats/PublishStats.cpp
@@ -22,7 +22,7 @@ ROCKETMQ_NAMESPACE_BEGIN
PublishStats::PublishStats()
: success_(opencensus::stats::MeasureInt64::Register("publish_success", "Number of message published", "1")),
- failure_(opencensus::stats::MeasureInt64::Register("pubish_failure", "Number of publish failures", "1")),
+ failure_(opencensus::stats::MeasureInt64::Register("publish_failure", "Number of publish failures", "1")),
latency_(opencensus::stats::MeasureInt64::Register("publish_latency", "Publish latency in milliseconds", "ms")) {
opencensus::stats::ViewDescriptor()
.set_name("rocketmq_send_success_total")