Posted to commits@pulsar.apache.org by "toneill818 (via GitHub)" <gi...@apache.org> on 2023/03/17 18:30:14 UTC

[GitHub] [pulsar-dotpulsar] toneill818 opened a new pull request, #146: Partitioned Topic Consumer Support

toneill818 opened a new pull request, #146:
URL: https://github.com/apache/pulsar-dotpulsar/pull/146

   Fixes #141
   
   # Description
   
   There are a few open PRs by @RobertIndie and @usaguerrilla that add support for consuming partitioned topics, but they do not seem to be actively worked on. Per issue #141, this PR adds support for consuming partitioned topics. It is backwards compatible, as there are no interface or type changes. If this PR is accepted and merged, it should be straightforward to extend the consumer to support multiple topics, as requested in issue #5 (an implementation was started in PR #86).
   
   **If Merged:**
   Close PR #49
   Close PR #81
   Close Issue #141
   
   # Testing
   
   `dotnet test` passed
   Ran against Pulsar 2.11 with topic auto-creation set to create partitioned topics with 3 partitions.
   Ran against Pulsar 2.11 without partitioned-topic auto-creation (regular topics).
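For context on the 3-partition test setup: Pulsar backs a partitioned topic with N partitions by N internal topics named `<topic>-partition-<index>`, and the diff below treats a partitioned-metadata lookup that returns 0 partitions as a regular topic (`_isPartitioned = _numberOfPartitions != 0`). The sketch shows that expansion. It assumes the diff's `getPartitonedTopicName` helper (not shown in the diff) follows Pulsar's standard `-partition-<index>` suffix; `PartitionTopics` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Example: a broker configured to auto-create topics with 3 partitions.
foreach (var name in PartitionTopics("persistent://public/default/orders", 3))
    Console.WriteLine(name);

// Hypothetical helper: expands a topic into the sub-topics one consumer must subscribe to.
// Assumes Pulsar's "<topic>-partition-<index>" naming for partitions.
static IReadOnlyList<string> PartitionTopics(string topic, uint numberOfPartitions)
{
    // Zero partitions from the metadata lookup means a regular (non-partitioned) topic,
    // so a single sub-consumer subscribes to the topic itself.
    if (numberOfPartitions == 0)
        return new[] { topic };

    // One sub-topic per partition index, 0 .. N-1.
    return Enumerable.Range(0, (int) numberOfPartitions)
        .Select(partition => $"{topic}-partition-{partition}")
        .ToArray();
}
```

With 3 partitions this yields `orders-partition-0` through `orders-partition-2`, which is also the key a message's `MessageId.Partition` maps back to when acknowledging.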


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar-dotpulsar] toneill818 commented on pull request #146: Partitioned Topic Consumer Support

Posted by "toneill818 (via GitHub)" <gi...@apache.org>.
toneill818 commented on PR #146:
URL: https://github.com/apache/pulsar-dotpulsar/pull/146#issuecomment-1520060415

   @blankensteiner Go ahead and create a new release. 
   
   




[GitHub] [pulsar-dotpulsar] toneill818 commented on pull request #146: Partitioned Topic Consumer Support

Posted by "toneill818 (via GitHub)" <gi...@apache.org>.
toneill818 commented on PR #146:
URL: https://github.com/apache/pulsar-dotpulsar/pull/146#issuecomment-1551227664

   @entvex That works for me




[GitHub] [pulsar-dotpulsar] toneill818 commented on pull request #146: Partitioned Topic Consumer Support

Posted by "toneill818 (via GitHub)" <gi...@apache.org>.
toneill818 commented on PR #146:
URL: https://github.com/apache/pulsar-dotpulsar/pull/146#issuecomment-1487181573

   @blankensteiner I can look into adding a partitioned reader. Would you like it included in this PR, or should I create a separate PR to keep the amount of change in this one smaller?
   




[GitHub] [pulsar-dotpulsar] blankensteiner merged pull request #146: Partitioned Topic Consumer Support

Posted by "blankensteiner (via GitHub)" <gi...@apache.org>.
blankensteiner merged PR #146:
URL: https://github.com/apache/pulsar-dotpulsar/pull/146




[GitHub] [pulsar-dotpulsar] entvex commented on pull request #146: Partitioned Topic Consumer Support

Posted by "entvex (via GitHub)" <gi...@apache.org>.
entvex commented on PR #146:
URL: https://github.com/apache/pulsar-dotpulsar/pull/146#issuecomment-1550968008

   Dear @toneill818,
   I am working with @blankensteiner. Would you be open to us merging the Partitioned Topic Consumer you created as it is? Then I could take on the task of creating the reader.




[GitHub] [pulsar-dotpulsar] blankensteiner commented on pull request #146: Partitioned Topic Consumer Support

Posted by "blankensteiner (via GitHub)" <gi...@apache.org>.
blankensteiner commented on PR #146:
URL: https://github.com/apache/pulsar-dotpulsar/pull/146#issuecomment-1491338572

   @toneill818 That sounds great, thanks! :-)
   Having it as part of this PR would be nice.




[GitHub] [pulsar-dotpulsar] blankensteiner commented on pull request #146: Partitioned Topic Consumer Support

Posted by "blankensteiner (via GitHub)" <gi...@apache.org>.
blankensteiner commented on PR #146:
URL: https://github.com/apache/pulsar-dotpulsar/pull/146#issuecomment-1519443474

   Hi @toneill818, just wanted to know if you have an ETA? (So I know whether to create a new release now, or wait for this PR if you are close to finishing.) :-)




[GitHub] [pulsar-dotpulsar] blankensteiner commented on a diff in pull request #146: Partitioned Topic Consumer Support

Posted by "blankensteiner (via GitHub)" <gi...@apache.org>.
blankensteiner commented on code in PR #146:
URL: https://github.com/apache/pulsar-dotpulsar/pull/146#discussion_r1145897117


##########
samples/Consuming/Program.cs:
##########
@@ -52,8 +52,9 @@ private static async Task ConsumeMessages(IConsumer<string> consumer, Cancellati
         {
             await foreach (var message in consumer.Messages(cancellationToken))
             {
-                Console.WriteLine($"Received: {message.Value()}");
-                await consumer.Acknowledge(message, cancellationToken);
+                Console.WriteLine($"Received: {message?.Value()}");

Review Comment:
   message is never null



##########
src/DotPulsar/DotPulsar.csproj:
##########
@@ -25,6 +25,7 @@
     <PackageReference Include="HashDepot" Version="2.0.3" />
     <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="7.0.4" />
     <PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
+    <PackageReference Include="Nito.AsyncEx" Version="5.1.2" />

Review Comment:
   Let's do this without onboarding a new dependency
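The diff guards `_consumers` with `_lock.ReaderLock()`, which comes from the Nito.AsyncEx package added above. One dependency-free option, sketched here under the assumption that exclusive (rather than reader/writer) access is acceptable, is `SemaphoreSlim(1, 1)` from the BCL: it supports `await WaitAsync()` and is not thread-affine, so it can be held across `await`s such as the `DisposeAsync` loop in the diff (C# does not allow `await` inside a `lock` block). The trade-off is that concurrent readers are serialized. `consumers`, `gate` and `AddPartitionAsync` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the diff's _consumers map.
var consumers = new Dictionary<string, int>();

// BCL-only async mutex: one holder at a time, awaitable, not thread-affine.
var gate = new SemaphoreSlim(1, 1);
var holders = 0;     // code paths currently inside the gate
var maxHolders = 0;  // highest value observed, to show mutual exclusion

async Task AddPartitionAsync(string topic)
{
    await gate.WaitAsync();
    try
    {
        holders++;
        maxHolders = Math.Max(maxHolders, holders);
        // Awaiting while holding the gate is allowed; `await` inside `lock` is not.
        await Task.Delay(1);
        consumers[topic] = consumers.Count;
    }
    finally
    {
        holders--;
        gate.Release(); // always release, even if the body throws
    }
}

// Eight concurrent callers contend for the gate; they run one at a time.
await Task.WhenAll(Enumerable.Range(0, 8)
    .Select(partition => AddPartitionAsync($"orders-partition-{partition}")));

Console.WriteLine($"{consumers.Count} sub-consumers, max concurrent holders: {maxHolders}");
```

If concurrent readers matter for throughput, a concurrent collection for `_consumers` plus a semaphore only around setup and disposal is another BCL-only option.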



##########
src/DotPulsar/Internal/Consumer.cs:
##########
@@ -86,140 +96,407 @@ public async ValueTask DisposeAsync()
         if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
             return;
 
-        _eventRegister.Register(new ConsumerDisposed(_correlationId));
-        await DisposeChannel().ConfigureAwait(false);
+        _cts.Cancel();
+        _cts.Dispose();
+
+        _state.SetState(ConsumerState.Closed);
+
+        using (_lock.ReaderLock())
+        {
+            foreach (var consumer in _consumers.Values)
+            {
+                await consumer.DisposeAsync().ConfigureAwait(false);
+            }
+        }
     }
 
-    private async ValueTask DisposeChannel()
+    private async Task Setup()
     {
-        await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
-        await _channel.DisposeAsync().ConfigureAwait(false);
-    }
+        await Task.Yield();
 
-    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken)
-        => await _executor.Execute(() => InternalReceive(cancellationToken), cancellationToken).ConfigureAwait(false);
+        try
+        {
+            await _executor.Execute(Monitor, _cts.Token).ConfigureAwait(false);
+        }
+        catch (Exception exception)
+        {
+            if (_cts.IsCancellationRequested)
+                return;
 
-    public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken)
-        => await InternalAcknowledge(messageId, CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
+            _faultException = exception;
+            _state.SetState(ConsumerState.Faulted);
+        }
+    }
+    private async Task Monitor()
+    {
+        _numberOfPartitions = await GetNumberOfPartitions(Topic, _cts.Token).ConfigureAwait(false);
+        _isPartitioned = _numberOfPartitions != 0;
+        var monitoringTasks = new List<Task<ConsumerStateChanged>>();
 
-    public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
-        => await InternalAcknowledge(messageId, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
+        using (_lock.ReaderLock())
+        {
+            if (_isPartitioned)
+            {
+
+                for (var partition = 0; partition < _numberOfPartitions; ++partition)
+                {
+                    var partitionedTopicName = getPartitonedTopicName(partition);
+
+                    var consumer = CreateSubConsumer(partitionedTopicName);
+                    _ = _consumers.TryAdd(partitionedTopicName, consumer);
+                    monitoringTasks.Add(consumer.StateChangedFrom(ConsumerState.Disconnected, _cts.Token).AsTask());
+                }
+            }
+
+            else
+            {
+                var consumer = CreateSubConsumer(Topic);
+                _ = _consumers.TryAdd(Topic, consumer);
+                monitoringTasks.Add(consumer.StateChangedFrom(ConsumerState.Disconnected, _cts.Token).AsTask());
+            }
+
+            Interlocked.Exchange(ref _consumerCount, monitoringTasks.Count);
+        }
+        var activeConsumers = 0;
+        while (true)
+        {
+            await Task.WhenAny(monitoringTasks).ConfigureAwait(false);
+
+            for (var i = 0; i < monitoringTasks.Count; ++i)
+            {
+                var task = monitoringTasks[i];
+                if (!task.IsCompleted)
+                    continue;
+                var state = task.Result.ConsumerState;
+                switch (state)
+                {
+                    case ConsumerState.Active:
+                        ++activeConsumers;
+                        break;
+                    case ConsumerState.Disconnected:
+                        --activeConsumers;
+                        break;
+                    case ConsumerState.ReachedEndOfTopic:
+                        _state.SetState(ConsumerState.ReachedEndOfTopic);
+                        return;
+                    case ConsumerState.Faulted:
+                        _state.SetState(ConsumerState.Faulted);
+                        return;
+                    case ConsumerState.Unsubscribed:
+                        _state.SetState(ConsumerState.Unsubscribed);
+                        return;
+                }
+
+                monitoringTasks[i] = task.Result.Consumer.StateChangedFrom(state, _cts.Token).AsTask();
+            }
+
+            if (activeConsumers == 0)
+                _state.SetState(ConsumerState.Disconnected);
+            else if (activeConsumers == monitoringTasks.Count)
+                _state.SetState(ConsumerState.Active);
+            else
+                _state.SetState(ConsumerState.PartiallyActive);
+        }
+    }
 
-    public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
+    private SubConsumer<TMessage> CreateSubConsumer(string topic)
     {
-        var command = new CommandRedeliverUnacknowledgedMessages();
-        command.MessageIds.AddRange(messageIds.Select(messageId => messageId.ToMessageIdData()));
-        await _executor.Execute(() => InternalRedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false);
-    }
+        var correlationId = Guid.NewGuid();
+        var consumerName = _options.ConsumerName ?? $"Consumer-{correlationId:N}";
 
-    public async ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken)
-        => await RedeliverUnacknowledgedMessages(Enumerable.Empty<MessageId>(), cancellationToken).ConfigureAwait(false);
+        var subscribe = new CommandSubscribe
+        {
+            ConsumerName = consumerName,
+            InitialPosition = (CommandSubscribe.InitialPositionType) _options.InitialPosition,
+            PriorityLevel = _options.PriorityLevel,
+            ReadCompacted = _options.ReadCompacted,
+            ReplicateSubscriptionState = _options.ReplicateSubscriptionState,
+            Subscription = _options.SubscriptionName,
+            Topic = topic,
+            Type = (CommandSubscribe.SubType) _options.SubscriptionType
+        };
+
+        foreach (var property in _options.SubscriptionProperties)
+        {
+            var keyValue = new KeyValue { Key = property.Key, Value = property.Value };
+            subscribe.SubscriptionProperties.Add(keyValue);
+        }
 
-    public async ValueTask Unsubscribe(CancellationToken cancellationToken)
+        var messagePrefetchCount = _options.MessagePrefetchCount;
+        var messageFactory = new MessageFactory<TMessage>(_options.Schema);
+        var batchHandler = new BatchHandler<TMessage>(true, messageFactory);
+        var decompressorFactories = CompressionFactories.DecompressorFactories();
+
+        var factory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories);
+        var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
+        var initialChannel = new NotReadyChannel<TMessage>();
+        var executor = new Executor(correlationId, _processManager, _exceptionHandler);
+        var consumer = new SubConsumer<TMessage>(correlationId, ServiceUrl, _options.SubscriptionName, topic, _processManager, initialChannel, executor, stateManager, factory);
+        var process = new ConsumerProcess(correlationId, stateManager, consumer, _options.SubscriptionType == SubscriptionType.Failover);
+        _processManager.Add(process);
+        process.Start();
+        return consumer;
+    }
+
+    private async Task<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken)
     {
-        var unsubscribe = new CommandUnsubscribe();
-        await _executor.Execute(() => InternalUnsubscribe(unsubscribe, cancellationToken), cancellationToken).ConfigureAwait(false);
+        var connection = await _connectionPool.FindConnectionForTopic(topic, cancellationToken).ConfigureAwait(false);
+        var commandPartitionedMetadata = new PulsarApi.CommandPartitionedTopicMetadata { Topic = topic };
+        var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
+
+        response.Expect(PulsarApi.BaseCommand.Type.PartitionedMetadataResponse);
+
+        if (response.PartitionMetadataResponse.Response == PulsarApi.CommandPartitionedTopicMetadataResponse.LookupType.Failed)
+            response.PartitionMetadataResponse.Throw();
+
+        return response.PartitionMetadataResponse.Partitions;
     }
 
-    public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
+    private void ThrowIfDisposed()
+    {
+        if (_isDisposed != 0)
+            throw new ConsumerDisposedException(GetType().FullName!);
+    }
+    public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken = default)
     {
-        var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
-        await _executor.Execute(() => InternalSeek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+        ThrowIfDisposed();
+
+        var sourceTopic = Topic;
+        if (_isPartitioned)
+        {
+            sourceTopic = getPartitonedTopicName(messageId.Partition);
+        }
+        await _executor.Execute(() =>
+                {
+                    ThrowIfNotActive();
+
+                    using (_lock.ReaderLock())
+                    {
+                        return _consumers[sourceTopic].Acknowledge(messageId, cancellationToken);
+                    }
+                }, cancellationToken)
+                .ConfigureAwait(false);
     }
 
-    public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
+    public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
     {
-        var seek = new CommandSeek { MessagePublishTime = publishTime };
-        await _executor.Execute(() => InternalSeek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+        ThrowIfDisposed();
+
+        await _executor.Execute(() =>
+            {
+                ThrowIfNotActive();
+
+                using (_lock.ReaderLock())
+                {
+                    var sourceTopic = Topic;
+                    if (_isPartitioned)
+                    {
+                        sourceTopic = getPartitonedTopicName(messageId.Partition);
+                    }
+                    return _consumers[sourceTopic].AcknowledgeCumulative(messageId, cancellationToken);
+                }
+            }, cancellationToken)
+            .ConfigureAwait(false);
     }
 
     public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
     {
+        ThrowIfDisposed();
+
         var getLastMessageId = new CommandGetLastMessageId();
-        return await _executor.Execute(() => InternalGetLastMessageId(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
+        return await _executor.Execute(() => GetLastMessageId(getLastMessageId, cancellationToken), cancellationToken).ConfigureAwait(false);
     }
 
-    private void Guard()
+    private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
     {
-        if (_isDisposed != 0)
-            throw new ConsumerDisposedException(GetType().FullName!);
+        ThrowIfNotActive();
 
-        if (_faultException is not null)
-            throw new ConsumerFaultedException(_faultException);
+        if (_isPartitioned)
+        {
+            throw new NotImplementedException("GetLastMessageId is not implemented for partitioned topics");
+        }
+        using (_lock.ReaderLock())
+        {
+            return await _consumers.First().Value.GetLastMessageId(cancellationToken).ConfigureAwait(false);
+        }
     }
 
-    public async Task EstablishNewChannel(CancellationToken cancellationToken)
+    public async ValueTask<IMessage<TMessage>> Receive(CancellationToken cancellationToken = default)
     {
-        var channel = await _executor.Execute(() => _factory.Create(cancellationToken), cancellationToken).ConfigureAwait(false);
+        ThrowIfDisposed();
 
-        var oldChannel = _channel;
-        if (oldChannel is not null)
-            await oldChannel.DisposeAsync().ConfigureAwait(false);
-
-        _channel = channel;
+        return await _executor.Execute(() => ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
     }
 
-    public async ValueTask CloseChannel(CancellationToken cancellationToken)
-        => await _channel.ClosedByClient(cancellationToken).ConfigureAwait(false);
-
-    public async ValueTask ChannelFaulted(Exception exception)
+    private async ValueTask<IMessage<TMessage>> ReceiveMessage(CancellationToken cancellationToken)
     {
-        _faultException = exception;
-        await DisposeChannel().ConfigureAwait(false);
+        ThrowIfNotActive();
+
+        using (_lock.ReaderLock())
+        {
+            if (_messagesQueue.TryDequeue(out var message))
+            {
+                return message;
+            }
+            var cts = new CancellationTokenSource();
+            var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, cts.Token);
+            while (!cancellationToken.IsCancellationRequested)
+            {
+                var done = false;
+
+                Task<IMessage<TMessage>>[] receiveTasks = _consumers.Values.Select(consumer => consumer.Receive(linkedCts.Token).AsTask()).ToArray();
+                await Task.WhenAny(receiveTasks).ConfigureAwait(false);
+
+                try
+                {
+                    receiveTasks.Where(t => t.IsCompleted).ToList().ForEach(t =>
+                        {
+                            if (t.Result == null)
+                            {
+                                return;
+                            }
+
+                            done = true;
+                            _messagesQueue.Enqueue(t.Result);
+                        }
+                    );
+                }
+                catch (Exception exception)
+                {
+                    if (linkedCts.IsCancellationRequested)
+                    {
+                        cts.Cancel();
+                    }
+                    else
+                    {
+                        throw exception;

Review Comment:
   rethrow?
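In C#, `throw exception;` inside a `catch` block resets the exception's stack trace to the rethrow site, while a bare `throw;` preserves the original frames (the .NET code-quality analyzers flag the former as CA2200). The small program below illustrates the difference; `ThrowFromPartition` and the other names are hypothetical. If an exception has to be rethrown outside its original `catch` block, `ExceptionDispatchInfo.Capture(exception).Throw()` likewise keeps the original trace.

```csharp
using System;
using System.Runtime.CompilerServices;

Console.WriteLine(TraceAfterThrowVariable().Contains(nameof(ThrowFromPartition))); // False: frame lost
Console.WriteLine(TraceAfterBareThrow().Contains(nameof(ThrowFromPartition)));     // True: frame kept

// Hypothetical failure deep inside a sub-consumer's receive path.
// NoInlining keeps this frame visible in the stack trace.
[MethodImpl(MethodImplOptions.NoInlining)]
static void ThrowFromPartition() => throw new InvalidOperationException("receive failed");

// Pattern used in the diff: `throw exception;` restarts the stack trace here.
static string TraceAfterThrowVariable()
{
    try
    {
        try { ThrowFromPartition(); }
        catch (Exception exception) { throw exception; } // CA2200 warns here
    }
    catch (Exception caught) { return caught.StackTrace ?? string.Empty; }
    return string.Empty;
}

// Bare `throw;` rethrows the same exception with its original frames intact.
static string TraceAfterBareThrow()
{
    try
    {
        try { ThrowFromPartition(); }
        catch (Exception) { throw; }
    }
    catch (Exception caught) { return caught.StackTrace ?? string.Empty; }
    return string.Empty;
}
```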

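The end of each `Monitor()` iteration in the diff rolls the sub-consumer states up into one parent state: no active sub-consumers means `Disconnected`, all active means `Active`, and anything in between means `PartiallyActive`. A pure-function sketch of that rule follows; states are modelled as strings to keep it self-contained (the diff uses DotPulsar's `ConsumerState` enum), and `AggregateState` is a hypothetical name.

```csharp
using System;

// Three partitions, as in the test setup described in the PR.
for (var active = 0; active <= 3; active++)
    Console.WriteLine($"{active}/3 active -> {AggregateState(active, 3)}");

// Hypothetical pure version of the roll-up in Monitor(): the parent state
// depends only on how many sub-consumers are currently Active.
static string AggregateState(int activeConsumers, int totalConsumers) =>
    activeConsumers == 0 ? "Disconnected"
    : activeConsumers == totalConsumers ? "Active"
    : "PartiallyActive";
```

Keeping the rule separate from the monitoring loop makes it easy to unit-test independently of broker connections.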

