You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/30 08:53:17 UTC
[rocketmq-clients] branch cpp updated: Finish error handling of SendMessage
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/cpp by this push:
new 9d6b5ed Finish error handling of SendMessage
9d6b5ed is described below
commit 9d6b5ed8aa1fd62acc984aee1b47c3af11c41123
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jun 30 16:53:08 2022 +0800
Finish error handling of SendMessage
---
cpp/api/rocketmq/ErrorCode.h | 2 +-
cpp/proto/apache/rocketmq/v2/definition.proto | 55 +++++++----
cpp/src/main/cpp/base/ErrorCategory.cpp | 22 ++++-
cpp/src/main/cpp/client/ClientManagerImpl.cpp | 109 +++++++++++++++++++--
.../main/cpp/client/ReceiveMessageStreamReader.cpp | 2 +-
5 files changed, 158 insertions(+), 32 deletions(-)
diff --git a/cpp/api/rocketmq/ErrorCode.h b/cpp/api/rocketmq/ErrorCode.h
index 14ec504..e911533 100644
--- a/cpp/api/rocketmq/ErrorCode.h
+++ b/cpp/api/rocketmq/ErrorCode.h
@@ -151,7 +151,7 @@ enum class ErrorCode : int {
* amount of time.
*
*/
- TooManyRequest = 42900,
+ TooManyRequests = 42900,
/**
* @brief The server is unwilling to process the request because either an
diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto b/cpp/proto/apache/rocketmq/v2/definition.proto
index ab1fd09..67520fe 100644
--- a/cpp/proto/apache/rocketmq/v2/definition.proto
+++ b/cpp/proto/apache/rocketmq/v2/definition.proto
@@ -316,31 +316,30 @@ enum Code {
ILLEGAL_MESSAGE_GROUP = 40006;
// Format of message property key is illegal.
ILLEGAL_MESSAGE_PROPERTY_KEY = 40007;
- // Message properties total size exceeds the threshold.
- MESSAGE_PROPERTIES_TOO_LARGE = 40008;
- // Message body size exceeds the threshold.
- MESSAGE_BODY_TOO_LARGE = 40009;
// Transaction id is invalid.
- INVALID_TRANSACTION_ID = 40010;
+ INVALID_TRANSACTION_ID = 40008;
// Format of message id is illegal.
- ILLEGAL_MESSAGE_ID = 40011;
+ ILLEGAL_MESSAGE_ID = 40009;
// Format of filter expression is illegal.
- ILLEGAL_FILTER_EXPRESSION = 40012;
+ ILLEGAL_FILTER_EXPRESSION = 40010;
// Receipt handle of message is invalid.
- INVALID_RECEIPT_HANDLE = 40013;
- // Message property is not match the message type.
- MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 40014;
+ INVALID_RECEIPT_HANDLE = 40011;
+ // Message property conflicts with its type.
+ MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40012;
// Client type could not be recognized.
- UNRECOGNIZED_CLIENT_TYPE = 40015;
+ UNRECOGNIZED_CLIENT_TYPE = 40013;
// Message is corrupted.
- MESSAGE_CORRUPTED = 40016;
+ MESSAGE_CORRUPTED = 40014;
// Request is rejected due to missing of x-mq-client-id header.
- CLIENT_ID_REQUIRED = 40017;
+ CLIENT_ID_REQUIRED = 40015;
// Generic code indicates that the client request lacks valid authentication
// credentials for the requested resource.
UNAUTHORIZED = 40100;
+ // Generic code indicating that the account is suspended due to overdue payment.
+ PAYMENT_REQUIRED = 40200;
+
// Generic code for the case that user does not have the permission to operate.
FORBIDDEN = 40300;
@@ -353,10 +352,29 @@ enum Code {
// Consumer group resource does not exist.
CONSUMER_GROUP_NOT_FOUND = 40403;
+ // Generic code representing a client-side timeout when connecting to, reading data from, or writing data to the server.
+ REQUEST_TIMEOUT = 40800;
+
+ // Generic code represents that the request entity is larger than limits defined by server.
+ PAYLOAD_TOO_LARGE = 41300;
+ // Message body size exceeds the threshold.
+ MESSAGE_BODY_TOO_LARGE = 41301;
+
+ // Generic code for use cases where pre-conditions are not met.
+ // For example, if a producer instance is used to publish messages without prior start() invocation,
+ // this error code will be raised.
+ PRECONDITION_FAILED = 42800;
+
// Generic code indicates that too many requests are made in short period of duration.
// Requests are throttled.
TOO_MANY_REQUESTS = 42900;
+ // Generic code for the case that the server is unwilling to process the request because its header fields are too large.
+ // The request may be resubmitted after reducing the size of the request header fields.
+ REQUEST_HEADER_FIELDS_TOO_LARGE = 43100;
+ // Message properties total size exceeds the threshold.
+ MESSAGE_PROPERTIES_TOO_LARGE = 43101;
+
// Generic code indicates that server/client encountered an unexpected
// condition that prevented it from fulfilling the request.
INTERNAL_ERROR = 50000;
@@ -377,17 +395,16 @@ enum Code {
// functionality required to fulfill the request.
NOT_IMPLEMENTED = 50100;
- // Generic code for timeout.
- TIMEOUT = 50400;
+ // Generic code represents that the server, which acts as a gateway or proxy,
+ // does not get a satisfactory response in time from its upstream servers.
+ // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
+ PROXY_TIMEOUT = 50400;
// Message persistence timeout.
MASTER_PERSISTENCE_TIMEOUT = 50401;
// Slave persistence timeout.
SLAVE_PERSISTENCE_TIMEOUT = 50402;
- // Code indicates that the server, while acting as a gateway or proxy,
- // did not get a response in time from the upstream server that
- // it needed in order to complete the request.
- PROXY_TIMEOUT = 50403;
+ // Generic code for unsupported operation.
UNSUPPORTED = 50500;
// Operation is not allowed in current version.
VERSION_UNSUPPORTED = 50501;
diff --git a/cpp/src/main/cpp/base/ErrorCategory.cpp b/cpp/src/main/cpp/base/ErrorCategory.cpp
index 4f1b290..5ca2528 100644
--- a/cpp/src/main/cpp/base/ErrorCategory.cpp
+++ b/cpp/src/main/cpp/base/ErrorCategory.cpp
@@ -16,6 +16,8 @@
*/
#include "rocketmq/ErrorCategory.h"
+#include "rocketmq/ErrorCode.h"
+
ROCKETMQ_NAMESPACE_BEGIN
std::string ErrorCategory::message(int code) const {
@@ -65,7 +67,7 @@ std::string ErrorCategory::message(int code) const {
case ErrorCode::PreconditionRequired:
return "State of dependent procedure is not right";
- case ErrorCode::TooManyRequest:
+ case ErrorCode::TooManyRequests:
return "Quota exchausted. The user has sent too many requests in a given "
"amount of time.";
@@ -107,6 +109,24 @@ std::string ErrorCategory::message(int code) const {
case ErrorCode::InsufficientStorage:
return "The server is unable to store the representation needed to "
"complete the request.";
+
+ case ErrorCode::MultipleResults:
+ return "Multiple results are available";
+
+ case ErrorCode::IllegalAccessPoint:
+ return "Access point is either malformed or invalid";
+
+ case ErrorCode::IllegalTopic: {
+ return "Topic is illegal either due to length is too long or invalid character is included";
+ }
+
+ case ErrorCode::IllegalConsumerGroup: {
+ return "ConsumerGroup is illegal due to its length is too long or invalid character is included";
+ }
+
+ case ErrorCode::IllegalMessageTag: {
+ return "Format of message tag is illegal.";
+ }
}
return "";
diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
index e4be88b..b6e8a2e 100644
--- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
@@ -242,7 +242,7 @@ void ClientManagerImpl::heartbeat(const std::string& target_host, const Metadata
case rmq::Code::TOO_MANY_REQUESTS: {
SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::TooManyRequest;
+ ec = ErrorCode::TooManyRequests;
cb(ec, invocation_context->response);
break;
}
@@ -352,33 +352,122 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
send_receipt.message_id = invocation_context->response.entries().begin()->message_id();
break;
}
+
+ case rmq::Code::ILLEGAL_TOPIC: {
+ SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalTopic;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_MESSAGE_TAG: {
+ SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalMessageTag;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_MESSAGE_KEY: {
+ SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalMessageKey;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_MESSAGE_GROUP: {
+ SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalMessageGroup;
+ break;
+ }
+
+ case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: {
+ SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::IllegalMessageProperty;
+ break;
+ }
+
+ case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: {
+ SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::MessagePropertiesTooLarge;
+ break;
+ }
+
+ case rmq::Code::MESSAGE_BODY_TOO_LARGE: {
+ SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::MessageBodyTooLarge;
+ break;
+ }
+
case rmq::Code::TOPIC_NOT_FOUND: {
- SPDLOG_WARN("TopicNotFound: {}", status.message());
+ SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::TopicNotFound;
+ break;
+ }
+
+ case rmq::Code::NOT_FOUND: {
+ SPDLOG_WARN("NotFound: {}. Host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::NotFound;
break;
}
+
case rmq::Code::UNAUTHORIZED: {
- SPDLOG_WARN("Unauthenticated: {}", status.message());
+ SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::Unauthorized;
break;
}
+
case rmq::Code::FORBIDDEN: {
- SPDLOG_WARN("Forbidden: {}", status.message());
+ SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::Forbidden;
- cb(ec, send_receipt);
break;
}
+
+ case rmq::Code::MESSAGE_CORRUPTED: {
+ SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::MessageCorrupted;
+ break;
+ }
+
+ case rmq::Code::TOO_MANY_REQUESTS: {
+ SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::TooManyRequests;
+ break;
+ }
+
case rmq::Code::INTERNAL_SERVER_ERROR: {
- SPDLOG_WARN("InternalServerError: {}", status.message());
+ SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
ec = ErrorCode::InternalServerError;
break;
}
+
+ case rmq::Code::HA_NOT_AVAILABLE: {
+ SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::InternalServerError;
+ break;
+ }
+
+ case rmq::Code::PROXY_TIMEOUT: {
+ SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::GatewayTimeout;
+ break;
+ }
+
+ case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: {
+ SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::GatewayTimeout;
+ break;
+ }
+
+ case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: {
+ SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+ ec = ErrorCode::GatewayTimeout;
+ break;
+ }
+
default: {
- SPDLOG_WARN("Unsupported status code. Check and upgrade SDK to the latest");
- ec = ErrorCode::NotImplemented;
+ SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. Host={}", invocation_context->remote_address);
+ ec = ErrorCode::NotSupported;
break;
}
}
+
cb(ec, send_receipt);
};
@@ -537,7 +626,7 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metad
case rmq::Code::TOO_MANY_REQUESTS: {
SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
- ec = ErrorCode::TooManyRequest;
+ ec = ErrorCode::TooManyRequests;
cb(ec, nullptr);
break;
}
@@ -1058,7 +1147,7 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe
ec = ErrorCode::ServiceUnavailable;
}
case rmq::Code::TOO_MANY_REQUESTS: {
- ec = ErrorCode::TooManyRequest;
+ ec = ErrorCode::TooManyRequests;
}
default: {
ec = ErrorCode::NotImplemented;
diff --git a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
index 4667de4..3ee8f68 100644
--- a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
@@ -73,7 +73,7 @@ void ReceiveMessageStreamReader::OnReadDone(bool ok) {
break;
}
case rmq::Code::TOO_MANY_REQUESTS: {
- ec_ = ErrorCode::TooManyRequest;
+ ec_ = ErrorCode::TooManyRequests;
break;
}
case rmq::Code::MESSAGE_NOT_FOUND: {