You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/01 08:04:15 UTC

[rocketmq-clients] 02/18: Complete error handling of QueryRoute

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit acee141965ab701bdc2bc858a2715cb8d4b09b32
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jun 30 15:10:57 2022 +0800

    Complete error handling of QueryRoute
---
 cpp/api/rocketmq/ErrorCode.h                       | 95 +++++++++++++++++-----
 cpp/src/main/cpp/base/ErrorCategory.cpp            |  9 +-
 cpp/src/main/cpp/client/ClientManagerImpl.cpp      | 52 +++++++++---
 .../main/cpp/client/ReceiveMessageStreamReader.cpp |  2 +-
 4 files changed, 120 insertions(+), 38 deletions(-)

diff --git a/cpp/api/rocketmq/ErrorCode.h b/cpp/api/rocketmq/ErrorCode.h
index 5068c83..14ec504 100644
--- a/cpp/api/rocketmq/ErrorCode.h
+++ b/cpp/api/rocketmq/ErrorCode.h
@@ -33,6 +33,11 @@ enum class ErrorCode : int {
    */
   IllegalState = 1,
 
+  /**
+   * @brief To publish FIFO messages, only synchronous API is supported.
+   */
+  BadRequestAsyncPubFifoMessage = 10001,
+
   /**
    * @brief Broker has processed the request but is not going to return any content.
    */
@@ -45,53 +50,92 @@ enum class ErrorCode : int {
   BadConfiguration = 300,
 
   /**
-   * @brief The server cannot process the request due to apprent client-side
+   * @brief Generic code, representing multiple return results.
+   *
+   */
+  MultipleResults = 30000,
+
+  /**
+   * @brief The server cannot process the request due to apparent client-side
    * error. For example, topic contains invalid character or is excessively
    * long.
    *
    */
-  BadRequest = 400,
+  BadRequest = 40000,
 
   /**
-   * @brief To publish FIFO messages, only synchronous API is supported.
+   * @brief The access point is illegal
+   *
+   */
+  IllegalAccessPoint = 40001,
+
+  IllegalTopic = 40002,
+  IllegalConsumerGroup = 40003,
+  IllegalMessageTag = 40004,
+  IllegalMessageKey = 40005,
+  IllegalMessageGroup = 40006,
+  IllegalMessageProperty = 40007,
+  InvalidTransactionId = 40008,
+  IllegalMessageId = 40009,
+  IllegalFilterExpression = 40010,
+  InvalidReceiptHandle = 40011,
+
+  /**
+   * @brief Message property conflicts with its type.
    */
-  BadRequestAsyncPubFifoMessage = 40001,
+  MessagePropertyConflictWithType = 40012,
+
+  // Client type is not recognized by server
+  UnsupportedClientType = 40013,
+
+  // Received message is corrupted, generally failing to pass integrity checksum validation.
+  MessageCorrupted = 40014,
+
+  // Request is rejected due to missing of x-mq-client-id HTTP header in the metadata frame.
+  ClientIdRequired = 40015,
 
   /**
    * @brief Authentication failed. Possibly caused by invalid credentials.
    *
    */
-  Unauthorized = 401,
+  Unauthorized = 40100,
 
   /**
    * @brief Credentials are understood by server but authenticated user does not
    * have privilege to perform the requested action.
    *
    */
-  Forbidden = 403,
+  Forbidden = 40300,
 
   /**
    * @brief Topic not found, which should be created through console or
    * administration API before hand.
    *
    */
-  NotFound = 404,
+  NotFound = 40400,
+
+  MessageNotFound = 40401,
 
-  TopicNotFound = 404001,
+  TopicNotFound = 404002,
 
-  GroupNotFound = 404002,
+  ConsumerGroupNotFound = 404003,
 
   /**
    * @brief Timeout when connecting, reading from or writing to brokers.
    *
    */
-  RequestTimeout = 408,
+  RequestTimeout = 40800,
 
   /**
    * @brief Message body is too large.
    *
    */
-  PayloadTooLarge = 413,
+  PayloadTooLarge = 41300,
+
+  /**
+   * @brief Message body size exceeds limited allowed by server.
+   */
+  MessageBodyTooLarge = 41301,
 
   /**
    * @brief When trying to perform an action whose dependent procedure state is
@@ -100,14 +144,14 @@ enum class ErrorCode : int {
    * 2. Commit/Rollback a transactional message that does not exist;
    * 3. Commit an offset which is greater than maximum of partition;
    */
-  PreconditionRequired = 428,
+  PreconditionRequired = 42800,
 
   /**
    * @brief Quota exchausted. The user has sent too many requests in a given
    * amount of time.
    *
    */
-  TooManyRequest = 429,
+  TooManyRequest = 42900,
 
   /**
    * @brief The server is unwilling to process the request because either an
@@ -115,62 +159,69 @@ enum class ErrorCode : int {
    * large
    *
    */
-  HeaderFieldsTooLarge = 431,
+  HeaderFieldsTooLarge = 43100,
+
+  // Message properties total size exceeds the threshold.
+  MessagePropertiesTooLarge = 43101,
 
   /**
    * @brief A server operator has received a legal demand to deny access to a
    * resource or to a set of resources that includes the requested resource.
    *
    */
-  UnavailableForLegalReasons = 451,
+  UnavailableForLegalReasons = 45100,
 
   /**
    * @brief Server side interval error
    *
    */
-  InternalServerError = 500,
+  InternalServerError = 50000,
 
   /**
    * @brief The server either does not recognize the request method, or it lacks
    * the ability to fulfil the request.
    *
    */
-  NotImplemented = 501,
+  NotImplemented = 50100,
 
   /**
    * @brief The server was acting as a gateway or proxy and received an invalid
    * response from the upstream server.
    *
    */
-  BadGateway = 502,
+  BadGateway = 50200,
 
   /**
    * @brief The server cannot handle the request (because it is overloaded or
    * down for maintenance). Generally, this is a temporary state.
    *
    */
-  ServiceUnavailable = 503,
+  ServiceUnavailable = 50300,
 
   /**
    * @brief The server was acting as a gateway or proxy and did not receive a
    * timely response from the upstream server.
    *
    */
-  GatewayTimeout = 504,
+  GatewayTimeout = 50400,
 
   /**
    * @brief The server does not support the protocol version used in the
    * request.
    *
    */
-  ProtocolVersionNotSupported = 505,
+  NotSupported = 50500,
+
+  ProtocolUnsupported = 50501,
+
+  VerifyFifoMessageUnsupported = 50502,
 
   /**
    * @brief The server is unable to store the representation needed to complete
    * the request.
    *
    */
-  InsufficientStorage = 507,
+  InsufficientStorage = 50700,
 };
 
 std::error_code make_error_code(ErrorCode code);
diff --git a/cpp/src/main/cpp/base/ErrorCategory.cpp b/cpp/src/main/cpp/base/ErrorCategory.cpp
index 4fe2ea0..4f1b290 100644
--- a/cpp/src/main/cpp/base/ErrorCategory.cpp
+++ b/cpp/src/main/cpp/base/ErrorCategory.cpp
@@ -53,7 +53,7 @@ std::string ErrorCategory::message(int code) const {
     case ErrorCode::TopicNotFound:
       return "Topic is not found. Verify the request topic has already been created through console or management API";
 
-    case ErrorCode::GroupNotFound:
+    case ErrorCode::ConsumerGroupNotFound:
       return "Group is not found. Verify the request group has already been created through console or management API";
 
     case ErrorCode::RequestTimeout:
@@ -100,17 +100,16 @@ std::string ErrorCategory::message(int code) const {
              "a timely response from the upstream "
              "server.";
 
-    case ErrorCode::ProtocolVersionNotSupported:
+    case ErrorCode::ProtocolUnsupported:
       return "The server does not support the protocol version used in the "
              "request.";
 
     case ErrorCode::InsufficientStorage:
       return "The server is unable to store the representation needed to "
              "complete the request.";
-
-    default:
-      return "Not-Implemented";
   }
+
+  return "";
 }
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
index 0c6e155..05a670b 100644
--- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
@@ -478,32 +478,64 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metad
         }
         auto ptr = std::make_shared<TopicRouteData>(std::move(message_queues));
         cb(ec, ptr);
-      } break;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_ACCESS_POINT: {
+        SPDLOG_WARN("IllegalAccessPoint: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalAccessPoint;
+        cb(ec, nullptr);
+        break;
+      }
+
       case rmq::Code::UNAUTHORIZED: {
         SPDLOG_WARN("Unauthorized: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Unauthorized;
         cb(ec, nullptr);
-      } break;
-      case rmq::Code::FORBIDDEN: {
-        SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::Forbidden;
-        cb(ec, nullptr);
-      } break;
+        break;
+      }
+
       case rmq::Code::TOPIC_NOT_FOUND: {
         SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::NotFound;
         cb(ec, nullptr);
-      } break;
+        break;
+      }
+
+      case rmq::Code::TOO_MANY_REQUESTS: {
+        SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::TooManyRequest;
+        cb(ec, nullptr);
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_ERROR("ClientIdRequired: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::ClientIdRequired;
+        cb(ec, nullptr);
+        break;
+      }
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::InternalServerError;
         cb(ec, nullptr);
-      } break;
+        break;
+      }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::GatewayTimeout;
+        cb(ec, nullptr);
+        break;
+      }
+
       default: {
         SPDLOG_WARN("NotImplement: Please upgrade to latest SDK release. Host={}", invocation_context->remote_address);
         ec = ErrorCode::NotImplemented;
         cb(ec, nullptr);
-      } break;
+        break;
+      }
     }
   };
   invocation_context->callback = callback;
diff --git a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
index 223c879..4667de4 100644
--- a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
@@ -69,7 +69,7 @@ void ReceiveMessageStreamReader::OnReadDone(bool ok) {
         }
 
         case rmq::Code::CONSUMER_GROUP_NOT_FOUND: {
-          ec_ = ErrorCode::GroupNotFound;
+          ec_ = ErrorCode::ConsumerGroupNotFound;
           break;
         }
         case rmq::Code::TOO_MANY_REQUESTS: {