You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2020/10/20 07:44:08 UTC
[pulsar-dotpulsar] branch master updated: Don't wait for shutdown
using Console.ReadKey(). Using TaskCOmpletionSource instead.
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3c832e3 Don't wait for shutdown using Console.ReadKey(). Using TaskCOmpletionSource instead.
3c832e3 is described below
commit 3c832e315a6ea597fb84c03c88e2c87a99e42596
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Tue Oct 20 09:43:55 2020 +0200
Don't wait for shutdown using Console.ReadKey(). Using TaskCOmpletionSource instead.
---
samples/Consuming/Program.cs | 22 +++++++++++++++-------
samples/Producing/Program.cs | 26 ++++++++++++++++++--------
samples/Reading/Program.cs | 20 ++++++++++++++------
3 files changed, 47 insertions(+), 21 deletions(-)
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index d09ae3f..3bc70e9 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -29,6 +29,14 @@ namespace Consuming
{
const string myTopic = "persistent://public/default/mytopic";
+ var taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ Console.CancelKeyPress += (sender, args) =>
+ {
+ taskCompletionSource.SetResult(null);
+ args.Cancel = true;
+ };
+
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
var consumer = client.NewConsumer()
@@ -42,17 +50,17 @@ namespace Consuming
var consuming = ConsumeMessages(consumer, cts.Token);
- Console.WriteLine("Press a key to exit");
+ Console.WriteLine("Press Ctrl+C to exit");
- _ = Console.ReadKey();
+ await taskCompletionSource.Task;
cts.Cancel();
- await consuming.ConfigureAwait(false);
+ await consuming;
- await consumer.DisposeAsync().ConfigureAwait(false);
+ await consumer.DisposeAsync();
- await monitoring.ConfigureAwait(false);
+ await monitoring;
}
private static async Task ConsumeMessages(IConsumer consumer, CancellationToken cancellationToken)
@@ -65,7 +73,7 @@ namespace Consuming
{
var data = Encoding.UTF8.GetString(message.Data.ToArray());
Console.WriteLine("Received: " + data);
- await consumer.Acknowledge(message, cancellationToken).ConfigureAwait(false);
+ await consumer.Acknowledge(message, cancellationToken);
}
}
catch (OperationCanceledException) { }
@@ -79,7 +87,7 @@ namespace Consuming
while (true)
{
- var stateChanged = await consumer.StateChangedFrom(state).ConfigureAwait(false);
+ var stateChanged = await consumer.StateChangedFrom(state);
state = stateChanged.ConsumerState;
var stateMessage = state switch
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 8db5b3d..7fe7d58 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -28,9 +28,19 @@ namespace Producing
{
const string myTopic = "persistent://public/default/mytopic";
+ var taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ Console.CancelKeyPress += (sender, args) =>
+ {
+ taskCompletionSource.SetResult(null);
+ args.Cancel = true;
+ };
+
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
- var producer = client.NewProducer().Topic(myTopic).Create();
+ var producer = client.NewProducer()
+ .Topic(myTopic)
+ .Create();
var monitoring = Monitor(producer);
@@ -38,17 +48,17 @@ namespace Producing
var producing = ProduceMessages(producer, cts.Token);
- Console.WriteLine("Press a key to exit");
+ Console.WriteLine("Press Ctrl+C to exit");
- _ = Console.ReadKey();
+ await taskCompletionSource.Task;
cts.Cancel();
- await producing.ConfigureAwait(false);
+ await producing;
- await producer.DisposeAsync().ConfigureAwait(false);
+ await producer.DisposeAsync();
- await monitoring.ConfigureAwait(false);
+ await monitoring;
}
private static async Task ProduceMessages(IProducer producer, CancellationToken cancellationToken)
@@ -63,9 +73,9 @@ namespace Producing
{
var data = DateTime.UtcNow.ToLongTimeString();
var bytes = Encoding.UTF8.GetBytes(data);
- _ = await producer.Send(bytes, cancellationToken).ConfigureAwait(false);
+ _ = await producer.Send(bytes, cancellationToken);
Console.WriteLine("Sent: " + data);
- await Task.Delay(delay).ConfigureAwait(false);
+ await Task.Delay(delay, cancellationToken);
}
}
catch (OperationCanceledException) // If not using the cancellationToken, then just dispose the producer and catch ObjectDisposedException instead
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index 0769cdc..bb29458 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -29,6 +29,14 @@ namespace Reading
{
const string myTopic = "persistent://public/default/mytopic";
+ var taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ Console.CancelKeyPress += (sender, args) =>
+ {
+ taskCompletionSource.SetResult(null);
+ args.Cancel = true;
+ };
+
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
var reader = client.NewReader()
@@ -42,17 +50,17 @@ namespace Reading
var reading = ReadMessages(reader, cts.Token);
- Console.WriteLine("Press a key to exit");
+ Console.WriteLine("Press Ctrl+C to exit");
- _ = Console.ReadKey();
+ await taskCompletionSource.Task;
cts.Cancel();
- await reading.ConfigureAwait(false);
+ await reading;
- await reader.DisposeAsync().ConfigureAwait(false);
+ await reader.DisposeAsync();
- await monitoring.ConfigureAwait(false);
+ await monitoring;
}
private static async Task ReadMessages(IReader reader, CancellationToken cancellationToken)
@@ -78,7 +86,7 @@ namespace Reading
while (true)
{
- var stateChanged = await reader.StateChangedFrom(state).ConfigureAwait(false);
+ var stateChanged = await reader.StateChangedFrom(state);
state = stateChanged.ReaderState;
var stateMessage = state switch