You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/30 08:53:17 UTC

[rocketmq-clients] branch cpp updated: Finish error handling of SendMessage

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/cpp by this push:
     new 9d6b5ed  Finish error handling of SendMessage
9d6b5ed is described below

commit 9d6b5ed8aa1fd62acc984aee1b47c3af11c41123
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jun 30 16:53:08 2022 +0800

    Finish error handling of SendMessage
---
 cpp/api/rocketmq/ErrorCode.h                       |   2 +-
 cpp/proto/apache/rocketmq/v2/definition.proto      |  55 +++++++----
 cpp/src/main/cpp/base/ErrorCategory.cpp            |  22 ++++-
 cpp/src/main/cpp/client/ClientManagerImpl.cpp      | 109 +++++++++++++++++++--
 .../main/cpp/client/ReceiveMessageStreamReader.cpp |   2 +-
 5 files changed, 158 insertions(+), 32 deletions(-)

diff --git a/cpp/api/rocketmq/ErrorCode.h b/cpp/api/rocketmq/ErrorCode.h
index 14ec504..e911533 100644
--- a/cpp/api/rocketmq/ErrorCode.h
+++ b/cpp/api/rocketmq/ErrorCode.h
@@ -151,7 +151,7 @@ enum class ErrorCode : int {
    * amount of time.
    *
    */
-  TooManyRequest = 42900,
+  TooManyRequests = 42900,
 
   /**
    * @brief The server is unwilling to process the request because either an
diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto b/cpp/proto/apache/rocketmq/v2/definition.proto
index ab1fd09..67520fe 100644
--- a/cpp/proto/apache/rocketmq/v2/definition.proto
+++ b/cpp/proto/apache/rocketmq/v2/definition.proto
@@ -316,31 +316,30 @@ enum Code {
   ILLEGAL_MESSAGE_GROUP = 40006;
   // Format of message property key is illegal.
   ILLEGAL_MESSAGE_PROPERTY_KEY = 40007;
-  // Message properties total size exceeds the threshold.
-  MESSAGE_PROPERTIES_TOO_LARGE = 40008;
-  // Message body size exceeds the threshold.
-  MESSAGE_BODY_TOO_LARGE = 40009;
   // Transaction id is invalid.
-  INVALID_TRANSACTION_ID = 40010;
+  INVALID_TRANSACTION_ID = 40008;
   // Format of message id is illegal.
-  ILLEGAL_MESSAGE_ID = 40011;
+  ILLEGAL_MESSAGE_ID = 40009;
   // Format of filter expression is illegal.
-  ILLEGAL_FILTER_EXPRESSION = 40012;
+  ILLEGAL_FILTER_EXPRESSION = 40010;
   // Receipt handle of message is invalid.
-  INVALID_RECEIPT_HANDLE = 40013;
-  // Message property is not match the message type.
-  MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 40014;
+  INVALID_RECEIPT_HANDLE = 40011;
+  // Message property conflicts with its type.
+  MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40012;
   // Client type could not be recognized.
-  UNRECOGNIZED_CLIENT_TYPE = 40015;
+  UNRECOGNIZED_CLIENT_TYPE = 40013;
   // Message is corrupted.
-  MESSAGE_CORRUPTED = 40016;
+  MESSAGE_CORRUPTED = 40014;
   // Request is rejected due to missing of x-mq-client-id header.
-  CLIENT_ID_REQUIRED = 40017;
+  CLIENT_ID_REQUIRED = 40015;
 
   // Generic code indicates that the client request lacks valid authentication
   // credentials for the requested resource.
   UNAUTHORIZED = 40100;
 
+  // Generic code indicating that the account is suspended due to overdue payment.
+  PAYMENT_REQUIRED = 40200;
+
   // Generic code for the case that user does not have the permission to operate.
   FORBIDDEN = 40300;
 
@@ -353,10 +352,29 @@ enum Code {
   // Consumer group resource does not exist.
   CONSUMER_GROUP_NOT_FOUND = 40403;
 
+  // Generic code representing a client-side timeout when connecting to, reading data from, or writing data to the server.
+  REQUEST_TIMEOUT = 40800;
+
+  // Generic code represents that the request entity is larger than limits defined by server.
+  PAYLOAD_TOO_LARGE = 41300;
+  // Message body size exceeds the threshold.
+  MESSAGE_BODY_TOO_LARGE = 41301;
+
+  // Generic code for use cases where pre-conditions are not met.
+  // For example, if a producer instance is used to publish messages without prior start() invocation,
+  // this error code will be raised.
+  PRECONDITION_FAILED = 42800;
+
   // Generic code indicates that too many requests are made in short period of duration.
   // Requests are throttled.
   TOO_MANY_REQUESTS = 42900;
 
+  // Generic code for the case that the server is unwilling to process the request because its header fields are too large.
+  // The request may be resubmitted after reducing the size of the request header fields.
+  REQUEST_HEADER_FIELDS_TOO_LARGE = 43100;
+  // Message properties total size exceeds the threshold.
+  MESSAGE_PROPERTIES_TOO_LARGE = 43101;
+
   // Generic code indicates that server/client encountered an unexpected
   // condition that prevented it from fulfilling the request.
   INTERNAL_ERROR = 50000;
@@ -377,17 +395,16 @@ enum Code {
   // functionality required to fulfill the request.
   NOT_IMPLEMENTED = 50100;
 
-  // Generic code for timeout.
-  TIMEOUT = 50400;
+  // Generic code indicating that the server, which acts as a gateway or proxy,
+  // did not get a satisfactory response in time from its upstream servers.
+  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
+  PROXY_TIMEOUT = 50400;
   // Message persistence timeout.
   MASTER_PERSISTENCE_TIMEOUT = 50401;
   // Slave persistence timeout.
   SLAVE_PERSISTENCE_TIMEOUT = 50402;
-  // Code indicates that the server, while acting as a gateway or proxy,
-  // did not get a response in time from the upstream server that
-  // it needed in order to complete the request.
-  PROXY_TIMEOUT = 50403;
 
+  // Generic code for unsupported operation.
   UNSUPPORTED = 50500;
   // Operation is not allowed in current version.
   VERSION_UNSUPPORTED = 50501;
diff --git a/cpp/src/main/cpp/base/ErrorCategory.cpp b/cpp/src/main/cpp/base/ErrorCategory.cpp
index 4f1b290..5ca2528 100644
--- a/cpp/src/main/cpp/base/ErrorCategory.cpp
+++ b/cpp/src/main/cpp/base/ErrorCategory.cpp
@@ -16,6 +16,8 @@
  */
 #include "rocketmq/ErrorCategory.h"
 
+#include "rocketmq/ErrorCode.h"
+
 ROCKETMQ_NAMESPACE_BEGIN
 
 std::string ErrorCategory::message(int code) const {
@@ -65,7 +67,7 @@ std::string ErrorCategory::message(int code) const {
     case ErrorCode::PreconditionRequired:
       return "State of dependent procedure is not right";
 
-    case ErrorCode::TooManyRequest:
+    case ErrorCode::TooManyRequests:
       return "Quota exchausted. The user has sent too many requests in a given "
              "amount of time.";
 
@@ -107,6 +109,24 @@ std::string ErrorCategory::message(int code) const {
     case ErrorCode::InsufficientStorage:
       return "The server is unable to store the representation needed to "
              "complete the request.";
+
+    case ErrorCode::MultipleResults:
+      return "Multiple results are available";
+
+    case ErrorCode::IllegalAccessPoint:
+      return "Access point is either malformed or invalid";
+
+    case ErrorCode::IllegalTopic: {
+      return "Topic is illegal either due to length is too long or invalid character is included";
+    }
+
+    case ErrorCode::IllegalConsumerGroup: {
+      return "ConsumerGroup is illegal due to its length is too long or invalid character is included";
+    }
+
+    case ErrorCode::IllegalMessageTag: {
+      return "Format of message tag is illegal.";
+    }
   }
 
   return "";
diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
index e4be88b..b6e8a2e 100644
--- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
@@ -242,7 +242,7 @@ void ClientManagerImpl::heartbeat(const std::string& target_host, const Metadata
 
       case rmq::Code::TOO_MANY_REQUESTS: {
         SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::TooManyRequest;
+        ec = ErrorCode::TooManyRequests;
         cb(ec, invocation_context->response);
         break;
       }
@@ -352,33 +352,122 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
         send_receipt.message_id = invocation_context->response.entries().begin()->message_id();
         break;
       }
+
+      case rmq::Code::ILLEGAL_TOPIC: {
+        SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalTopic;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_MESSAGE_TAG: {
+        SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalMessageTag;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_MESSAGE_KEY: {
+        SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalMessageKey;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_MESSAGE_GROUP: {
+        SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalMessageGroup;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: {
+        SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalMessageProperty;
+        break;
+      }
+
+      case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: {
+        SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::MessagePropertiesTooLarge;
+        break;
+      }
+
+      case rmq::Code::MESSAGE_BODY_TOO_LARGE: {
+        SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::MessageBodyTooLarge;
+        break;
+      }
+
       case rmq::Code::TOPIC_NOT_FOUND: {
-        SPDLOG_WARN("TopicNotFound: {}", status.message());
+        SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::TopicNotFound;
+        break;
+      }
+
+      case rmq::Code::NOT_FOUND: {
+        SPDLOG_WARN("NotFound: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::NotFound;
         break;
       }
+
       case rmq::Code::UNAUTHORIZED: {
-        SPDLOG_WARN("Unauthenticated: {}", status.message());
+        SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Unauthorized;
         break;
       }
+        
       case rmq::Code::FORBIDDEN: {
-        SPDLOG_WARN("Forbidden: {}", status.message());
+        SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Forbidden;
-        cb(ec, send_receipt);
         break;
       }
+
+      case rmq::Code::MESSAGE_CORRUPTED: {
+        SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::MessageCorrupted;
+        break;
+      }
+
+      case rmq::Code::TOO_MANY_REQUESTS: {
+        SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::TooManyRequests;
+        break;
+      }
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
-        SPDLOG_WARN("InternalServerError: {}", status.message());
+        SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::InternalServerError;
         break;
       }
+
+      case rmq::Code::HA_NOT_AVAILABLE: {
+        SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::InternalServerError;
+        break;
+      }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
+      case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
+      case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
       default: {
-        SPDLOG_WARN("Unsupported status code. Check and upgrade SDK to the latest");
-        ec = ErrorCode::NotImplemented;
+        SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. Host={}", invocation_context->remote_address);
+        ec = ErrorCode::NotSupported;
         break;
       }
     }
+    
     cb(ec, send_receipt);
   };
 
@@ -537,7 +626,7 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metad
 
       case rmq::Code::TOO_MANY_REQUESTS: {
         SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::TooManyRequest;
+        ec = ErrorCode::TooManyRequests;
         cb(ec, nullptr);
         break;
       }
@@ -1058,7 +1147,7 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe
         ec = ErrorCode::ServiceUnavailable;
       }
       case rmq::Code::TOO_MANY_REQUESTS: {
-        ec = ErrorCode::TooManyRequest;
+        ec = ErrorCode::TooManyRequests;
       }
       default: {
         ec = ErrorCode::NotImplemented;
diff --git a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
index 4667de4..3ee8f68 100644
--- a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
@@ -73,7 +73,7 @@ void ReceiveMessageStreamReader::OnReadDone(bool ok) {
           break;
         }
         case rmq::Code::TOO_MANY_REQUESTS: {
-          ec_ = ErrorCode::TooManyRequest;
+          ec_ = ErrorCode::TooManyRequests;
           break;
         }
         case rmq::Code::MESSAGE_NOT_FOUND: {