Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/25 09:25:40 UTC

[GitHub] [pulsar-dotpulsar] dionjansen opened a new pull request #47: Redeliver unacknowledged messages

dionjansen opened a new pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47


   Try exposing the binary protocol's RedeliverUnacknowledgedMessages command in the consumer API. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on a change in pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#discussion_r446645265



##########
File path: src/DotPulsar/Internal/Consumer.cs
##########
@@ -104,6 +107,12 @@ public async ValueTask AcknowledgeCumulative(Message message, CancellationToken
         public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
             => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
 
+        public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data).ToList(), cancellationToken);

Review comment:
       We need a .ConfigureAwait(false); here :-)

##########
File path: src/DotPulsar/Internal/Consumer.cs
##########
@@ -104,6 +107,12 @@ public async ValueTask AcknowledgeCumulative(Message message, CancellationToken
         public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
             => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
 
+        public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data).ToList(), cancellationToken);
+
+        public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), cancellationToken);

Review comment:
       We need a .ConfigureAwait(false); here :-)
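
For readers unfamiliar with the convention, here is a minimal sketch of what the two comments above ask for. `SketchConsumer` and `SendRedeliver` are hypothetical stand-ins, not DotPulsar's actual types; the sketch only assumes the usual reason library code appends `ConfigureAwait(false)`, namely that the continuation does not need to resume on the caller's captured synchronization context.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a library type; not DotPulsar's actual Consumer.
public sealed class SketchConsumer
{
    // Hypothetical inner operation standing in for the real network send.
    private static async ValueTask SendRedeliver(CancellationToken cancellationToken)
        => await Task.Delay(10, cancellationToken).ConfigureAwait(false);

    // ConfigureAwait(false) tells the awaiter not to resume on the captured
    // SynchronizationContext, which library code rarely needs.
    public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
        => await SendRedeliver(cancellationToken).ConfigureAwait(false);
}

public static class Program
{
    public static async Task Main()
    {
        var consumer = new SketchConsumer();
        await consumer.RedeliverUnacknowledgedMessages(CancellationToken.None);
        Console.WriteLine("redeliver request completed");
    }
}
```

In a console application there is no synchronization context, so the call behaves the same either way; the difference shows up when a library is consumed from UI or older ASP.NET code.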







[GitHub] [pulsar-dotpulsar] blankensteiner commented on pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#issuecomment-650163312


   Hi @dionjansen 
   Thanks for the PR!
   I've added a few comments, let me know what you think :-)





[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#discussion_r446230418



##########
File path: samples/Producing/Program.cs
##########
@@ -14,88 +14,90 @@
 
 namespace Producing
 {
-    using DotPulsar;
-    using DotPulsar.Abstractions;
-    using DotPulsar.Extensions;
-    using System;
-    using System.Text;
-    using System.Threading;
-    using System.Threading.Tasks;
-
-    internal static class Program
+  using DotPulsar;
+  using DotPulsar.Abstractions;
+  using DotPulsar.Extensions;
+  using System;
+  using System.Text;
+  using System.Threading;
+  using System.Threading.Tasks;
+
+  internal static class Program
+  {
+    private static async Task Main(string[] args)
     {
-        private static async Task Main(string[] args)
-        {
-            const string myTopic = "persistent://public/default/mytopic";
+      const string myTopic = "persistent://public/default/mytopic";
 
-            await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
+      await using var client = PulsarClient.Builder()
+          .ServiceUrl(new Uri("pulsar://host.docker.internal:6650"))

Review comment:
       Agreed, will clean this up.







[GitHub] [pulsar-dotpulsar] blankensteiner commented on pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#issuecomment-650746865


   Almost done now :-)





[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on a change in pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#discussion_r446158457



##########
File path: samples/Consuming/Program.cs
##########
@@ -14,91 +14,93 @@
 
 namespace Consuming
 {
-    using DotPulsar;
-    using DotPulsar.Abstractions;
-    using DotPulsar.Extensions;
-    using System;
-    using System.Buffers;
-    using System.Text;
-    using System.Threading;
-    using System.Threading.Tasks;
-
-    internal static class Program
+  using DotPulsar;
+  using DotPulsar.Abstractions;
+  using DotPulsar.Extensions;
+  using System;
+  using System.Buffers;
+  using System.Text;
+  using System.Threading;
+  using System.Threading.Tasks;
+
+  internal static class Program
+  {
+    private static async Task Main(string[] args)
     {
-        private static async Task Main(string[] args)
-        {
-            const string myTopic = "persistent://public/default/mytopic";
+      const string myTopic = "persistent://public/default/mytopic";
 
-            await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
+      await using var client = PulsarClient.Builder()
+        .ServiceUrl(new Uri("pulsar://host.docker.internal:6650"))

Review comment:
       Correct me if I'm wrong, but all the changes to this file can be discarded: they are style changes, and the one functional change here (the hostname) is not needed for this feature to work.

##########
File path: src/DotPulsar/Internal/Connection.cs
##########
@@ -97,6 +97,9 @@ public Task Send(CommandAck command, CancellationToken cancellationToken)
         public Task Send(CommandFlow command, CancellationToken cancellationToken)
             => Send(command.AsBaseCommand(), cancellationToken);
 
+        public Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+          => Send(command.AsBaseCommand(), cancellationToken);

Review comment:
       It looks like your IDE has a different style setting since it only indents with 2 spaces.

##########
File path: samples/Producing/Program.cs
##########
@@ -14,88 +14,90 @@
 
 namespace Producing
 {
-    using DotPulsar;
-    using DotPulsar.Abstractions;
-    using DotPulsar.Extensions;
-    using System;
-    using System.Text;
-    using System.Threading;
-    using System.Threading.Tasks;
-
-    internal static class Program
+  using DotPulsar;
+  using DotPulsar.Abstractions;
+  using DotPulsar.Extensions;
+  using System;
+  using System.Text;
+  using System.Threading;
+  using System.Threading.Tasks;
+
+  internal static class Program
+  {
+    private static async Task Main(string[] args)
     {
-        private static async Task Main(string[] args)
-        {
-            const string myTopic = "persistent://public/default/mytopic";
+      const string myTopic = "persistent://public/default/mytopic";
 
-            await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
+      await using var client = PulsarClient.Builder()
+          .ServiceUrl(new Uri("pulsar://host.docker.internal:6650"))

Review comment:
       Correct me if I'm wrong, but all the changes to this file can be discarded: they are style changes, and the one functional change here (the hostname) is not needed for this feature to work.

##########
File path: src/DotPulsar/Abstractions/IConsumer.cs
##########
@@ -106,5 +106,15 @@ public interface IConsumer : IAsyncDisposable
         /// Unsubscribe the consumer.
         /// </summary>
         ValueTask Unsubscribe(CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Redeliver the pending messages that were pushed to this consumer that are not yet acknowledged.
+        /// </summary>
+        ValueTask RedeliverUnacknowledgedMessages(List<MessageId> messageIds, CancellationToken cancellationToken);

Review comment:
       Public APIs should accept and return IEnumerable<T> instead of concrete collection types like List<T>.
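
As an illustration of the point above, the following sketch shows how a parameter typed as `IEnumerable<T>` accepts any sequence the caller already has. `CountIds` and the string identifiers are hypothetical and stand in for the real `MessageId` type.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class Program
{
    // Hypothetical API method: accepting IEnumerable<string> means callers are
    // not forced to copy their data into a List<string> first.
    private static int CountIds(IEnumerable<string> messageIds) => messageIds.Count();

    public static void Main()
    {
        var list = new List<string> { "a", "b" };
        var array = new[] { "c" };
        var set = new HashSet<string> { "d", "e", "f" };

        // All three collection types satisfy the same parameter.
        Console.WriteLine(CountIds(list));
        Console.WriteLine(CountIds(array));
        Console.WriteLine(CountIds(set));
    }
}
```

Had the parameter been `List<string>`, the array and the set would each need a `ToList()` copy at the call site.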

##########
File path: src/DotPulsar/Internal/Consumer.cs
##########
@@ -104,6 +107,12 @@ public async ValueTask AcknowledgeCumulative(Message message, CancellationToken
         public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
             => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
 
+        public async ValueTask RedeliverUnacknowledgedMessages(List<MessageId> messageIds, CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data).ToList(), cancellationToken);
+
+        public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(new List<MessageIdData>(), cancellationToken);

Review comment:
       Use Enumerable.Empty<MessageIdData>() instead of creating a new list.
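
A small sketch of the suggestion above, using hypothetical `Redeliver` and `RedeliverAll` methods with `long` identifiers in place of `MessageIdData`. `Enumerable.Empty<T>()` is documented to cache its empty sequence, so the parameterless overload can forward to the general one without allocating a fresh list on every call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class Program
{
    // Hypothetical general overload: reports how many ids were requested.
    private static int Redeliver(IEnumerable<long> ids) => ids.Count();

    // Hypothetical convenience overload: an empty sequence means "everything".
    private static int RedeliverAll() => Redeliver(Enumerable.Empty<long>());

    public static void Main()
    {
        Console.WriteLine(RedeliverAll());

        // Both calls hand back the same cached instance rather than new objects.
        Console.WriteLine(ReferenceEquals(Enumerable.Empty<long>(), Enumerable.Empty<long>()));
    }
}
```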







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#discussion_r446230216



##########
File path: samples/Consuming/Program.cs
##########
@@ -14,91 +14,93 @@
 
 namespace Consuming
 {
-    using DotPulsar;
-    using DotPulsar.Abstractions;
-    using DotPulsar.Extensions;
-    using System;
-    using System.Buffers;
-    using System.Text;
-    using System.Threading;
-    using System.Threading.Tasks;
-
-    internal static class Program
+  using DotPulsar;
+  using DotPulsar.Abstractions;
+  using DotPulsar.Extensions;
+  using System;
+  using System.Buffers;
+  using System.Text;
+  using System.Threading;
+  using System.Threading.Tasks;
+
+  internal static class Program
+  {
+    private static async Task Main(string[] args)
     {
-        private static async Task Main(string[] args)
-        {
-            const string myTopic = "persistent://public/default/mytopic";
+      const string myTopic = "persistent://public/default/mytopic";
 
-            await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
+      await using var client = PulsarClient.Builder()
+        .ServiceUrl(new Uri("pulsar://host.docker.internal:6650"))

Review comment:
       Agreed, will clean this up.







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#discussion_r446670611



##########
File path: src/DotPulsar/Internal/Consumer.cs
##########
@@ -104,6 +107,12 @@ public async ValueTask AcknowledgeCumulative(Message message, CancellationToken
         public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
             => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
 
+        public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data).ToList(), cancellationToken);
+
+        public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), cancellationToken);

Review comment:
       done







[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#discussion_r446242805



##########
File path: src/DotPulsar/Internal/Connection.cs
##########
@@ -97,6 +97,9 @@ public Task Send(CommandAck command, CancellationToken cancellationToken)
         public Task Send(CommandFlow command, CancellationToken cancellationToken)
             => Send(command.AsBaseCommand(), cancellationToken);
 
+        public Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
+          => Send(command.AsBaseCommand(), cancellationToken);

Review comment:
       fixed







[GitHub] [pulsar-dotpulsar] blankensteiner merged pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
blankensteiner merged pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47


   





[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#discussion_r446243639



##########
File path: src/DotPulsar/Abstractions/IConsumer.cs
##########
@@ -106,5 +106,15 @@ public interface IConsumer : IAsyncDisposable
         /// Unsubscribe the consumer.
         /// </summary>
         ValueTask Unsubscribe(CancellationToken cancellationToken = default);
+
+        /// <summary>
+        /// Redeliver the pending messages that were pushed to this consumer that are not yet acknowledged.
+        /// </summary>
+        ValueTask RedeliverUnacknowledgedMessages(List<MessageId> messageIds, CancellationToken cancellationToken);

Review comment:
       Done







[GitHub] [pulsar-dotpulsar] dionjansen commented on pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
dionjansen commented on pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#issuecomment-650537684


   @blankensteiner cleaned up unnecessary changes and updated with your suggestions, let me know what you think!





[GitHub] [pulsar-dotpulsar] dionjansen commented on a change in pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
dionjansen commented on a change in pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#discussion_r446509650



##########
File path: src/DotPulsar/Internal/Consumer.cs
##########
@@ -104,6 +107,12 @@ public async ValueTask AcknowledgeCumulative(Message message, CancellationToken
         public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
             => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
 
+        public async ValueTask RedeliverUnacknowledgedMessages(List<MessageId> messageIds, CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(messageIds.Select(m => m.Data).ToList(), cancellationToken);
+
+        public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
+            => await RedeliverUnacknowledgedMessages(new List<MessageIdData>(), cancellationToken);

Review comment:
       Done







[GitHub] [pulsar-dotpulsar] dionjansen commented on pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
dionjansen commented on pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#issuecomment-650789784


   @blankensteiner my bad, added the ConfigureAwait(false) calls for the consumer methods, let me know if there are any other issues :)





[GitHub] [pulsar-dotpulsar] blankensteiner commented on pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
blankensteiner commented on pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#issuecomment-650790980


   Thanks for the PR! :-)





[GitHub] [pulsar-dotpulsar] dionjansen commented on pull request #47: Redeliver unacknowledged messages

Posted by GitBox <gi...@apache.org>.
dionjansen commented on pull request #47:
URL: https://github.com/apache/pulsar-dotpulsar/pull/47#issuecomment-649420395


   @blankensteiner hi, I tried implementing one part of #45, which is exposing the [RedeliverUnacknowledgedMessages](https://pulsar.apache.org/docs/en/develop-binary-protocol/#command-redeliverunacknowledgedmessages) command. Please let me know if this is heading in the right direction.

