You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/09/04 04:53:19 UTC

[GitHub] [pulsar-dotpulsar] usaguerrilla commented on a change in pull request #48: Producer for Partitioned topics

usaguerrilla commented on a change in pull request #48:
URL: https://github.com/apache/pulsar-dotpulsar/pull/48#discussion_r483385175



##########
File path: src/DotPulsar/Internal/PartitionedProducer.cs
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal.Abstractions;
+    using System;
+    using System.Buffers;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Linq;
+    using System.Threading;
+    using System.Threading.Tasks;
+
+    public sealed class PartitionedProducer : IProducer
+    {
+        private readonly ConcurrentDictionary<int, IProducer> _producers;
+        private readonly StateManager<ProducerState> _state;
+        private PartitionedTopicMetadata _partitionedTopicMetadata;
+        private int _isDisposed;
+        private CancellationTokenSource _cancellationTokenSource;
+        private int _connectedProducerCount = 0;
+        private ReaderWriterLockSlim _metadataLock = new ReaderWriterLockSlim();
+        private IMessageRouter _messageRouter;
+        private ITimer _timer;
+        private PulsarClient _client;
+        private ProducerOptions _options;
+
+        public string Topic { get; }
+
+        public PartitionedProducer(
+            string topic,
+            StateManager<ProducerState> state,
+            ProducerOptions options,
+            PartitionedTopicMetadata partitionedTopicMetadata,
+            Dictionary<int, IProducer> producers,
+            IMessageRouter messageRouter,
+            PulsarClient client,
+            ITimer timer)
+        {
+            Topic = topic;
+            _state = state;
+            _partitionedTopicMetadata = partitionedTopicMetadata;
+            _producers = new ConcurrentDictionary<int, IProducer>(producers);
+            _isDisposed = 0;
+            _messageRouter = messageRouter;
+            _client = client;
+            _options = options;
+
+            _cancellationTokenSource = new CancellationTokenSource();
+
+            foreach (var producer in _producers.Values)
+            {
+                _ = MonitorState(producer, null, _cancellationTokenSource.Token);
+            }
+
+            _timer = timer;
+            if (options.AutoUpdatePartitions)
+            {
+                _timer.SetCallback(UpdatePartitionMetadata, options.AutoUpdatePartitionsInterval * 1000);
+            }
+        }
+
+        private async Task MonitorState(IProducer producer, ProducerState? initialState, CancellationToken cancellationToken = default)
+        {
+            await Task.Yield();
+
+            var state = initialState ?? ProducerState.Disconnected;
+
+            try
+            {
+                while (!cancellationToken.IsCancellationRequested)
+                {
+                    var stateChanged = await producer.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+                    state = stateChanged.ProducerState;
+
+                    switch (state)
+                    {
+                        case ProducerState.Disconnected:
+                            Interlocked.Decrement(ref _connectedProducerCount);
+                            _state.SetState(ProducerState.Disconnected);
+                            break;
+                        case ProducerState.Faulted:
+                            Interlocked.Decrement(ref _connectedProducerCount);
+                            _state.SetState(ProducerState.Faulted);
+                            break;
+                        case ProducerState.Closed:
+                            Interlocked.Decrement(ref _connectedProducerCount);
+                            _state.SetState(ProducerState.Closed);
+                            break;
+                        case ProducerState.Connected:
+                            Interlocked.Increment(ref _connectedProducerCount);
+                            break;
+                    }
+
+                    _metadataLock.EnterReadLock();
+                    if (_connectedProducerCount == _partitionedTopicMetadata.Partitions)
+                        _state.SetState(ProducerState.Connected);
+                    _metadataLock.ExitReadLock();
+
+                    if (IsFinalState(state))
+                        _cancellationTokenSource.Cancel(); // cancel other monitor tasks
+
+                    if (producer.IsFinalState(state))
+                        return;
+                }
+            }
+            catch (OperationCanceledException)
+            { }
+        }
+
+        private async void UpdatePartitionMetadata()
+        {
+            var cancellationToken = _cancellationTokenSource.Token;
+
+            try
+            {
+                while (!cancellationToken.IsCancellationRequested)

Review comment:
       (this leads to a memory leak)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org