You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2021/06/29 10:18:03 UTC
[pulsar-dotpulsar] branch master updated: Make ready for release 1.1.0 and autogenerate consumer/reader name when not set by the user
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7e1c9be Make ready for release 1.1.0 and autogenerate consumer/reader name when not set by the user
7e1c9be is described below
commit 7e1c9beb6f2d49374b1792ab27021669495e24f3
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Tue Jun 29 12:17:49 2021 +0200
Make ready for release 1.1.0 and autogenerate consumer/reader name when not set by the user
---
CHANGELOG.md | 15 +++++++++++++--
README.md | 1 +
src/DotPulsar/Abstractions/IMessageRouter.cs | 2 +-
src/DotPulsar/DotPulsar.csproj | 2 +-
src/DotPulsar/Internal/Producer.cs | 2 +-
src/DotPulsar/PulsarClient.cs | 12 +++++++-----
src/DotPulsar/RoundRobinPartitionRouter.cs | 6 +++---
src/DotPulsar/SinglePartitionRouter.cs | 6 +++---
8 files changed, 30 insertions(+), 16 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 54f3e9f..972cae5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,11 +4,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## [Unreleased]
+## [1.1.0] - 2021-06-29
+
+### Added
+
+- The producer now supports partitioned topics
+- The IMessageRouter interface with the RoundRobinPartitionRouter (default) and SinglePartitionRouter implementations
+- The producer builder accepts a custom implementation of IMessageRouter
### Changed
-- The KeyBytes property on MessageMetadata returned null if the key was set via a string. Now it will return string keys as UTF8 bytes.
+- The producer state can now be 'PartiallyConnected'
+- The KeyBytes property on MessageMetadata returned null if the key was set via a string. Now it will return string keys as UTF8 bytes
+
+### Fixed
+
+- Autogenerate a consumer and reader name when it's not explicitly set by the user
## [1.0.2] - 2021-04-30
diff --git a/README.md b/README.md
index a4a92ec..4a00c91 100644
--- a/README.md
+++ b/README.md
@@ -52,6 +52,7 @@ For a more in-depth tour of the API, please visit the [Wiki](https://github.com/
- [X] Producer send with custom metadata
- [X] Producer send with event time, sequence id, and delayed message delivery
- [X] Producer send with key and ordering key
+- [X] Producer for partitioned topics
- [X] Consumer subscription with initial position and priority level
- [X] Consumer subscription types exclusive, shared, failover, and key shared
- [X] Consumer receive and single + cumulative acknowledge
diff --git a/src/DotPulsar/Abstractions/IMessageRouter.cs b/src/DotPulsar/Abstractions/IMessageRouter.cs
index 6652c9c..b6ad294 100644
--- a/src/DotPulsar/Abstractions/IMessageRouter.cs
+++ b/src/DotPulsar/Abstractions/IMessageRouter.cs
@@ -22,6 +22,6 @@ namespace DotPulsar.Abstractions
/// <summary>
/// Choose a partition.
/// </summary>
- int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount);
+ int ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions);
}
}
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 0613e86..1dadb0d 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0</TargetFrameworks>
- <Version>1.0.2</Version>
+ <Version>1.1.0</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 10c4fd5..5951a1d 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -147,13 +147,13 @@ namespace DotPulsar.Internal
private SubProducer<TMessage> CreateSubProducer(string topic)
{
var correlationId = Guid.NewGuid();
- var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var producerName = _options.ProducerName;
var schema = _options.Schema;
var initialSequenceId = _options.InitialSequenceId;
var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, topic, producerName, schema.SchemaInfo, _compressorFactory);
var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
var initialChannel = new NotReadyChannel<TMessage>();
+ var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var producer = new SubProducer<TMessage>(correlationId, ServiceUrl, topic, initialSequenceId, _processManager, initialChannel, executor, stateManager, factory, schema);
var process = new ProducerProcess(correlationId, stateManager, producer);
_processManager.Add(process);
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 167bed8..91549ab 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -90,10 +90,10 @@ namespace DotPulsar
ThrowIfDisposed();
var correlationId = Guid.NewGuid();
- var executor = new Executor(correlationId, _processManager, _exceptionHandler);
+ var consumerName = options.ConsumerName ?? $"Consumer-{correlationId:N}";
var subscribe = new CommandSubscribe
{
- ConsumerName = options.ConsumerName,
+ ConsumerName = consumerName,
InitialPosition = (CommandSubscribe.InitialPositionType) options.InitialPosition,
PriorityLevel = options.PriorityLevel,
ReadCompacted = options.ReadCompacted,
@@ -108,6 +108,7 @@ namespace DotPulsar
var factory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories);
var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
var initialChannel = new NotReadyChannel<TMessage>();
+ var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var consumer = new Consumer<TMessage>(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, initialChannel, executor, stateManager, factory);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);
@@ -125,14 +126,14 @@ namespace DotPulsar
ThrowIfDisposed();
var correlationId = Guid.NewGuid();
- var executor = new Executor(correlationId, _processManager, _exceptionHandler);
+ var subscription = $"Reader-{correlationId:N}";
var subscribe = new CommandSubscribe
{
- ConsumerName = options.ReaderName,
+ ConsumerName = options.ReaderName ?? subscription,
Durable = false,
ReadCompacted = options.ReadCompacted,
StartMessageId = options.StartMessageId.ToMessageIdData(),
- Subscription = $"Reader-{Guid.NewGuid():N}",
+ Subscription = subscription,
Topic = options.Topic
};
var messagePrefetchCount = options.MessagePrefetchCount;
@@ -142,6 +143,7 @@ namespace DotPulsar
var factory = new ConsumerChannelFactory<TMessage>(correlationId, _processManager, _connectionPool, subscribe, messagePrefetchCount, batchHandler, messageFactory, decompressorFactories);
var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
var initialChannel = new NotReadyChannel<TMessage>();
+ var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var reader = new Reader<TMessage>(correlationId, ServiceUrl, options.Topic, _processManager, initialChannel, executor, stateManager, factory);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorReader(reader, options.StateChangedHandler);
diff --git a/src/DotPulsar/RoundRobinPartitionRouter.cs b/src/DotPulsar/RoundRobinPartitionRouter.cs
index d6a1738..0a972bb 100644
--- a/src/DotPulsar/RoundRobinPartitionRouter.cs
+++ b/src/DotPulsar/RoundRobinPartitionRouter.cs
@@ -38,13 +38,13 @@ namespace DotPulsar
/// <summary>
/// Choose a partition in round robin routing mode
/// </summary>
- public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount)
+ public int ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions)
{
var keyBytes = messageMetadata?.KeyBytes;
if (keyBytes is not null)
- return (int) MurmurHash3.Hash32(keyBytes, 0) % partitionsCount;
+ return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;
- return Interlocked.Increment(ref _partitionIndex) % partitionsCount;
+ return Interlocked.Increment(ref _partitionIndex) % numberOfPartitions;
}
}
}
diff --git a/src/DotPulsar/SinglePartitionRouter.cs b/src/DotPulsar/SinglePartitionRouter.cs
index 61c241e..e9ecfbc 100644
--- a/src/DotPulsar/SinglePartitionRouter.cs
+++ b/src/DotPulsar/SinglePartitionRouter.cs
@@ -46,14 +46,14 @@ namespace DotPulsar
/// <summary>
/// Choose a partition in single partition routing mode
/// </summary>
- public int ChoosePartition(MessageMetadata? messageMetadata, int partitionsCount)
+ public int ChoosePartition(MessageMetadata? messageMetadata, int numberOfPartitions)
{
var keyBytes = messageMetadata?.KeyBytes;
if (keyBytes is not null)
- return (int) MurmurHash3.Hash32(keyBytes, 0) % partitionsCount;
+ return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;
if (_partitionIndex == -1)
- _partitionIndex = new Random().Next(0, partitionsCount);
+ _partitionIndex = new Random().Next(0, numberOfPartitions);
return _partitionIndex;
}