You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/02 07:13:44 UTC
[rocketmq-clients] branch master updated: Add request-id support
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new a905eba6 Add request-id support
a905eba6 is described below
commit a905eba687ec9fa88eadfffdb3d53c17c8fe02ad
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Thu Mar 2 14:46:56 2023 +0800
Add request-id support
---
csharp/README.md | 2 +-
csharp/examples/QuickStart.cs | 4 +-
csharp/rocketmq-client-csharp/Client.cs | 26 +++----
csharp/rocketmq-client-csharp/ClientManager.cs | 81 ++++++++++++++--------
csharp/rocketmq-client-csharp/Consumer.cs | 6 +-
csharp/rocketmq-client-csharp/IClientManager.cs | 31 +++++----
csharp/rocketmq-client-csharp/Producer.cs | 14 ++--
.../RpcInvocation.cs} | 68 ++++++++++--------
csharp/rocketmq-client-csharp/SendReceipt.cs | 11 +--
csharp/rocketmq-client-csharp/Signature.cs | 1 +
csharp/rocketmq-client-csharp/SimpleConsumer.cs | 8 +--
csharp/rocketmq-client-csharp/StatusChecker.cs | 31 ++++-----
12 files changed, 162 insertions(+), 121 deletions(-)
diff --git a/csharp/README.md b/csharp/README.md
index 774e1400..3de6bc65 100644
--- a/csharp/README.md
+++ b/csharp/README.md
@@ -18,7 +18,7 @@ The client would be developed using the protocols outlined in [rocketmq-apis](ht
dotnet add package RocketMQ.Client
```
-You can obtain the latest version of `RocketMQ.Client` from [NuGet Gallery](https://www.nuget.org/packages/RocketMQ.Client). To assist with getting started quickly and working with various message types and clients, we offer additional code [here](./examples) here.
+You can obtain the latest version of `RocketMQ.Client` from [NuGet Gallery](https://www.nuget.org/packages/RocketMQ.Client). To assist with getting started quickly and working with various message types and clients, we offer examples [here](./examples).
## Build
diff --git a/csharp/examples/QuickStart.cs b/csharp/examples/QuickStart.cs
index 291042d9..174fccd2 100644
--- a/csharp/examples/QuickStart.cs
+++ b/csharp/examples/QuickStart.cs
@@ -21,11 +21,11 @@ namespace examples
{
public static void Main()
{
- // ProducerNormalMessageExample.QuickStart().Wait();
+ ProducerNormalMessageExample.QuickStart().Wait();
// await ProducerFifoMessageExample.QuickStart();
// await ProducerDelayMessageExample.QuickStart();
// await SimpleConsumerExample.QuickStart();
- ProducerBenchmark.QuickStart().Wait();
+ // ProducerBenchmark.QuickStart().Wait();
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Client.cs b/csharp/rocketmq-client-csharp/Client.cs
index 1b3ff59d..a9bdb705 100644
--- a/csharp/rocketmq-client-csharp/Client.cs
+++ b/csharp/rocketmq-client-csharp/Client.cs
@@ -322,18 +322,18 @@ namespace Org.Apache.Rocketmq
Endpoints = Endpoints.ToProtobuf()
};
- var response =
+ var invocation =
await ClientManager.QueryRoute(Endpoints, request, ClientConfig.RequestTimeout);
- var code = response.Status.Code;
+ var code = invocation.Response.Status.Code;
if (!Proto.Code.Ok.Equals(code))
{
Logger.Error($"Failed to fetch topic route, clientId={ClientId}, topic={topic}, code={code}, " +
- $"statusMessage={response.Status.Message}");
+ $"statusMessage={invocation.Response.Status.Message}");
}
- StatusChecker.Check(response.Status, request);
+ StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
- var messageQueues = response.MessageQueues.ToList();
+ var messageQueues = invocation.Response.MessageQueues.ToList();
return new TopicRouteData(messageQueues);
}
@@ -343,21 +343,21 @@ namespace Org.Apache.Rocketmq
{
var endpoints = GetTotalRouteEndpoints();
var request = WrapHeartbeatRequest();
- Dictionary<Endpoints, Task<Proto.HeartbeatResponse>> responses = new();
+ Dictionary<Endpoints, Task<RpcInvocation<Proto.HeartbeatRequest, Proto.HeartbeatResponse>>> invocations = new();
// Collect task into a map.
foreach (var item in endpoints)
{
var task = ClientManager.Heartbeat(item, request, ClientConfig.RequestTimeout);
- responses[item] = task;
+ invocations[item] = task;
}
- foreach (var item in responses.Keys)
+ foreach (var item in invocations.Keys)
{
try
{
- var response = await responses[item];
- var code = response.Status.Code;
+ var invocation = await invocations[item];
+ var code = invocation.Response.Status.Code;
if (code.Equals(Proto.Code.Ok))
{
@@ -371,7 +371,7 @@ namespace Org.Apache.Rocketmq
return;
}
- var statusMessage = response.Status.Message;
+ var statusMessage = invocation.Response.Status.Message;
Logger.Info($"Failed to send heartbeat, endpoints={item}, code={code}, " +
$"statusMessage={statusMessage}, clientId={ClientId}");
}
@@ -404,10 +404,10 @@ namespace Org.Apache.Rocketmq
var request = WrapNotifyClientTerminationRequest();
foreach (var item in endpoints)
{
- var response = await ClientManager.NotifyClientTermination(item, request, ClientConfig.RequestTimeout);
+ var invocation = await ClientManager.NotifyClientTermination(item, request, ClientConfig.RequestTimeout);
try
{
- StatusChecker.Check(response.Status, request);
+ StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
}
catch (Exception e)
{
diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs
index c8a525a4..da46af30 100644
--- a/csharp/rocketmq-client-csharp/ClientManager.cs
+++ b/csharp/rocketmq-client-csharp/ClientManager.cs
@@ -98,59 +98,86 @@ namespace Org.Apache.Rocketmq
return GetRpcClient(endpoints).Telemetry(_client.Sign());
}
- public async Task<Proto.QueryRouteResponse> QueryRoute(Endpoints endpoints, Proto.QueryRouteRequest request,
- TimeSpan timeout)
+ public async Task<RpcInvocation<Proto.QueryRouteRequest, Proto.QueryRouteResponse>> QueryRoute(
+ Endpoints endpoints,
+ Proto.QueryRouteRequest request, TimeSpan timeout)
{
- return await GetRpcClient(endpoints).QueryRoute(_client.Sign(), request, timeout);
+ var metadata = _client.Sign();
+ var response = await GetRpcClient(endpoints).QueryRoute(metadata, request, timeout);
+ return new RpcInvocation<Proto.QueryRouteRequest, Proto.QueryRouteResponse>(request, response, metadata);
}
- public async Task<Proto.HeartbeatResponse> Heartbeat(Endpoints endpoints, Proto.HeartbeatRequest request,
- TimeSpan timeout)
+ public async Task<RpcInvocation<Proto.HeartbeatRequest, Proto.HeartbeatResponse>> Heartbeat(Endpoints endpoints,
+ Proto.HeartbeatRequest request, TimeSpan timeout)
{
- return await GetRpcClient(endpoints).Heartbeat(_client.Sign(), request, timeout);
+ var metadata = _client.Sign();
+ var response = await GetRpcClient(endpoints).Heartbeat(metadata, request, timeout);
+ return new RpcInvocation<Proto.HeartbeatRequest, Proto.HeartbeatResponse>(request, response, metadata);
}
- public async Task<Proto.NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints,
- Proto.NotifyClientTerminationRequest request, TimeSpan timeout)
+ public async Task<RpcInvocation<Proto.NotifyClientTerminationRequest, Proto.NotifyClientTerminationResponse>>
+ NotifyClientTermination(Endpoints endpoints, Proto.NotifyClientTerminationRequest request, TimeSpan timeout)
{
- return await GetRpcClient(endpoints).NotifyClientTermination(_client.Sign(), request, timeout);
+ var metadata = _client.Sign();
+ var response = await GetRpcClient(endpoints).NotifyClientTermination(metadata, request, timeout);
+ return new RpcInvocation<Proto.NotifyClientTerminationRequest, Proto.NotifyClientTerminationResponse>(
+ request, response, metadata);
}
- public async Task<Proto::SendMessageResponse> SendMessage(Endpoints endpoints,
- Proto::SendMessageRequest request,
- TimeSpan timeout)
+ public async Task<RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>> SendMessage(
+ Endpoints endpoints, Proto::SendMessageRequest request, TimeSpan timeout)
{
- return await GetRpcClient(endpoints).SendMessage(_client.Sign(), request, timeout);
+ var metadata = _client.Sign();
+ var response = await GetRpcClient(endpoints).SendMessage(metadata, request, timeout);
+ return new RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>(
+ request, response, metadata);
}
- public async Task<Proto::QueryAssignmentResponse> QueryAssignment(Endpoints endpoints,
- Proto.QueryAssignmentRequest request, TimeSpan timeout)
+ public async Task<RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse>> QueryAssignment(
+ Endpoints endpoints, Proto.QueryAssignmentRequest request, TimeSpan timeout)
{
- return await GetRpcClient(endpoints).QueryAssignment(_client.Sign(), request, timeout);
+ var metadata = _client.Sign();
+ var response = await GetRpcClient(endpoints).QueryAssignment(metadata, request, timeout);
+ return new RpcInvocation<Proto.QueryAssignmentRequest, Proto.QueryAssignmentResponse>(
+ request, response, metadata);
}
- public async Task<List<Proto::ReceiveMessageResponse>> ReceiveMessage(Endpoints endpoints,
- Proto.ReceiveMessageRequest request, TimeSpan timeout)
+ public async Task<RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>>
+ ReceiveMessage(Endpoints endpoints, Proto.ReceiveMessageRequest request, TimeSpan timeout)
{
- return await GetRpcClient(endpoints).ReceiveMessage(_client.Sign(), request, timeout);
+ var metadata = _client.Sign();
+ var response = await GetRpcClient(endpoints).ReceiveMessage(metadata, request, timeout);
+ return new RpcInvocation<Proto.ReceiveMessageRequest, List<Proto.ReceiveMessageResponse>>(
+ request, response, metadata);
}
- public async Task<Proto::AckMessageResponse> AckMessage(Endpoints endpoints,
- Proto.AckMessageRequest request, TimeSpan timeout)
+ public async Task<RpcInvocation<Proto.AckMessageRequest, Proto.AckMessageResponse>> AckMessage(
+ Endpoints endpoints, Proto.AckMessageRequest request, TimeSpan timeout)
{
- return await GetRpcClient(endpoints).AckMessage(_client.Sign(), request, timeout);
+ var metadata = _client.Sign();
+ var response = await GetRpcClient(endpoints).AckMessage(metadata, request, timeout);
+ return new RpcInvocation<Proto.AckMessageRequest, Proto.AckMessageResponse>(
+ request, response, metadata);
}
- public async Task<Proto::ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints,
- Proto.ChangeInvisibleDurationRequest request, TimeSpan timeout)
+ public async Task<RpcInvocation<Proto.ChangeInvisibleDurationRequest, Proto.ChangeInvisibleDurationResponse>>
+ ChangeInvisibleDuration(Endpoints endpoints,
+ Proto.ChangeInvisibleDurationRequest request, TimeSpan timeout)
{
- return await GetRpcClient(endpoints).ChangeInvisibleDuration(_client.Sign(), request, timeout);
+ var metadata = _client.Sign();
+ var response = await GetRpcClient(endpoints).ChangeInvisibleDuration(metadata, request, timeout);
+ return new RpcInvocation<Proto.ChangeInvisibleDurationRequest, Proto.ChangeInvisibleDurationResponse>(
+ request, response, metadata);
}
- public async Task<Proto.EndTransactionResponse> EndTransaction(Endpoints endpoints,
+ public async Task<RpcInvocation<Proto.EndTransactionRequest, Proto.EndTransactionResponse>> EndTransaction(
+ Endpoints endpoints,
Proto.EndTransactionRequest request, TimeSpan timeout)
{
- return await GetRpcClient(endpoints).EndTransaction(_client.Sign(), request, timeout);
+ var metadata = _client.Sign();
+ var response = await GetRpcClient(endpoints).EndTransaction(metadata, request, timeout);
+ return new RpcInvocation<Proto.EndTransactionRequest, Proto.EndTransactionResponse>(
+ request, response, metadata);
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs
index 2ba1de1e..3444fbc0 100644
--- a/csharp/rocketmq-client-csharp/Consumer.cs
+++ b/csharp/rocketmq-client-csharp/Consumer.cs
@@ -41,14 +41,14 @@ namespace Org.Apache.Rocketmq
{
var tolerance = ClientConfig.RequestTimeout;
var timeout = tolerance.Add(awaitDuration);
- var response = await ClientManager.ReceiveMessage(mq.Broker.Endpoints, request, timeout);
+ var invocation = await ClientManager.ReceiveMessage(mq.Broker.Endpoints, request, timeout);
var status = new Proto.Status()
{
Code = Proto.Code.InternalServerError,
Message = "Status was not set by server"
};
var messageList = new List<Proto.Message>();
- foreach (var entry in response)
+ foreach (var entry in invocation.Response)
{
switch (entry.ContentCase)
{
@@ -66,7 +66,7 @@ namespace Org.Apache.Rocketmq
}
var messages = messageList.Select(message => MessageView.FromProtobuf(message, mq)).ToList();
- StatusChecker.Check(status, request);
+ StatusChecker.Check(status, request, invocation.RequestId);
return new ReceiveMessageResult(mq.Broker.Endpoints, messages);
}
diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs
index 2082584c..47af0280 100644
--- a/csharp/rocketmq-client-csharp/IClientManager.cs
+++ b/csharp/rocketmq-client-csharp/IClientManager.cs
@@ -39,7 +39,8 @@ namespace Org.Apache.Rocketmq
/// <param name="request">gRPC request of querying topic route.</param>
/// <param name="timeout">Request max duration.</param>
/// <returns>Task of response.</returns>
- Task<QueryRouteResponse> QueryRoute(Endpoints endpoints, QueryRouteRequest request, TimeSpan timeout);
+ Task<RpcInvocation<QueryRouteRequest, QueryRouteResponse>> QueryRoute(Endpoints endpoints,
+ QueryRouteRequest request, TimeSpan timeout);
/// <summary>
/// Send heartbeat to remote endpoints.
@@ -48,7 +49,8 @@ namespace Org.Apache.Rocketmq
/// <param name="request">gRPC request of heartbeat.</param>
/// <param name="timeout">Request max duration.</param>
/// <returns>Task of response.</returns>
- Task<HeartbeatResponse> Heartbeat(Endpoints endpoints, HeartbeatRequest request, TimeSpan timeout);
+ Task<RpcInvocation<HeartbeatRequest, HeartbeatResponse>> Heartbeat(Endpoints endpoints,
+ HeartbeatRequest request, TimeSpan timeout);
/// <summary>
/// Notify client's termination.
@@ -57,8 +59,8 @@ namespace Org.Apache.Rocketmq
/// <param name="request">gRPC request of notifying client's termination.</param>
/// <param name="timeout">Request max duration.</param>
/// <returns>Task of response.</returns>
- Task<NotifyClientTerminationResponse> NotifyClientTermination(Endpoints endpoints,
- NotifyClientTerminationRequest request, TimeSpan timeout);
+ Task<RpcInvocation<NotifyClientTerminationRequest, NotifyClientTerminationResponse>> NotifyClientTermination(
+ Endpoints endpoints, NotifyClientTerminationRequest request, TimeSpan timeout);
/// <summary>
/// Send message to remote endpoints.
@@ -67,22 +69,25 @@ namespace Org.Apache.Rocketmq
/// <param name="request"></param>
/// <param name="timeout"></param>
/// <returns></returns>
- Task<SendMessageResponse> SendMessage(Endpoints endpoints, SendMessageRequest request,
- TimeSpan timeout);
+ Task<RpcInvocation<SendMessageRequest, SendMessageResponse>> SendMessage(Endpoints endpoints,
+ SendMessageRequest request, TimeSpan timeout);
- Task<QueryAssignmentResponse> QueryAssignment(Endpoints endpoints, QueryAssignmentRequest request,
- TimeSpan timeout);
+ Task<RpcInvocation<QueryAssignmentRequest, QueryAssignmentResponse>> QueryAssignment(Endpoints endpoints,
+ QueryAssignmentRequest request, TimeSpan timeout);
- Task<List<ReceiveMessageResponse>> ReceiveMessage(Endpoints endpoints, ReceiveMessageRequest request,
+ Task<RpcInvocation<ReceiveMessageRequest, List<ReceiveMessageResponse>>> ReceiveMessage(Endpoints endpoints,
+ ReceiveMessageRequest request,
TimeSpan timeout);
- Task<AckMessageResponse> AckMessage(Endpoints endpoints, AckMessageRequest request, TimeSpan timeout);
+ Task<RpcInvocation<AckMessageRequest, AckMessageResponse>> AckMessage(Endpoints endpoints,
+ AckMessageRequest request, TimeSpan timeout);
- Task<ChangeInvisibleDurationResponse> ChangeInvisibleDuration(Endpoints endpoints,
+ Task<RpcInvocation<ChangeInvisibleDurationRequest, ChangeInvisibleDurationResponse>> ChangeInvisibleDuration(
+ Endpoints endpoints,
ChangeInvisibleDurationRequest request, TimeSpan timeout);
- Task<EndTransactionResponse> EndTransaction(Endpoints endpoints, EndTransactionRequest request,
- TimeSpan timeout);
+ Task<RpcInvocation<EndTransactionRequest, EndTransactionResponse>> EndTransaction(Endpoints endpoints,
+ EndTransactionRequest request, TimeSpan timeout);
Task Shutdown();
}
diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index 734a0902..6510909b 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -239,10 +239,10 @@ namespace Org.Apache.Rocketmq
var sendMessageRequest = WrapSendMessageRequest(message, mq);
var endpoints = mq.Broker.Endpoints;
- var response = await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
+ var invocation = await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
try
{
- var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, response);
+ var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, invocation);
var sendReceipt = sendReceipts.First();
if (attempt > 1)
@@ -260,9 +260,9 @@ namespace Org.Apache.Rocketmq
Isolated[endpoints] = true;
if (attempt >= maxAttempts)
{
- Logger.Error("Failed to send message finally, run out of attempt times, " +
- $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
- $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
+ Logger.Error(e, "Failed to send message finally, run out of attempt times, " +
+ $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
+ $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
throw;
}
@@ -325,8 +325,8 @@ namespace Org.Apache.Rocketmq
? Proto.TransactionResolution.Commit
: Proto.TransactionResolution.Rollback
};
- var response = await ClientManager.EndTransaction(endpoints, request, ClientConfig.RequestTimeout);
- StatusChecker.Check(response.Status, request);
+ var invocation = await ClientManager.EndTransaction(endpoints, request, ClientConfig.RequestTimeout);
+ StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
}
public class Builder
diff --git a/csharp/examples/QuickStart.cs b/csharp/rocketmq-client-csharp/RpcInvocation.cs
similarity index 64%
copy from csharp/examples/QuickStart.cs
copy to csharp/rocketmq-client-csharp/RpcInvocation.cs
index 291042d9..c8242993 100644
--- a/csharp/examples/QuickStart.cs
+++ b/csharp/rocketmq-client-csharp/RpcInvocation.cs
@@ -1,31 +1,39 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace examples
-{
- internal static class QuickStart
- {
- public static void Main()
- {
- // ProducerNormalMessageExample.QuickStart().Wait();
- // await ProducerFifoMessageExample.QuickStart();
- // await ProducerDelayMessageExample.QuickStart();
- // await SimpleConsumerExample.QuickStart();
- ProducerBenchmark.QuickStart().Wait();
- }
- }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Grpc.Core;
+
+namespace Org.Apache.Rocketmq
+{
+ public class RpcInvocation<T, U>
+ {
+ public RpcInvocation(T request, U response, Metadata metadata)
+ {
+ Request = request;
+ Response = response;
+ Metadata = metadata;
+ }
+
+ internal T Request { get; }
+
+ internal U Response { get; }
+
+ private Metadata Metadata { get; }
+
+ internal string RequestId => Metadata.GetValue(MetadataConstants.RequestIdKey);
+ }
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs
index 6e7610c3..c9fe8014 100644
--- a/csharp/rocketmq-client-csharp/SendReceipt.cs
+++ b/csharp/rocketmq-client-csharp/SendReceipt.cs
@@ -44,10 +44,11 @@ namespace Org.Apache.Rocketmq
}
public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue mq,
- Proto.SendMessageResponse response)
+ RpcInvocation<Proto.SendMessageRequest, Proto.SendMessageResponse>
+ invocation)
{
- var status = response.Status;
- foreach (var entry in response.Entries)
+ var status = invocation.Response.Status;
+ foreach (var entry in invocation.Response.Entries)
{
if (Proto.Code.Ok.Equals(entry.Status.Code))
{
@@ -56,8 +57,8 @@ namespace Org.Apache.Rocketmq
}
// May throw exception.
- StatusChecker.Check(status, response);
- return response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList();
+ StatusChecker.Check(status, invocation.Request, invocation.RequestId);
+ return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList();
}
}
}
\ No newline at end of file
diff --git a/csharp/rocketmq-client-csharp/Signature.cs b/csharp/rocketmq-client-csharp/Signature.cs
index a1df5049..0572a062 100644
--- a/csharp/rocketmq-client-csharp/Signature.cs
+++ b/csharp/rocketmq-client-csharp/Signature.cs
@@ -40,6 +40,7 @@ namespace Org.Apache.Rocketmq
var clientConfig = client.GetClientConfig();
dictionary.Add(MetadataConstants.LanguageKey, MetadataConstants.LanguageValue);
dictionary.Add(MetadataConstants.ClientVersionKey, MetadataConstants.Instance.ClientVersion);
+ dictionary.Add(MetadataConstants.RequestIdKey, Guid.NewGuid().ToString());
dictionary.Add(MetadataConstants.ClientIdKey, client.GetClientId());
var time = DateTime.Now.ToString(MetadataConstants.DateTimeFormat);
diff --git a/csharp/rocketmq-client-csharp/SimpleConsumer.cs b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
index 953de163..a9c4713b 100644
--- a/csharp/rocketmq-client-csharp/SimpleConsumer.cs
+++ b/csharp/rocketmq-client-csharp/SimpleConsumer.cs
@@ -206,9 +206,9 @@ namespace Org.Apache.Rocketmq
}
var request = WrapChangeInvisibleDuration(messageView, invisibleDuration);
- var response = await ClientManager.ChangeInvisibleDuration(messageView.MessageQueue.Broker.Endpoints,
+ var invocation = await ClientManager.ChangeInvisibleDuration(messageView.MessageQueue.Broker.Endpoints,
request, ClientConfig.RequestTimeout);
- StatusChecker.Check(response.Status, request);
+ StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
}
@@ -220,9 +220,9 @@ namespace Org.Apache.Rocketmq
}
var request = WrapAckMessageRequest(messageView);
- var response = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
+ var invocation = await ClientManager.AckMessage(messageView.MessageQueue.Broker.Endpoints, request,
ClientConfig.RequestTimeout);
- StatusChecker.Check(response.Status, request);
+ StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
}
private Proto.AckMessageRequest WrapAckMessageRequest(MessageView messageView)
diff --git a/csharp/rocketmq-client-csharp/StatusChecker.cs b/csharp/rocketmq-client-csharp/StatusChecker.cs
index abc0b160..3fecfd95 100644
--- a/csharp/rocketmq-client-csharp/StatusChecker.cs
+++ b/csharp/rocketmq-client-csharp/StatusChecker.cs
@@ -26,12 +26,11 @@ namespace Org.Apache.Rocketmq
{
private static readonly Logger Logger = MqLogManager.Instance.GetCurrentClassLogger();
- public static void Check(Proto.Status status, IMessage message)
+ public static void Check(Proto.Status status, IMessage request, string requestId)
{
var statusCode = status.Code;
var statusMessage = status.Message;
- // TODO: add request-id.
switch (statusCode)
{
case Proto.Code.Ok:
@@ -56,15 +55,15 @@ namespace Org.Apache.Rocketmq
case Proto.Code.MessageCorrupted:
case Proto.Code.ClientIdRequired:
case Proto.Code.IllegalPollingTime:
- throw new BadRequestException((int)statusCode, statusMessage);
+ throw new BadRequestException((int)statusCode, requestId, statusMessage);
case Proto.Code.Unauthorized:
- throw new UnauthorizedException((int)statusCode, statusMessage);
+ throw new UnauthorizedException((int)statusCode, requestId, statusMessage);
case Proto.Code.PaymentRequired:
- throw new PaymentRequiredException((int)statusCode, statusMessage);
+ throw new PaymentRequiredException((int)statusCode, requestId, statusMessage);
case Proto.Code.Forbidden:
- throw new ForbiddenException((int)statusCode, statusMessage);
+ throw new ForbiddenException((int)statusCode, requestId, statusMessage);
case Proto.Code.MessageNotFound:
- if (message is Proto.ReceiveMessageRequest)
+ if (request is Proto.ReceiveMessageRequest)
{
return;
}
@@ -74,27 +73,27 @@ namespace Org.Apache.Rocketmq
case Proto.Code.NotFound:
case Proto.Code.TopicNotFound:
case Proto.Code.ConsumerGroupNotFound:
- throw new NotFoundException((int)statusCode, statusMessage);
+ throw new NotFoundException((int)statusCode, requestId, statusMessage);
case Proto.Code.PayloadTooLarge:
case Proto.Code.MessageBodyTooLarge:
- throw new PayloadTooLargeException((int)statusCode, statusMessage);
+ throw new PayloadTooLargeException((int)statusCode, requestId, statusMessage);
case Proto.Code.TooManyRequests:
- throw new TooManyRequestsException((int)statusCode, statusMessage);
+ throw new TooManyRequestsException((int)statusCode, requestId, statusMessage);
case Proto.Code.RequestHeaderFieldsTooLarge:
case Proto.Code.MessagePropertiesTooLarge:
- throw new RequestHeaderFieldsTooLargeException((int)statusCode, statusMessage);
+ throw new RequestHeaderFieldsTooLargeException((int)statusCode, requestId, statusMessage);
case Proto.Code.InternalError:
case Proto.Code.InternalServerError:
case Proto.Code.HaNotAvailable:
- throw new InternalErrorException((int)statusCode, statusMessage);
+ throw new InternalErrorException((int)statusCode, requestId, statusMessage);
case Proto.Code.ProxyTimeout:
case Proto.Code.MasterPersistenceTimeout:
case Proto.Code.SlavePersistenceTimeout:
- throw new ProxyTimeoutException((int)statusCode, statusMessage);
+ throw new ProxyTimeoutException((int)statusCode, requestId, statusMessage);
case Proto.Code.Unsupported:
case Proto.Code.VersionUnsupported:
case Proto.Code.VerifyFifoMessageUnsupported:
- throw new UnsupportedException((int)statusCode, statusMessage);
+ throw new UnsupportedException((int)statusCode, requestId, statusMessage);
// Not used code.
case Proto.Code.RequestTimeout:
case Proto.Code.PreconditionFailed:
@@ -102,8 +101,8 @@ namespace Org.Apache.Rocketmq
case Proto.Code.FailedToConsumeMessage:
case Proto.Code.Unspecified:
default:
- Logger.Warn($"Unrecognized status code={statusCode}, statusMessage={statusMessage}");
- throw new UnsupportedException((int)statusCode, statusMessage);
+ Logger.Warn($"Unrecognized status code={statusCode}, requestId={requestId}, statusMessage={statusMessage}");
+ throw new UnsupportedException((int)statusCode, requestId, statusMessage);
}
}
}