You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/30 07:35:21 UTC

[rocketmq-clients] branch cpp created (now 5bc970f)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a change to branch cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


      at 5bc970f  Complete error handling of QueryRoute

This branch includes the following new commits:

     new 979c226  Sync and update protobuf files to v2.0-alpha
     new 5bc970f  Complete error handling of QueryRoute

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[rocketmq-clients] 02/02: Complete error handling of QueryRoute

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 5bc970f417148d937c2836a6d944c21da508e9d0
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jun 30 15:10:57 2022 +0800

    Complete error handling of QueryRoute
---
 cpp/api/rocketmq/ErrorCode.h                       | 95 +++++++++++++++++-----
 cpp/src/main/cpp/base/ErrorCategory.cpp            |  9 +-
 cpp/src/main/cpp/client/ClientManagerImpl.cpp      | 52 +++++++++---
 .../main/cpp/client/ReceiveMessageStreamReader.cpp |  2 +-
 4 files changed, 120 insertions(+), 38 deletions(-)

diff --git a/cpp/api/rocketmq/ErrorCode.h b/cpp/api/rocketmq/ErrorCode.h
index 5068c83..14ec504 100644
--- a/cpp/api/rocketmq/ErrorCode.h
+++ b/cpp/api/rocketmq/ErrorCode.h
@@ -33,6 +33,11 @@ enum class ErrorCode : int {
    */
   IllegalState = 1,
 
+  /**
+   * @brief To publish FIFO messages, only synchronous API is supported.
+   */
+  BadRequestAsyncPubFifoMessage = 10001,
+
   /**
    * @brief Broker has processed the request but is not going to return any content.
    */
@@ -45,53 +50,92 @@ enum class ErrorCode : int {
   BadConfiguration = 300,
 
   /**
-   * @brief The server cannot process the request due to apprent client-side
+   * @brief Generic code, representing multiple return results.
+   *
+   */
+  MultipleResults = 30000,
+
+  /**
+   * @brief The server cannot process the request due to apparent client-side
    * error. For example, topic contains invalid character or is excessively
    * long.
    *
    */
-  BadRequest = 400,
+  BadRequest = 40000,
 
   /**
-   * @brief To publish FIFO messages, only synchronous API is supported.
+   * @brief The access point is illegal
+   *
+   */
+  IllegalAccessPoint = 40001,
+
+  IllegalTopic = 40002,
+  IllegalConsumerGroup = 40003,
+  IllegalMessageTag = 40004,
+  IllegalMessageKey = 40005,
+  IllegalMessageGroup = 40006,
+  IllegalMessageProperty = 40007,
+  InvalidTransactionId = 40008,
+  IllegalMessageId = 40009,
+  IllegalFilterExpression = 40010,
+  InvalidReceiptHandle = 40011,
+
+  /**
+   * @brief Message property conflicts with its type.
    */
-  BadRequestAsyncPubFifoMessage = 40001,
+  MessagePropertyConflictWithType = 40012,
+
+  // Client type is not recognized by server
+  UnsupportedClientType = 40013,
+
+  // Received message is corrupted, generally failing to pass integrity checksum validation.
+  MessageCorrupted = 40014,
+
+  // Request is rejected because the x-mq-client-id HTTP header is missing from the metadata frame.
+  ClientIdRequired = 40015,
 
   /**
    * @brief Authentication failed. Possibly caused by invalid credentials.
    *
    */
-  Unauthorized = 401,
+  Unauthorized = 40100,
 
   /**
    * @brief Credentials are understood by server but authenticated user does not
    * have privilege to perform the requested action.
    *
    */
-  Forbidden = 403,
+  Forbidden = 40300,
 
   /**
    * @brief Topic not found, which should be created through console or
    * administration API before hand.
    *
    */
-  NotFound = 404,
+  NotFound = 40400,
+
+  MessageNotFound = 40401,
 
-  TopicNotFound = 404001,
+  TopicNotFound = 404002,
 
-  GroupNotFound = 404002,
+  ConsumerGroupNotFound = 404003,
 
   /**
    * @brief Timeout when connecting, reading from or writing to brokers.
    *
    */
-  RequestTimeout = 408,
+  RequestTimeout = 40800,
 
   /**
    * @brief Message body is too large.
    *
    */
-  PayloadTooLarge = 413,
+  PayloadTooLarge = 41300,
+
+  /**
+   * @brief Message body size exceeds the limit allowed by server.
+   */
+  MessageBodyTooLarge = 41301,
 
   /**
    * @brief When trying to perform an action whose dependent procedure state is
@@ -100,14 +144,14 @@ enum class ErrorCode : int {
    * 2. Commit/Rollback a transactional message that does not exist;
    * 3. Commit an offset which is greater than maximum of partition;
    */
-  PreconditionRequired = 428,
+  PreconditionRequired = 42800,
 
   /**
    * @brief Quota exchausted. The user has sent too many requests in a given
    * amount of time.
    *
    */
-  TooManyRequest = 429,
+  TooManyRequest = 42900,
 
   /**
    * @brief The server is unwilling to process the request because either an
@@ -115,62 +159,69 @@ enum class ErrorCode : int {
    * large
    *
    */
-  HeaderFieldsTooLarge = 431,
+  HeaderFieldsTooLarge = 43100,
+
+  // Message properties total size exceeds the threshold.
+  MessagePropertiesTooLarge = 43101,
 
   /**
    * @brief A server operator has received a legal demand to deny access to a
    * resource or to a set of resources that includes the requested resource.
    *
    */
-  UnavailableForLegalReasons = 451,
+  UnavailableForLegalReasons = 45100,
 
   /**
    * @brief Server side interval error
    *
    */
-  InternalServerError = 500,
+  InternalServerError = 50000,
 
   /**
    * @brief The server either does not recognize the request method, or it lacks
    * the ability to fulfil the request.
    *
    */
-  NotImplemented = 501,
+  NotImplemented = 50100,
 
   /**
    * @brief The server was acting as a gateway or proxy and received an invalid
    * response from the upstream server.
    *
    */
-  BadGateway = 502,
+  BadGateway = 50200,
 
   /**
    * @brief The server cannot handle the request (because it is overloaded or
    * down for maintenance). Generally, this is a temporary state.
    *
    */
-  ServiceUnavailable = 503,
+  ServiceUnavailable = 50300,
 
   /**
    * @brief The server was acting as a gateway or proxy and did not receive a
    * timely response from the upstream server.
    *
    */
-  GatewayTimeout = 504,
+  GatewayTimeout = 50400,
 
   /**
    * @brief The server does not support the protocol version used in the
    * request.
    *
    */
-  ProtocolVersionNotSupported = 505,
+  NotSupported = 50500,
+
+  ProtocolUnsupported = 50501,
+
+  VerifyFifoMessageUnsupported = 50502,
 
   /**
    * @brief The server is unable to store the representation needed to complete
    * the request.
    *
    */
-  InsufficientStorage = 507,
+  InsufficientStorage = 50700,
 };
 
 std::error_code make_error_code(ErrorCode code);
diff --git a/cpp/src/main/cpp/base/ErrorCategory.cpp b/cpp/src/main/cpp/base/ErrorCategory.cpp
index 4fe2ea0..4f1b290 100644
--- a/cpp/src/main/cpp/base/ErrorCategory.cpp
+++ b/cpp/src/main/cpp/base/ErrorCategory.cpp
@@ -53,7 +53,7 @@ std::string ErrorCategory::message(int code) const {
     case ErrorCode::TopicNotFound:
       return "Topic is not found. Verify the request topic has already been created through console or management API";
 
-    case ErrorCode::GroupNotFound:
+    case ErrorCode::ConsumerGroupNotFound:
       return "Group is not found. Verify the request group has already been created through console or management API";
 
     case ErrorCode::RequestTimeout:
@@ -100,17 +100,16 @@ std::string ErrorCategory::message(int code) const {
              "a timely response from the upstream "
              "server.";
 
-    case ErrorCode::ProtocolVersionNotSupported:
+    case ErrorCode::ProtocolUnsupported:
       return "The server does not support the protocol version used in the "
              "request.";
 
     case ErrorCode::InsufficientStorage:
       return "The server is unable to store the representation needed to "
              "complete the request.";
-
-    default:
-      return "Not-Implemented";
   }
+
+  return "";
 }
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
index 0c6e155..05a670b 100644
--- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
@@ -478,32 +478,64 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metad
         }
         auto ptr = std::make_shared<TopicRouteData>(std::move(message_queues));
         cb(ec, ptr);
-      } break;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_ACCESS_POINT: {
+        SPDLOG_WARN("IllegalAccessPoint: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::IllegalAccessPoint;
+        cb(ec, nullptr);
+        break;
+      }
+
       case rmq::Code::UNAUTHORIZED: {
         SPDLOG_WARN("Unauthorized: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::Unauthorized;
         cb(ec, nullptr);
-      } break;
-      case rmq::Code::FORBIDDEN: {
-        SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address);
-        ec = ErrorCode::Forbidden;
-        cb(ec, nullptr);
-      } break;
+        break;
+      }
+
       case rmq::Code::TOPIC_NOT_FOUND: {
         SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::NotFound;
         cb(ec, nullptr);
-      } break;
+        break;
+      }
+
+      case rmq::Code::TOO_MANY_REQUESTS: {
+        SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::TooManyRequest;
+        cb(ec, nullptr);
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_ERROR("ClientIdRequired: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::ClientIdRequired;
+        cb(ec, nullptr);
+        break;
+      }
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address);
         ec = ErrorCode::InternalServerError;
         cb(ec, nullptr);
-      } break;
+        break;
+      }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address);
+        ec = ErrorCode::GatewayTimeout;
+        cb(ec, nullptr);
+        break;
+      }
+
       default: {
         SPDLOG_WARN("NotImplement: Please upgrade to latest SDK release. Host={}", invocation_context->remote_address);
         ec = ErrorCode::NotImplemented;
         cb(ec, nullptr);
-      } break;
+        break;
+      }
     }
   };
   invocation_context->callback = callback;
diff --git a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
index 223c879..4667de4 100644
--- a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp
@@ -69,7 +69,7 @@ void ReceiveMessageStreamReader::OnReadDone(bool ok) {
         }
 
         case rmq::Code::CONSUMER_GROUP_NOT_FOUND: {
-          ec_ = ErrorCode::GroupNotFound;
+          ec_ = ErrorCode::ConsumerGroupNotFound;
           break;
         }
         case rmq::Code::TOO_MANY_REQUESTS: {


[rocketmq-clients] 01/02: Sync and update protobuf files to v2.0-alpha

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 979c226efafe54f8606f6618be8bbde8f443c321
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jun 30 10:51:43 2022 +0800

    Sync and update protobuf files to v2.0-alpha
---
 cpp/proto/apache/rocketmq/v2/definition.proto  | 161 +++++++++++--------------
 cpp/proto/apache/rocketmq/v2/service.proto     |   7 ++
 cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp |   2 +-
 3 files changed, 81 insertions(+), 89 deletions(-)

diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto b/cpp/proto/apache/rocketmq/v2/definition.proto
index 58bf06d..ab1fd09 100644
--- a/cpp/proto/apache/rocketmq/v2/definition.proto
+++ b/cpp/proto/apache/rocketmq/v2/definition.proto
@@ -292,106 +292,74 @@ message Assignment {
 }
 
 enum Code {
-  // Success.
-  OK = 0;
+  CODE_UNSPECIFIED = 0;
+
+  // Generic code for success.
+  OK = 20000;
+
+  // Generic code for multiple return results.
+  MULTIPLE_RESULTS = 30000;
+
+  // Generic code for bad request, indicating that required fields or headers are missing.
+  BAD_REQUEST = 40000;
   // Format of access point is illegal.
-  ILLEGAL_ACCESS_POINT = 1;
+  ILLEGAL_ACCESS_POINT = 40001;
   // Format of topic is illegal.
-  ILLEGAL_TOPIC = 2;
+  ILLEGAL_TOPIC = 40002;
   // Format of consumer group is illegal.
-  ILLEGAL_CONSUMER_GROUP = 3;
+  ILLEGAL_CONSUMER_GROUP = 40003;
   // Format of message tag is illegal.
-  ILLEGAL_MESSAGE_TAG = 4;
+  ILLEGAL_MESSAGE_TAG = 40004;
   // Format of message key is illegal.
-  ILLEGAL_MESSAGE_KEY = 5;
-  // Size of message keys exceeds the threshold.
-  MESSAGE_KEYS_TOO_LARGE = 6;
+  ILLEGAL_MESSAGE_KEY = 40005;
   // Format of message group is illegal.
-  ILLEGAL_MESSAGE_GROUP = 7;
+  ILLEGAL_MESSAGE_GROUP = 40006;
   // Format of message property key is illegal.
-  ILLEGAL_MESSAGE_PROPERTY_KEY = 8;
+  ILLEGAL_MESSAGE_PROPERTY_KEY = 40007;
   // Message properties total size exceeds the threshold.
-  MESSAGE_PROPERTIES_TOO_LARGE = 9;
+  MESSAGE_PROPERTIES_TOO_LARGE = 40008;
   // Message body size exceeds the threshold.
-  MESSAGE_BODY_TOO_LARGE = 10;
-
-  // User does not have the permission to operate.
-  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/403
-  FORBIDDEN = 403;
-
-  // Code indicates that the client request has not been completed
-  // because it lacks valid authentication credentials for the
-  // requested resource.
-  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/401
-  UNAUTHORIZED = 401;
-
-  // Topic resource does not exist.
-  TOPIC_NOT_FOUND = 13;
-
-  // Consumer group resource does not exist.
-  CONSUMER_GROUP_NOT_FOUND = 14;
-
-  // Not allowed to verify message. Chances are that you are verifying
-  // a FIFO message, as is violating FIFO semantics.
-  VERIFY_MESSAGE_FORBIDDEN = 15;
-
-  // Failed to consume message.
-  FAILED_TO_CONSUME_MESSAGE = 16;
-
-  // Message is corrupted.
-  MESSAGE_CORRUPTED = 17;
-
-  // Too many requests are made in short period of duration.
-  // Requests are throttled.
-  TOO_MANY_REQUESTS = 18;
-
-  // Expired receipt-handle is used when trying to acknowledge or change
-  // invisible duration of a message
-  RECEIPT_HANDLE_EXPIRED = 19;
-
-  // Message property is not match the message type.
-  MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 20;
-
-  // Format of message id is illegal.
-  ILLEGAL_MESSAGE_ID = 21;
-
+  MESSAGE_BODY_TOO_LARGE = 40009;
   // Transaction id is invalid.
-  INVALID_TRANSACTION_ID = 22;
-
+  INVALID_TRANSACTION_ID = 40010;
+  // Format of message id is illegal.
+  ILLEGAL_MESSAGE_ID = 40011;
   // Format of filter expression is illegal.
-  ILLEGAL_FILTER_EXPRESSION = 23;
-
+  ILLEGAL_FILTER_EXPRESSION = 40012;
   // Receipt handle of message is invalid.
-  INVALID_RECEIPT_HANDLE = 24;
-
-  // Message persistence timeout.
-  MASTER_PERSISTENCE_TIMEOUT = 25;
-
-  // Slave persistence timeout.
-  SLAVE_PERSISTENCE_TIMEOUT = 26;
+  INVALID_RECEIPT_HANDLE = 40013;
+  // Message property does not match the message type.
+  MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 40014;
+  // Client type could not be recognized.
+  UNRECOGNIZED_CLIENT_TYPE = 40015;
+  // Message is corrupted.
+  MESSAGE_CORRUPTED = 40016;
+  // Request is rejected because the x-mq-client-id header is missing.
+  CLIENT_ID_REQUIRED = 40017;
 
-  // The HA-mechanism is not working now.
-  HA_NOT_AVAILABLE = 27;
+  // Generic code indicates that the client request lacks valid authentication
+  // credentials for the requested resource.
+  UNAUTHORIZED = 40100;
 
-  // Operation is not allowed in current version.
-  VERSION_UNSUPPORTED = 28;
+  // Generic code for the case that user does not have the permission to operate.
+  FORBIDDEN = 40300;
 
+  // Generic code for resource not found.
+  NOT_FOUND = 40400;
   // Message not found from server.
-  MESSAGE_NOT_FOUND = 29;
-
-  // Message offset is illegal.
-  ILLEGAL_MESSAGE_OFFSET = 30;
-
-  // Illegal message is for the sake of backward compatibility. In most case,
-  // more definitive code is better, e.g. `ILLEGAL_MESSAGE_TAG`.
-  ILLEGAL_MESSAGE = 31;
-
-  // Client type could not be recognized.
-  UNRECOGNIZED_CLIENT_TYPE = 32;
+  MESSAGE_NOT_FOUND = 40401;
+  // Topic resource does not exist.
+  TOPIC_NOT_FOUND = 40402;
+  // Consumer group resource does not exist.
+  CONSUMER_GROUP_NOT_FOUND = 40403;
 
-  // Return different results for entries in composite request.
-  MULTIPLE_RESULTS = 33;
+  // Generic code indicates that too many requests are made in short period of duration.
+  // Requests are throttled.
+  TOO_MANY_REQUESTS = 42900;
 
+  // Generic code indicates that server/client encountered an unexpected
+  // condition that prevented it from fulfilling the request.
+  INTERNAL_ERROR = 50000;
   // Code indicates that the server encountered an unexpected condition
   // that prevented it from fulfilling the request.
   // This error response is a generic "catch-all" response.
@@ -401,17 +369,34 @@ enum Code {
   // to prevent the error from happening again in the future.
   //
   // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500
-  INTERNAL_SERVER_ERROR = 500;
+  INTERNAL_SERVER_ERROR = 50001;
+  // The HA-mechanism is not working now.
+  HA_NOT_AVAILABLE = 50002;
 
-  // Code means that the server or client does not support the functionality
-  // required to fulfill the request.
-  NOT_IMPLEMENTED = 501;
+  // Generic code means that the server or client does not support the
+  // functionality required to fulfill the request.
+  NOT_IMPLEMENTED = 50100;
 
+  // Generic code for timeout.
+  TIMEOUT = 50400;
+  // Message persistence timeout.
+  MASTER_PERSISTENCE_TIMEOUT = 50401;
+  // Slave persistence timeout.
+  SLAVE_PERSISTENCE_TIMEOUT = 50402;
   // Code indicates that the server, while acting as a gateway or proxy,
   // did not get a response in time from the upstream server that
   // it needed in order to complete the request.
-  // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504
-  GATEWAY_TIMEOUT = 504;
+  PROXY_TIMEOUT = 50403;
+
+  UNSUPPORTED = 50500;
+  // Operation is not allowed in current version.
+  VERSION_UNSUPPORTED = 50501;
+  // Not allowed to verify message. Chances are that you are verifying
+  // a FIFO message, which would violate FIFO semantics.
+  VERIFY_FIFO_MESSAGE_UNSUPPORTED = 50502;
+
+  // Generic code for failed message consumption.
+  FAILED_TO_CONSUME_MESSAGE = 60000;
 }
 
 message Status {
diff --git a/cpp/proto/apache/rocketmq/v2/service.proto b/cpp/proto/apache/rocketmq/v2/service.proto
index fa2a2f0..c5d4cce 100644
--- a/cpp/proto/apache/rocketmq/v2/service.proto
+++ b/cpp/proto/apache/rocketmq/v2/service.proto
@@ -16,6 +16,7 @@
 syntax = "proto3";
 
 import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
 
 import "apache/rocketmq/v2/definition.proto";
 
@@ -101,6 +102,8 @@ message ReceiveMessageResponse {
   oneof content {
     Status status = 1;
     Message message = 2;
+    // The timestamp at which brokers start to deliver the status line or message.
+    google.protobuf.Timestamp delivery_timestamp = 3;
   }
 }
 
@@ -211,6 +214,10 @@ message Publishing {
   // reject the request. As a result, it is advisable that Producer performs
   // client-side check validation.
   int32 max_body_size = 3;
+
+  // When the `validate_message_type` flag is set to `false`, there is no need to validate the message's type
+  // against the message queue's `accept_message_types` before publishing.
+  bool validate_message_type = 4;
 }
 
 message Subscription {
diff --git a/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp b/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index 89dc112..2acff23 100644
--- a/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -550,7 +550,7 @@ void PushConsumerImpl::onVerifyMessage(MessageConstSharedPtr message, std::funct
         cmd.mutable_status()->set_message("Unexpected exception raised");
       }
     } else {
-      cmd.mutable_status()->set_code(rmq::Code::VERIFY_MESSAGE_FORBIDDEN);
+      cmd.mutable_status()->set_code(rmq::Code::VERIFY_FIFO_MESSAGE_UNSUPPORTED);
       cmd.mutable_status()->set_message("Unsupported Operation For FIFO Message");
     }
   } else {