You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/02 07:13:44 UTC

[rocketmq-clients] branch master updated: Add request-id support

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new a905eba6 Add request-id support
a905eba6 is described below

commit a905eba687ec9fa88eadfffdb3d53c17c8fe02ad
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Thu Mar 2 14:46:56 2023 +0800

    Add request-id support
---
 csharp/README.md                                   |  2 +-
 csharp/examples/QuickStart.cs                      |  4 +-
 csharp/rocketmq-client-csharp/Client.cs            | 26 +++----
 csharp/rocketmq-client-csharp/ClientManager.cs     | 81 ++++++++++++++--------
 csharp/rocketmq-client-csharp/Consumer.cs          |  6 +-
 csharp/rocketmq-client-csharp/IClientManager.cs    | 31 +++++----
 csharp/rocketmq-client-csharp/Producer.cs          | 14 ++--
 .../RpcInvocation.cs}                              | 68 ++++++++++--------
 csharp/rocketmq-client-csharp/SendReceipt.cs       | 11 +--
 csharp/rocketmq-client-csharp/Signature.cs         |  1 +
 csharp/rocketmq-client-csharp/SimpleConsumer.cs    |  8 +--
 csharp/rocketmq-client-csharp/StatusChecker.cs     | 31 ++++-----
 12 files changed, 162 insertions(+), 121 deletions(-)

diff --git a/csharp/README.md b/csharp/README.md
index 774e1400..3de6bc65 100644
--- a/csharp/README.md
+++ b/csharp/README.md
@@ -18,7 +18,7 @@ The client would be developed using the protocols outlined in [rocketmq-apis](ht
 dotnet add package RocketMQ.Client
 ```
 
-You can obtain the latest version of `RocketMQ.Client` from [NuGet Gallery](https://www.nuget.org/packages/RocketMQ.Client). To assist with getting started quickly and working with various message types and clients, we offer additional code [here](./examples) here.
+You can obtain the latest version of `RocketMQ.Client` from [NuGet Gallery](https://www.nuget.org/packages/RocketMQ.Client). To assist with getting started quickly and working with various message types and clients, we offer examples [here](./examples) here.
 
 ## Build
 
diff --git a/csharp/examples/QuickStart.cs b/csharp/examples/QuickStart.cs
index 291042d9..174fccd2 100644
--- a/csharp/examples/QuickStart.cs
+++ b/csharp/examples/QuickStart.cs
@@ -21,11 +21,11 @@ namespace examples
     {
         public static void Main()
         {
-            // ProducerNormalMessageExample.QuickStart().Wait();
+            ProducerNormalMessageExample.QuickStart().Wait();
             // await ProducerFifoMessageExample.QuickStart();
             // await ProducerDelayMessageExample.QuickStart();
             // await SimpleConsumerExample.QuickStart();
-            ProducerBenchmark.QuickStart().Wait();
+            // ProducerBenchmark.QuickStart().Wait();
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 1b3ff59d..a9bdb705 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -322,18 +322,18 @@ namespace Org.Apache.Rocketmq
                 Endpoints = Endpoints.ToProtobuf()
             };
 
-            var response =
+            var invocation =
                 await ClientManager.QueryRoute(Endpoints, request, ClientConfig.RequestTimeout);
-            var code = response.Status.Code;
+            var code = invocation.Response.Status.Code;
             if (!Proto.Code.Ok.Equals(code))
             {
                 Logger.Error($"Failed to fetch topic route, clientId={ClientId}, topic={topic}, code={code}, " +
-                             $"statusMessage={response.Status.Message}");
+                             $"statusMessage={invocation.Response.Status.Message}");
             }
 
-            StatusChecker.Check(response.Status, request);
+            StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
 
-            var messageQueues = response.MessageQueues.ToList();
+            var messageQueues = invocation.Response.MessageQueues.ToList();
             return new TopicRouteData(messageQueues);
         }
 
@@ -343,21 +343,21 @@ namespace Org.Apache.Rocketmq
             {
                 var endpoints = GetTotalRouteEndpoints();
                 var request = WrapHeartbeatRequest();
-                Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new();
+                Dictionary<Endpoints, Task<RpcInvocation<Proto.HeartbeatRequest, Proto.HeartbeatResponse>>> invocations = new();
 
                 // Collect task into a map.
                 foreach (var item in endpoints)
                 {
                     var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout);
-                    responses[item] = task;
+                    invocations[item] = task;
                 }
 
-                foreach (var item in responses.Keys)
+                foreach (var item in invocations.Keys)
                 {
                     try
                     {
-                        var response = await responses[item];
-                        var code = response.Status.Code;
+                        var invocation = await invocations[item];
+                        var code = invocation.Response.Status.Code;
 
                         if (code.Equals(Proto.Code.Ok))
                         {
@@ -371,7 +371,7 @@ namespace Org.Apache.Rocketmq
                             return;
                         }
 
-                        var statusMessage = response.Status.Message;
+                        var statusMessage = invocation.Response.Status.Message;
                         Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, " +
                                     $"statusMessage={statusMessage}, clientId={ClientId}");
                     }
@@ -404,10 +404,10 @@ namespace Org.Apache.Rocketmq
             var request = WrapNotifyClientTerminationRequest();
             foreach (var item in endpoints)
             {
-                var response = await ClientManager.NotifyClientTermination(item, request, ClientConfig.RequestTimeout);
+                var invocation = await ClientManager.NotifyClientTermination(item, request, ClientConfig.RequestTimeout);
                 try
                 {
-                    StatusChecker.Check(response.Status, request);
+                    StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
                 }
                 catch (Exception e)
                 {
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index c8a525a4..da46af30 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -98,59 +98,86 @@ namespace Org.Apache.Rocketmq
             return GetRpcClient(endpoints).Telemetry(_client.Sign());
         }
 
-        public async Task<Proto.QueryRouteResponse> QueryRoute(Endpoints endpoints, Proto.QueryRouteRequest request,
-            TimeSpan timeout)
+        public async Task<RpcInvocation<Proto.QueryRouteRequest, Proto.QueryRouteResponse>> QueryRoute(
+            Endpoints endpoints,
+            Proto.QueryRouteRequest request, TimeSpan timeout)
         {
-            return await GetRpcClient(endpoints).QueryRoute(_client.Sign(), request, timeout);
+            var metadata = _client.Sign();
+            var response = await GetRpcClient(endpoints).QueryRoute(metadata, request, timeout);
+            return new RpcInvocation<Proto.QueryRouteRequest, Proto.QueryRouteResponse>(request, response, metadata);
         }
 
-        public async Task<Proto.HeartbeatResponse> Heartbeat(Endpoints endpoints, Proto.HeartbeatRequest request,
-            TimeSpan timeout)
+        public async Task<RpcInvocation<Proto.HeartbeatRequest, Proto.HeartbeatResponse>> Heartbeat(Endpoints endpoints,
+            Proto.HeartbeatRequest request, TimeSpan timeout)
         {
-            return await GetRpcClient(endpoints).Heartbeat(_client.Sign(), request, timeout);
+            var metadata = _client.Sign();
+            var response = await GetRpcClient(endpoints).Heartbeat(metadata, request, timeout);
+            return new RpcInvocation<Proto.HeartbeatRequest, Proto.HeartbeatResponse>(request, response, metadata);
         }
 
-        public async Task<Proto.NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints,
-            Proto.NotifyClientTerminationRequest request, TimeSpan timeout)
+        public async Task<RpcInvocation<Proto.NotifyClientTerminationRequest, Proto.NotifyClientTerminationResponse>>
+            NotifyClientTermination(Endpoints endpoints, Proto.NotifyClientTerminationRequest request, TimeSpan timeout)
         {
-            return await GetRpcClient(endpoints).NotifyClientTermination(_client.Sign(), request, timeout);
+            var metadata = _client.Sign();
+            var response = await GetRpcClient(endpoints).NotifyClientTermination(metadata, request, timeout);
+            return new RpcInvocation<Proto.NotifyClientTerminationRequest, Proto.NotifyClientTerminationResponse>(
+                request, response, metadata);
         }
 
-        public async Task<Proto::SendMessageResponse> SendMessage(Endpoints endpoints,
-            Proto::SendMessageRequest request,
-            TimeSpan timeout)
+        public async Task<RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>> SendMessage(
+            Endpoints endpoints, Proto::SendMessageRequest request, TimeSpan timeout)
         {
-            return await GetRpcClient(endpoints).SendMessage(_client.Sign(), request, timeout);
+            var metadata = _client.Sign();
+            var response = await GetRpcClient(endpoints).SendMessage(metadata, request, timeout);
+            return new RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>(
+                request, response, metadata);
         }
 
-        public async Task<Proto::QueryAssignmentResponse> QueryAssignment(Endpoints endpoints,
-            Proto.QueryAssignmentRequest request, TimeSpan timeout)
+        public async Task<RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse>> QueryAssignment(
+            Endpoints endpoints, Proto.QueryAssignmentRequest request, TimeSpan timeout)
         {
-            return await GetRpcClient(endpoints).QueryAssignment(_client.Sign(), request, timeout);
+            var metadata = _client.Sign();
+            var response = await GetRpcClient(endpoints).QueryAssignment(metadata, request, timeout);
+            return new RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse>(
+                request, response, metadata);
         }
 
-        public async Task<List<Proto::ReceiveMessageResponse>> ReceiveMessage(Endpoints endpoints,
-            Proto.ReceiveMessageRequest request, TimeSpan timeout)
+        public async Task<RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>>
+            ReceiveMessage(Endpoints endpoints, Proto.ReceiveMessageRequest request, TimeSpan timeout)
         {
-            return await GetRpcClient(endpoints).ReceiveMessage(_client.Sign(), request, timeout);
+            var metadata = _client.Sign();
+            var response = await GetRpcClient(endpoints).ReceiveMessage(metadata, request, timeout);
+            return new RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>(
+                request, response, metadata);
         }
 
-        public async Task<Proto::AckMessageResponse> AckMessage(Endpoints endpoints,
-            Proto.AckMessageRequest request, TimeSpan timeout)
+        public async Task<RpcInvocation<Proto.AckMessageRequest, Proto.AckMessageResponse>> AckMessage(
+            Endpoints endpoints, Proto.AckMessageRequest request, TimeSpan timeout)
         {
-            return await GetRpcClient(endpoints).AckMessage(_client.Sign(), request, timeout);
+            var metadata = _client.Sign();
+            var response = await GetRpcClient(endpoints).AckMessage(metadata, request, timeout);
+            return new RpcInvocation<Proto.AckMessageRequest, Proto.AckMessageResponse>(
+                request, response, metadata);
         }
 
-        public async Task<Proto::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints,
-            Proto.ChangeInvisibleDurationRequest request, TimeSpan timeout)
+        public async Task<RpcInvocation<Proto.ChangeInvisibleDurationRequest, Proto.ChangeInvisibleDurationResponse>>
+            ChangeInvisibleDuration(Endpoints endpoints,
+                Proto.ChangeInvisibleDurationRequest request, TimeSpan timeout)
         {
-            return await GetRpcClient(endpoints).ChangeInvisibleDuration(_client.Sign(), request, timeout);
+            var metadata = _client.Sign();
+            var response = await GetRpcClient(endpoints).ChangeInvisibleDuration(metadata, request, timeout);
+            return new RpcInvocation<Proto.ChangeInvisibleDurationRequest, Proto.ChangeInvisibleDurationResponse>(
+                request, response, metadata);
         }
 
-        public async Task<Proto.EndTransactionResponse> EndTransaction(Endpoints endpoints,
+        public async Task<RpcInvocation<Proto.EndTransactionRequest, Proto.EndTransactionResponse>> EndTransaction(
+            Endpoints endpoints,
             Proto.EndTransactionRequest request, TimeSpan timeout)
         {
-            return await GetRpcClient(endpoints).EndTransaction(_client.Sign(), request, timeout);
+            var metadata = _client.Sign();
+            var response = await GetRpcClient(endpoints).EndTransaction(metadata, request, timeout);
+            return new RpcInvocation<Proto.EndTransactionRequest, Proto.EndTransactionResponse>(
+                request, response, metadata);
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs
index 2ba1de1e..3444fbc0 100644
--- a/csharp/rocketmq-client-csharp/Consumer.cs
+++ b/csharp/rocketmq-client-csharp/Consumer.cs
@@ -41,14 +41,14 @@ namespace Org.Apache.Rocketmq
         {
             var tolerance = ClientConfig.RequestTimeout;
             var timeout = tolerance.Add(awaitDuration);
-            var response = await ClientManager.ReceiveMessage(mq.Broker.Endpoints, request, timeout);
+            var invocation = await ClientManager.ReceiveMessage(mq.Broker.Endpoints, request, timeout);
             var status = new Proto.Status()
             {
                 Code = Proto.Code.InternalServerError,
                 Message = "Status was not set by server"
             };
             var messageList = new List<Proto.Message>();
-            foreach (var entry in response)
+            foreach (var entry in invocation.Response)
             {
                 switch (entry.ContentCase)
                 {
@@ -66,7 +66,7 @@ namespace Org.Apache.Rocketmq
             }
 
             var messages = messageList.Select(message => MessageView.FromProtobuf(message, mq)).ToList();
-            StatusChecker.Check(status, request);
+            StatusChecker.Check(status, request, invocation.RequestId);
             return new ReceiveMessageResult(mq.Broker.Endpoints, messages);
         }
 
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs
index 2082584c..47af0280 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -39,7 +39,8 @@ namespace Org.Apache.Rocketmq
         /// <param name="request">gRPC request of querying topic route.</param>
         /// <param name="timeout">Request max duration.</param>
         /// <returns>Task of response.</returns>
-        Task<QueryRouteResponse> QueryRoute(Endpoints endpoints, QueryRouteRequest request, TimeSpan timeout);
+        Task<RpcInvocation<QueryRouteRequest, QueryRouteResponse>> QueryRoute(Endpoints endpoints,
+            QueryRouteRequest request, TimeSpan timeout);
 
         /// <summary>
         /// Send heartbeat to remote endpoints.
@@ -48,7 +49,8 @@ namespace Org.Apache.Rocketmq
         /// <param name="request">gRPC request of heartbeat.</param>
         /// <param name="timeout">Request max duration.</param>
         /// <returns>Task of response.</returns>
-        Task<HeartbeatResponse> Heartbeat(Endpoints endpoints, HeartbeatRequest request, TimeSpan timeout);
+        Task<RpcInvocation<HeartbeatRequest, HeartbeatResponse>> Heartbeat(Endpoints endpoints,
+            HeartbeatRequest request, TimeSpan timeout);
 
         /// <summary>
         /// Notify client's termination.
@@ -57,8 +59,8 @@ namespace Org.Apache.Rocketmq
         /// <param name="request">gRPC request of notifying client's termination.</param>
         /// <param name="timeout">Request max duration.</param>
         /// <returns>Task of response.</returns>
-        Task<NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints,
-            NotifyClientTerminationRequest request, TimeSpan timeout);
+        Task<RpcInvocation<NotifyClientTerminationRequest, NotifyClientTerminationResponse>> NotifyClientTermination(
+            Endpoints endpoints, NotifyClientTerminationRequest request, TimeSpan timeout);
 
         /// <summary>
         /// Send message to remote endpoints.
@@ -67,22 +69,25 @@ namespace Org.Apache.Rocketmq
         /// <param name="request"></param>
         /// <param name="timeout"></param>
         /// <returns></returns>
-        Task<SendMessageResponse> SendMessage(Endpoints endpoints, SendMessageRequest request,
-            TimeSpan timeout);
+        Task<RpcInvocation<SendMessageRequest, SendMessageResponse>> SendMessage(Endpoints endpoints,
+            SendMessageRequest request, TimeSpan timeout);
 
-        Task<QueryAssignmentResponse> QueryAssignment(Endpoints endpoints, QueryAssignmentRequest request,
-            TimeSpan timeout);
+        Task<RpcInvocation<QueryAssignmentRequest, QueryAssignmentResponse>> QueryAssignment(Endpoints endpoints,
+            QueryAssignmentRequest request, TimeSpan timeout);
 
-        Task<List<ReceiveMessageResponse>> ReceiveMessage(Endpoints endpoints, ReceiveMessageRequest request,
+        Task<RpcInvocation<ReceiveMessageRequest, List<ReceiveMessageResponse>>> ReceiveMessage(Endpoints endpoints,
+            ReceiveMessageRequest request,
             TimeSpan timeout);
 
-        Task<AckMessageResponse> AckMessage(Endpoints endpoints, AckMessageRequest request, TimeSpan timeout);
+        Task<RpcInvocation<AckMessageRequest, AckMessageResponse>> AckMessage(Endpoints endpoints,
+            AckMessageRequest request, TimeSpan timeout);
 
-        Task<ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints,
+        Task<RpcInvocation<ChangeInvisibleDurationRequest, ChangeInvisibleDurationResponse>> ChangeInvisibleDuration(
+            Endpoints endpoints,
             ChangeInvisibleDurationRequest request, TimeSpan timeout);
 
-        Task<EndTransactionResponse> EndTransaction(Endpoints endpoints, EndTransactionRequest request,
-            TimeSpan timeout);
+        Task<RpcInvocation<EndTransactionRequest, EndTransactionResponse>> EndTransaction(Endpoints endpoints,
+            EndTransactionRequest request, TimeSpan timeout);
 
         Task Shutdown();
     }
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 734a0902..6510909b 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -239,10 +239,10 @@ namespace Org.Apache.Rocketmq
 
             var sendMessageRequest = WrapSendMessageRequest(message, mq);
             var endpoints = mq.Broker.Endpoints;
-            var response = await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
+            var invocation = await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
             try
             {
-                var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, response);
+                var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, invocation);
 
                 var sendReceipt = sendReceipts.First();
                 if (attempt > 1)
@@ -260,9 +260,9 @@ namespace Org.Apache.Rocketmq
                 Isolated[endpoints] = true;
                 if (attempt >= maxAttempts)
                 {
-                    Logger.Error("Failed to send message finally, run out of attempt times, " +
-                                 $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
-                                 $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
+                    Logger.Error(e, "Failed to send message finally, run out of attempt times, " +
+                                    $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
+                                    $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
                     throw;
                 }
 
@@ -325,8 +325,8 @@ namespace Org.Apache.Rocketmq
                     ? Proto.TransactionResolution.Commit
                     : Proto.TransactionResolution.Rollback
             };
-            var response = await ClientManager.EndTransaction(endpoints, request, ClientConfig.RequestTimeout);
-            StatusChecker.Check(response.Status, request);
+            var invocation = await ClientManager.EndTransaction(endpoints, request, ClientConfig.RequestTimeout);
+            StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
         }
 
         public class Builder
diff --git a/csharp/examples/QuickStart.cs b/csharp/rocketmq-client-csharp/RpcInvocation.cs
similarity index 64%
copy from csharp/examples/QuickStart.cs
copy to csharp/rocketmq-client-csharp/RpcInvocation.cs
index 291042d9..c8242993 100644
--- a/csharp/examples/QuickStart.cs
+++ b/csharp/rocketmq-client-csharp/RpcInvocation.cs
@@ -1,31 +1,39 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace examples
-{
-    internal static class QuickStart
-    {
-        public static void Main()
-        {
-            // ProducerNormalMessageExample.QuickStart().Wait();
-            // await ProducerFifoMessageExample.QuickStart();
-            // await ProducerDelayMessageExample.QuickStart();
-            // await SimpleConsumerExample.QuickStart();
-            ProducerBenchmark.QuickStart().Wait();
-        }
-    }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Grpc.Core;
+
+namespace Org.Apache.Rocketmq
+{
+    public class RpcInvocation<T, U>
+    {
+        public RpcInvocation(T request, U response, Metadata metadata)
+        {
+            Request = request;
+            Response = response;
+            Metadata = metadata;
+        }
+
+        internal T Request { get; }
+
+        internal U Response { get; }
+
+        private Metadata Metadata { get; }
+
+        internal string RequestId => Metadata.GetValue(MetadataConstants.RequestIdKey);
+    }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs
index 6e7610c3..c9fe8014 100644
--- a/csharp/rocketmq-client-csharp/SendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/SendReceipt.cs
@@ -44,10 +44,11 @@ namespace Org.Apache.Rocketmq
         }
 
         public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue mq,
-            Proto.SendMessageResponse response)
+            RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>
+                invocation)
         {
-            var status = response.Status;
-            foreach (var entry in response.Entries)
+            var status = invocation.Response.Status;
+            foreach (var entry in invocation.Response.Entries)
             {
                 if (Proto.Code.Ok.Equals(entry.Status.Code))
                 {
@@ -56,8 +57,8 @@ namespace Org.Apache.Rocketmq
             }
 
             // May throw exception.
-            StatusChecker.Check(status, response);
-            return response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList();
+            StatusChecker.Check(status, invocation.Request, invocation.RequestId);
+            return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList();
         }
     }
 }
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Signature.cs b/csharp/rocketmq-client-csharp/Signature.cs
index a1df5049..0572a062 100644
--- a/csharp/rocketmq-client-csharp/Signature.cs
+++ b/csharp/rocketmq-client-csharp/Signature.cs
@@ -40,6 +40,7 @@ namespace Org.Apache.Rocketmq
             var clientConfig = client.GetClientConfig();
             dictionary.Add(MetadataConstants.LanguageKey, MetadataConstants.LanguageValue);
             dictionary.Add(MetadataConstants.ClientVersionKey, MetadataConstants.Instance.ClientVersion);
+            dictionary.Add(MetadataConstants.RequestIdKey, Guid.NewGuid().ToString());
             dictionary.Add(MetadataConstants.ClientIdKey, client.GetClientId());
 
             var time = DateTime.Now.ToString(MetadataConstants.DateTimeFormat);
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 953de163..a9c4713b 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -206,9 +206,9 @@ namespace Org.Apache.Rocketmq
             }
 
             var request = WrapChangeInvisibleDuration(messageView, invisibleDuration);
-            var response = await ClientManager.ChangeInvisibleDuration(messageView.MessageQueue.Broker.Endpoints,
+            var invocation = await ClientManager.ChangeInvisibleDuration(messageView.MessageQueue.Broker.Endpoints,
                 request, ClientConfig.RequestTimeout);
-            StatusChecker.Check(response.Status, request);
+            StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
         }
 
 
@@ -220,9 +220,9 @@ namespace Org.Apache.Rocketmq
             }
 
             var request = WrapAckMessageRequest(messageView);
-            var response = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
+            var invocation = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
                 ClientConfig.RequestTimeout);
-            StatusChecker.Check(response.Status, request);
+            StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
         }
 
         private Proto.AckMessageRequest WrapAckMessageRequest(MessageView messageView)
diff --git a/csharp/rocketmq-client-csharp/StatusChecker.cs b/csharp/rocketmq-client-csharp/StatusChecker.cs
index abc0b160..3fecfd95 100644
--- a/csharp/rocketmq-client-csharp/StatusChecker.cs
+++ b/csharp/rocketmq-client-csharp/StatusChecker.cs
@@ -26,12 +26,11 @@ namespace Org.Apache.Rocketmq
     {
         private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
 
-        public static void Check(Proto.Status status, IMessage message)
+        public static void Check(Proto.Status status, IMessage request, string requestId)
         {
             var statusCode = status.Code;
 
             var statusMessage = status.Message;
-            // TODO: add request-id.
             switch (statusCode)
             {
                 case Proto.Code.Ok:
@@ -56,15 +55,15 @@ namespace Org.Apache.Rocketmq
                 case Proto.Code.MessageCorrupted:
                 case Proto.Code.ClientIdRequired:
                 case Proto.Code.IllegalPollingTime:
-                    throw new BadRequestException((int)statusCode, statusMessage);
+                    throw new BadRequestException((int)statusCode, requestId, statusMessage);
                 case Proto.Code.Unauthorized:
-                    throw new UnauthorizedException((int)statusCode, statusMessage);
+                    throw new UnauthorizedException((int)statusCode, requestId, statusMessage);
                 case Proto.Code.PaymentRequired:
-                    throw new PaymentRequiredException((int)statusCode, statusMessage);
+                    throw new PaymentRequiredException((int)statusCode, requestId, statusMessage);
                 case Proto.Code.Forbidden:
-                    throw new ForbiddenException((int)statusCode, statusMessage);
+                    throw new ForbiddenException((int)statusCode, requestId, statusMessage);
                 case Proto.Code.MessageNotFound:
-                    if (message is Proto.ReceiveMessageRequest)
+                    if (request is Proto.ReceiveMessageRequest)
                     {
                         return;
                     }
@@ -74,27 +73,27 @@ namespace Org.Apache.Rocketmq
                 case Proto.Code.NotFound:
                 case Proto.Code.TopicNotFound:
                 case Proto.Code.ConsumerGroupNotFound:
-                    throw new NotFoundException((int)statusCode, statusMessage);
+                    throw new NotFoundException((int)statusCode, requestId, statusMessage);
                 case Proto.Code.PayloadTooLarge:
                 case Proto.Code.MessageBodyTooLarge:
-                    throw new PayloadTooLargeException((int)statusCode, statusMessage);
+                    throw new PayloadTooLargeException((int)statusCode, requestId, statusMessage);
                 case Proto.Code.TooManyRequests:
-                    throw new TooManyRequestsException((int)statusCode, statusMessage);
+                    throw new TooManyRequestsException((int)statusCode, requestId, statusMessage);
                 case Proto.Code.RequestHeaderFieldsTooLarge:
                 case Proto.Code.MessagePropertiesTooLarge:
-                    throw new RequestHeaderFieldsTooLargeException((int)statusCode, statusMessage);
+                    throw new RequestHeaderFieldsTooLargeException((int)statusCode, requestId, statusMessage);
                 case Proto.Code.InternalError:
                 case Proto.Code.InternalServerError:
                 case Proto.Code.HaNotAvailable:
-                    throw new InternalErrorException((int)statusCode, statusMessage);
+                    throw new InternalErrorException((int)statusCode, requestId, statusMessage);
                 case Proto.Code.ProxyTimeout:
                 case Proto.Code.MasterPersistenceTimeout:
                 case Proto.Code.SlavePersistenceTimeout:
-                    throw new ProxyTimeoutException((int)statusCode, statusMessage);
+                    throw new ProxyTimeoutException((int)statusCode, requestId, statusMessage);
                 case Proto.Code.Unsupported:
                 case Proto.Code.VersionUnsupported:
                 case Proto.Code.VerifyFifoMessageUnsupported:
-                    throw new UnsupportedException((int)statusCode, statusMessage);
+                    throw new UnsupportedException((int)statusCode, requestId, statusMessage);
                 // Not used code.
                 case Proto.Code.RequestTimeout:
                 case Proto.Code.PreconditionFailed:
@@ -102,8 +101,8 @@ namespace Org.Apache.Rocketmq
                 case Proto.Code.FailedToConsumeMessage:
                 case Proto.Code.Unspecified:
                 default:
-                    Logger.Warn($"Unrecognized status code={statusCode}, statusMessage={statusMessage}");
-                    throw new UnsupportedException((int)statusCode, statusMessage);
+                    Logger.Warn($"Unrecognized status code={statusCode}, requestId={requestId}, statusMessage={statusMessage}");
+                    throw new UnsupportedException((int)statusCode, requestId, statusMessage);
             }
         }
     }