You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/10/20 07:44:08 UTC

[pulsar-dotpulsar] branch master updated: Don't wait for shutdown using Console.ReadKey(). Using TaskCompletionSource instead.

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c832e3  Don't wait for shutdown using Console.ReadKey(). Using TaskCompletionSource instead.
3c832e3 is described below

commit 3c832e315a6ea597fb84c03c88e2c87a99e42596
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Tue Oct 20 09:43:55 2020 +0200

    Don't wait for shutdown using Console.ReadKey(). Using TaskCompletionSource instead.
---
 samples/Consuming/Program.cs | 22 +++++++++++++++-------
 samples/Producing/Program.cs | 26 ++++++++++++++++++--------
 samples/Reading/Program.cs   | 20 ++++++++++++++------
 3 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index d09ae3f..3bc70e9 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -29,6 +29,14 @@ namespace Consuming
         {
             const string myTopic = "persistent://public/default/mytopic";
 
+            var taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+            Console.CancelKeyPress += (sender, args) =>
+            {
+                taskCompletionSource.SetResult(null);
+                args.Cancel = true;
+            };
+
             await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
 
             var consumer = client.NewConsumer()
@@ -42,17 +50,17 @@ namespace Consuming
 
             var consuming = ConsumeMessages(consumer, cts.Token);
 
-            Console.WriteLine("Press a key to exit");
+            Console.WriteLine("Press Ctrl+C to exit");
 
-            _ = Console.ReadKey();
+            await taskCompletionSource.Task;
 
             cts.Cancel();
 
-            await consuming.ConfigureAwait(false);
+            await consuming;
 
-            await consumer.DisposeAsync().ConfigureAwait(false);
+            await consumer.DisposeAsync();
 
-            await monitoring.ConfigureAwait(false);
+            await monitoring;
         }
 
         private static async Task ConsumeMessages(IConsumer consumer, CancellationToken cancellationToken)
@@ -65,7 +73,7 @@ namespace Consuming
                 {
                     var data = Encoding.UTF8.GetString(message.Data.ToArray());
                     Console.WriteLine("Received: " + data);
-                    await consumer.Acknowledge(message, cancellationToken).ConfigureAwait(false);
+                    await consumer.Acknowledge(message, cancellationToken);
                 }
             }
             catch (OperationCanceledException) { }
@@ -79,7 +87,7 @@ namespace Consuming
 
             while (true)
             {
-                var stateChanged = await consumer.StateChangedFrom(state).ConfigureAwait(false);
+                var stateChanged = await consumer.StateChangedFrom(state);
                 state = stateChanged.ConsumerState;
 
                 var stateMessage = state switch
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 8db5b3d..7fe7d58 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -28,9 +28,19 @@ namespace Producing
         {
             const string myTopic = "persistent://public/default/mytopic";
 
+            var taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+            Console.CancelKeyPress += (sender, args) =>
+            {
+                taskCompletionSource.SetResult(null);
+                args.Cancel = true;
+            };
+
             await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
 
-            var producer = client.NewProducer().Topic(myTopic).Create();
+            var producer = client.NewProducer()
+                .Topic(myTopic)
+                .Create();
 
             var monitoring = Monitor(producer);
 
@@ -38,17 +48,17 @@ namespace Producing
 
             var producing = ProduceMessages(producer, cts.Token);
 
-            Console.WriteLine("Press a key to exit");
+            Console.WriteLine("Press Ctrl+C to exit");
 
-            _ = Console.ReadKey();
+            await taskCompletionSource.Task;
 
             cts.Cancel();
 
-            await producing.ConfigureAwait(false);
+            await producing;
 
-            await producer.DisposeAsync().ConfigureAwait(false);
+            await producer.DisposeAsync();
 
-            await monitoring.ConfigureAwait(false);
+            await monitoring;
         }
 
         private static async Task ProduceMessages(IProducer producer, CancellationToken cancellationToken)
@@ -63,9 +73,9 @@ namespace Producing
                 {
                     var data = DateTime.UtcNow.ToLongTimeString();
                     var bytes = Encoding.UTF8.GetBytes(data);
-                    _ = await producer.Send(bytes, cancellationToken).ConfigureAwait(false);
+                    _ = await producer.Send(bytes, cancellationToken);
                     Console.WriteLine("Sent: " + data);
-                    await Task.Delay(delay).ConfigureAwait(false);
+                    await Task.Delay(delay, cancellationToken);
                 }
             }
             catch (OperationCanceledException) // If not using the cancellationToken, then just dispose the producer and catch ObjectDisposedException instead
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index 0769cdc..bb29458 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -29,6 +29,14 @@ namespace Reading
         {
             const string myTopic = "persistent://public/default/mytopic";
 
+            var taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+            Console.CancelKeyPress += (sender, args) =>
+            {
+                taskCompletionSource.SetResult(null);
+                args.Cancel = true;
+            };
+
             await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
 
             var reader = client.NewReader()
@@ -42,17 +50,17 @@ namespace Reading
 
             var reading = ReadMessages(reader, cts.Token);
 
-            Console.WriteLine("Press a key to exit");
+            Console.WriteLine("Press Ctrl+C to exit");
 
-            _ = Console.ReadKey();
+            await taskCompletionSource.Task;
 
             cts.Cancel();
 
-            await reading.ConfigureAwait(false);
+            await reading;
 
-            await reader.DisposeAsync().ConfigureAwait(false);
+            await reader.DisposeAsync();
 
-            await monitoring.ConfigureAwait(false);
+            await monitoring;
         }
 
         private static async Task ReadMessages(IReader reader, CancellationToken cancellationToken)
@@ -78,7 +86,7 @@ namespace Reading
 
             while (true)
             {
-                var stateChanged = await reader.StateChangedFrom(state).ConfigureAwait(false);
+                var stateChanged = await reader.StateChangedFrom(state);
                 state = stateChanged.ReaderState;
 
                 var stateMessage = state switch