You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/06/30 11:15:31 UTC

[rocketmq-clients] branch cpp updated: Add error handling for EndTransaction

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/cpp by this push:
     new b477272  Add error handling for EndTransaction
b477272 is described below

commit b477272fe97aab72593f638e7f9a93c5201aff2c
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Thu Jun 30 19:15:22 2022 +0800

    Add error handling for EndTransaction
---
 cpp/api/rocketmq/ErrorCode.h                  |  16 ++--
 cpp/src/main/cpp/client/ClientManagerImpl.cpp | 113 +++++++++++++++++++++++---
 2 files changed, 112 insertions(+), 17 deletions(-)

diff --git a/cpp/api/rocketmq/ErrorCode.h b/cpp/api/rocketmq/ErrorCode.h
index e911533..4f05ded 100644
--- a/cpp/api/rocketmq/ErrorCode.h
+++ b/cpp/api/rocketmq/ErrorCode.h
@@ -31,29 +31,35 @@ enum class ErrorCode : int {
    * @brief Client state not as expected. Call Producer#start() first.
    *
    */
-  IllegalState = 1,
+  IllegalState = 10000,
 
   /**
    * @brief To publish FIFO messages, only synchronous API is supported.
    */
-  BadRequestAsyncPubFifoMessage = 10001,
+  BadRequestAsyncPubFifoMessage = 10100,
+
+  /**
+   * @brief 102XX is used for client side error.
+   *
+   */
+  InternalClientError = 10200,
 
   /**
    * @brief Broker has processed the request but is not going to return any content.
    */
-  NoContent = 204,
+  NoContent = 20400,
 
   /**
    * @brief Bad configuration. For example, negative max-attempt-times.
    *
    */
-  BadConfiguration = 300,
+  BadConfiguration = 30000,
 
   /**
    * @brief Generic code, representing multiple return results.
    *
    */
-  MultipleResults = 30000,
+  MultipleResults = 30100,
 
   /**
    * @brief The server cannot process the request due to apparent client-side
diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
index 3f572b3..1602769 100644
--- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
@@ -1188,27 +1188,72 @@ void ClientManagerImpl::endTransaction(
     }
 
     auto&& status = invocation_context->response.status();
+    auto&& peer_address = invocation_context->remote_address;
     switch (status.code()) {
       case rmq::Code::OK: {
         SPDLOG_DEBUG("endTransaction completed OK. Response: {}, host={}", invocation_context->response.DebugString(),
-                     invocation_context->remote_address);
-      } break;
+                     peer_address);
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_TOPIC: {
+        SPDLOG_WARN("IllegalTopic: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::IllegalTopic;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+        SPDLOG_WARN("IllegalConsumerGroup: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::IllegalConsumerGroup;
+        break;
+      }
+
+      case rmq::Code::INVALID_TRANSACTION_ID: {
+        SPDLOG_WARN("InvalidTransactionId: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::InvalidTransactionId;
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_WARN("ClientIdRequired: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::ClientIdRequired;
+        break;
+      }
+
+      case rmq::Code::TOPIC_NOT_FOUND: {
+        SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::TopicNotFound;
+        break;
+      }
+
       case rmq::Code::UNAUTHORIZED: {
-        SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address);
+        SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), peer_address);
         ec = ErrorCode::Unauthorized;
-      } break;
+        break;
+      }
+
       case rmq::Code::FORBIDDEN: {
-        SPDLOG_WARN("Forbidden: {}, host={}", status.message(), invocation_context->remote_address);
+        SPDLOG_WARN("Forbidden: {}, host={}", status.message(), peer_address);
         ec = ErrorCode::Forbidden;
-      } break;
+        break;
+      }
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
-        SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address);
+        SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), peer_address);
         ec = ErrorCode::InternalServerError;
-      } break;
+        break;
+      }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
       default: {
-        SPDLOG_WARN("NotImplemented: please upgrade SDK to latest release. {}, host={}", status.message(),
-                    invocation_context->remote_address);
-        ec = ErrorCode::NotImplemented;
+        SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::NotSupported;
+        break;
       }
     }
     cb(ec, invocation_context->response);
@@ -1246,22 +1291,66 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe
 
     SPDLOG_DEBUG("Received forwardToDeadLetterQueue response from server[host={}]", invocation_context->remote_address);
     std::error_code ec;
-    switch (invocation_context->response.status().code()) {
+    auto&& status = invocation_context->response.status();
+    auto&& peer_address = invocation_context->remote_address;
+    switch (status.code()) {
       case rmq::Code::OK: {
         break;
       }
+      case rmq::Code::ILLEGAL_TOPIC: {
+        SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), peer_address);
+        ec = ErrorCode::IllegalTopic;
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_CONSUMER_GROUP: {
+        SPDLOG_WARN("IllegalConsumerGroup: {}. Host={}", status.message(), peer_address);
+        ec = ErrorCode::IllegalConsumerGroup;
+        break;
+      }
+
+      case rmq::Code::INVALID_RECEIPT_HANDLE: {
+        SPDLOG_WARN("IllegalReceiptHandle: {}. Host={}", status.message(), peer_address);
+        ec = ErrorCode::InvalidReceiptHandle;
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), peer_address);
+
+        // TODO: translate to client internal error?
+        ec = ErrorCode::InternalServerError;
+        break;
+      }
+
+      case rmq::Code::TOPIC_NOT_FOUND: {
+        ec = ErrorCode::TopicNotFound;
+        break;
+      }
+
       case rmq::Code::INTERNAL_SERVER_ERROR: {
         ec = ErrorCode::ServiceUnavailable;
+        break;
       }
+        
       case rmq::Code::TOO_MANY_REQUESTS: {
         ec = ErrorCode::TooManyRequests;
+        break;
+      }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        ec = ErrorCode::GatewayTimeout;
+        break;
       }
+
       default: {
         ec = ErrorCode::NotImplemented;
+        break;
       }
     }
     cb(ec);
   };
+
   invocation_context->callback = callback;
   client->asyncForwardMessageToDeadLetterQueue(request, invocation_context);
 }