You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/02/23 15:15:01 UTC
[rocketmq-clients] 21/28: Make simpleconsumer works
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit ebd236ad94270407f90b105393e6ed58ce354ec8
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Feb 20 19:55:29 2023 +0800
Make simpleconsumer works
---
csharp/rocketmq-client-csharp/Consumer.cs | 11 ++++++++---
csharp/rocketmq-client-csharp/MessageView.cs | 12 +++++++-----
2 files changed, 15 insertions(+), 8 deletions(-)
diff --git a/csharp/rocketmq-client-csharp/Consumer.cs b/csharp/rocketmq-client-csharp/Consumer.cs
index 9a58104f..d5a1d9a5 100644
--- a/csharp/rocketmq-client-csharp/Consumer.cs
+++ b/csharp/rocketmq-client-csharp/Consumer.cs
@@ -33,7 +33,7 @@ namespace Org.Apache.Rocketmq
{
ConsumerGroup = consumerGroup;
}
-
+
protected async Task<ReceiveMessageResult> ReceiveMessage(Proto.ReceiveMessageRequest request, MessageQueue mq,
TimeSpan awaitDuration)
{
@@ -83,11 +83,16 @@ namespace Org.Apache.Rocketmq
};
}
- protected static Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
+ protected Proto.ReceiveMessageRequest WrapReceiveMessageRequest(int batchSize, MessageQueue mq,
FilterExpression filterExpression, TimeSpan invisibleDuration)
{
- return new Proto.ReceiveMessageRequest()
+ var group = new Proto.Resource
+ {
+ Name = ConsumerGroup
+ };
+ return new Proto.ReceiveMessageRequest
{
+ Group = group,
MessageQueue = mq.ToProtobuf(),
FilterExpression = WrapFilterExpression(filterExpression),
BatchSize = batchSize,
diff --git a/csharp/rocketmq-client-csharp/MessageView.cs b/csharp/rocketmq-client-csharp/MessageView.cs
index fd095819..b88c40b4 100644
--- a/csharp/rocketmq-client-csharp/MessageView.cs
+++ b/csharp/rocketmq-client-csharp/MessageView.cs
@@ -37,7 +37,7 @@ namespace Org.Apache.Rocketmq
private readonly bool _corrupted;
internal MessageView(string messageId, string topic, byte[] body, string tag, string messageGroup,
- DateTime deliveryTime, List<string> keys, Dictionary<string, string> properties, string bornHost,
+ DateTime? deliveryTimestamp, List<string> keys, Dictionary<string, string> properties, string bornHost,
DateTime bornTime, int deliveryAttempt, MessageQueue messageQueue, string receiptHandle, long offset,
bool corrupted)
{
@@ -46,7 +46,7 @@ namespace Org.Apache.Rocketmq
Body = body;
Tag = tag;
MessageGroup = messageGroup;
- DeliveryTime = deliveryTime;
+ DeliveryTimestamp = deliveryTimestamp;
Keys = keys;
Properties = properties;
BornHost = bornHost;
@@ -68,7 +68,7 @@ namespace Org.Apache.Rocketmq
public string MessageGroup { get; }
- public DateTime DeliveryTime { get; }
+ public DateTime? DeliveryTimestamp { get; }
public List<string> Keys { get; }
@@ -161,7 +161,9 @@ namespace Org.Apache.Rocketmq
var tag = systemProperties.HasTag ? systemProperties.Tag : null;
var messageGroup = systemProperties.HasMessageGroup ? systemProperties.MessageGroup : null;
- var deliveryTime = systemProperties.DeliveryTimestamp.ToDateTime();
+ DateTime? deliveryTime = null == systemProperties.DeliveryTimestamp
+ ? null
+ : systemProperties.DeliveryTimestamp.ToDateTime();
var keys = systemProperties.Keys.ToList();
var bornHost = systemProperties.BornHost;
@@ -184,7 +186,7 @@ namespace Org.Apache.Rocketmq
{
return
$"{nameof(MessageId)}: {MessageId}, {nameof(Topic)}: {Topic}, {nameof(Tag)}: {Tag}," +
- $" {nameof(MessageGroup)}: {MessageGroup}, {nameof(DeliveryTime)}: {DeliveryTime}," +
+ $" {nameof(MessageGroup)}: {MessageGroup}, {nameof(DeliveryTimestamp)}: {DeliveryTimestamp}," +
$" {nameof(Keys)}: {Keys}, {nameof(Properties)}: {Properties}, {nameof(BornHost)}: {BornHost}, " +
$"{nameof(BornTime)}: {BornTime}, {nameof(DeliveryAttempt)}: {DeliveryAttempt}";
}