You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2021/02/23 10:35:57 UTC

[pulsar-dotpulsar] branch master updated: Adding PackageIcon and removing the ReaderChannelFactory

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 814d366  Adding PackageIcon and removing the ReaderChannelFactory
814d366 is described below

commit 814d366096c03cc2bd75f5d4c79d8e71c2e0ff66
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Tue Feb 23 11:35:42 2021 +0100

    Adding PackageIcon and removing the ReaderChannelFactory
---
 .asf.yaml                                        |   2 +-
 src/DotPulsar/DotPulsar.csproj                   |   9 ++-
 src/DotPulsar/Internal/ConsumerChannelFactory.cs |  21 ++-----
 src/DotPulsar/Internal/ReaderChannelFactory.cs   |  75 -----------------------
 src/DotPulsar/PackageIcon.png                    | Bin 0 -> 13430 bytes
 src/DotPulsar/PulsarClient.cs                    |  32 +++++++++-
 6 files changed, 44 insertions(+), 95 deletions(-)

diff --git a/.asf.yaml b/.asf.yaml
index 7f8fedd..319738d 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -18,7 +18,7 @@
 #
 
 github:
-  description: ".NET/C# client library for Apache Pulsar"
+  description: "The official .NET/C# client library for Apache Pulsar"
   homepage: https://pulsar.apache.org/
   labels:
     - pulsar
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index f89de85..934b5a4 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -5,11 +5,12 @@
     <Version>0.11.0</Version>
     <AssemblyVersion>$(Version)</AssemblyVersion>
     <FileVersion>$(Version)</FileVersion>
-    <Authors>DanskeCommodities;dblank</Authors>
-    <Company>Danske Commodities A/S</Company>
+    <Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
+    <Company>Apache Software Foundation</Company>
     <Copyright>$(Company)</Copyright>
     <Title>DotPulsar</Title>
     <PackageTags>Apache;Pulsar</PackageTags>
+    <PackageIcon>PackageIcon.png</PackageIcon>
     <PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
     <PackageReleaseNotes>Please refer to CHANGELOG.md for details</PackageReleaseNotes>
     <Description>The official .NET/C# client library for Apache Pulsar</Description>
@@ -32,4 +33,8 @@
     <PackageReference Include="Microsoft.Bcl.HashCode" Version="1.1.1" />
   </ItemGroup>
 
+  <ItemGroup>
+    <None Include="PackageIcon.png" Pack="true" PackagePath="/" Visible="False" />
+  </ItemGroup>
+
 </Project>
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index eb97404..5d20ce5 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -37,27 +37,18 @@ namespace DotPulsar.Internal
             IRegisterEvent eventRegister,
             IConnectionPool connectionPool,
             IExecute executor,
-            ConsumerOptions options,
+            CommandSubscribe subscribe,
+            uint messagePrefetchCount,
+            BatchHandler batchHandler,
             IEnumerable<IDecompressorFactory> decompressorFactories)
         {
             _correlationId = correlationId;
             _eventRegister = eventRegister;
             _connectionPool = connectionPool;
             _executor = executor;
-            _messagePrefetchCount = options.MessagePrefetchCount;
-
-            _subscribe = new CommandSubscribe
-            {
-                ConsumerName = options.ConsumerName,
-                InitialPosition = (CommandSubscribe.InitialPositionType) options.InitialPosition,
-                PriorityLevel = options.PriorityLevel,
-                ReadCompacted = options.ReadCompacted,
-                Subscription = options.SubscriptionName,
-                Topic = options.Topic,
-                Type = (CommandSubscribe.SubType) options.SubscriptionType
-            };
-
-            _batchHandler = new BatchHandler(true);
+            _subscribe = subscribe;
+            _messagePrefetchCount = messagePrefetchCount;
+            _batchHandler = batchHandler;
             _decompressorFactories = decompressorFactories;
         }
 
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
deleted file mode 100644
index bc8b932..0000000
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal
-{
-    using Abstractions;
-    using PulsarApi;
-    using System;
-    using System.Collections.Generic;
-    using System.Threading;
-    using System.Threading.Tasks;
-
-    public sealed class ReaderChannelFactory : IConsumerChannelFactory
-    {
-        private readonly Guid _correlationId;
-        private readonly IRegisterEvent _eventRegister;
-        private readonly IConnectionPool _connectionPool;
-        private readonly IExecute _executor;
-        private readonly CommandSubscribe _subscribe;
-        private readonly uint _messagePrefetchCount;
-        private readonly BatchHandler _batchHandler;
-        private readonly IEnumerable<IDecompressorFactory> _decompressorFactories;
-
-        public ReaderChannelFactory(
-            Guid correlationId,
-            IRegisterEvent eventRegister,
-            IConnectionPool connectionPool,
-            IExecute executor,
-            ReaderOptions options,
-            IEnumerable<IDecompressorFactory> decompressorFactories)
-        {
-            _correlationId = correlationId;
-            _eventRegister = eventRegister;
-            _connectionPool = connectionPool;
-            _executor = executor;
-            _messagePrefetchCount = options.MessagePrefetchCount;
-
-            _subscribe = new CommandSubscribe
-            {
-                ConsumerName = options.ReaderName,
-                Durable = false,
-                ReadCompacted = options.ReadCompacted,
-                StartMessageId = options.StartMessageId.ToMessageIdData(),
-                Subscription = $"Reader-{Guid.NewGuid():N}",
-                Topic = options.Topic
-            };
-
-            _batchHandler = new BatchHandler(false);
-            _decompressorFactories = decompressorFactories;
-        }
-
-        public async Task<IConsumerChannel> Create(CancellationToken cancellationToken)
-            => await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken).ConfigureAwait(false);
-
-        private async ValueTask<IConsumerChannel> GetChannel(CancellationToken cancellationToken)
-        {
-            var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
-            var messageQueue = new AsyncQueue<MessagePackage>();
-            var channel = new Channel(_correlationId, _eventRegister, messageQueue);
-            var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
-            return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler, _decompressorFactories);
-        }
-    }
-}
diff --git a/src/DotPulsar/PackageIcon.png b/src/DotPulsar/PackageIcon.png
new file mode 100644
index 0000000..849e7ce
Binary files /dev/null and b/src/DotPulsar/PackageIcon.png differ
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index a74f015..3ce1940 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -16,6 +16,7 @@ namespace DotPulsar
 {
     using Abstractions;
     using DotPulsar.Internal.Compression;
+    using DotPulsar.Internal.PulsarApi;
     using Exceptions;
     using Internal;
     using Internal.Abstractions;
@@ -92,9 +93,23 @@ namespace DotPulsar
         public IConsumer CreateConsumer(ConsumerOptions options)
         {
             ThrowIfDisposed();
+
             var correlationId = Guid.NewGuid();
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
-            var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options, CompressionFactories.DecompressorFactories());
+            var subscribe = new CommandSubscribe
+            {
+                ConsumerName = options.ConsumerName,
+                InitialPosition = (CommandSubscribe.InitialPositionType) options.InitialPosition,
+                PriorityLevel = options.PriorityLevel,
+                ReadCompacted = options.ReadCompacted,
+                Subscription = options.SubscriptionName,
+                Topic = options.Topic,
+                Type = (CommandSubscribe.SubType) options.SubscriptionType
+            };
+            var messagePrefetchCount = options.MessagePrefetchCount;
+            var batchHandler = new BatchHandler(true);
+            var decompressorFactories = CompressionFactories.DecompressorFactories();
+            var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, subscribe, messagePrefetchCount, batchHandler, decompressorFactories);
             var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
             var consumer = new Consumer(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
             if (options.StateChangedHandler is not null)
@@ -111,9 +126,22 @@ namespace DotPulsar
         public IReader CreateReader(ReaderOptions options)
         {
             ThrowIfDisposed();
+
             var correlationId = Guid.NewGuid();
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
-            var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options, CompressionFactories.DecompressorFactories());
+            var subscribe = new CommandSubscribe
+            {
+                ConsumerName = options.ReaderName,
+                Durable = false,
+                ReadCompacted = options.ReadCompacted,
+                StartMessageId = options.StartMessageId.ToMessageIdData(),
+                Subscription = $"Reader-{Guid.NewGuid():N}",
+                Topic = options.Topic
+            };
+            var messagePrefetchCount = options.MessagePrefetchCount;
+            var batchHandler = new BatchHandler(false);
+            var decompressorFactories = CompressionFactories.DecompressorFactories();
+            var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, subscribe, messagePrefetchCount, batchHandler, decompressorFactories);
             var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
             var reader = new Reader(correlationId, ServiceUrl, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
             if (options.StateChangedHandler is not null)