You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2021/09/07 10:33:44 UTC
[pulsar-dotpulsar] branch master updated: Only hash the key if the array is not empty
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 08a6ff8 Only hash the key if the array is not empty
08a6ff8 is described below
commit 08a6ff856ddaba9a6acc4ee4f310c4fe474447a2
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Tue Sep 7 12:33:37 2021 +0200
Only hash the key if the array is not empty
---
src/DotPulsar/RoundRobinPartitionRouter.cs | 2 +-
src/DotPulsar/SinglePartitionRouter.cs | 2 +-
tests/DotPulsar.IntegrationTests/ProducerTests.cs | 44 +++++++++++++----------
3 files changed, 27 insertions(+), 21 deletions(-)
diff --git a/src/DotPulsar/RoundRobinPartitionRouter.cs b/src/DotPulsar/RoundRobinPartitionRouter.cs
index 6176d40..8a932ec 100644
--- a/src/DotPulsar/RoundRobinPartitionRouter.cs
+++ b/src/DotPulsar/RoundRobinPartitionRouter.cs
@@ -41,7 +41,7 @@ namespace DotPulsar
public int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
{
var keyBytes = messageMetadata.KeyBytes;
- if (keyBytes is not null)
+ if (keyBytes is not null && keyBytes.Length > 0)
return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;
return Interlocked.Increment(ref _partitionIndex) % numberOfPartitions;
diff --git a/src/DotPulsar/SinglePartitionRouter.cs b/src/DotPulsar/SinglePartitionRouter.cs
index cc2562b..e117653 100644
--- a/src/DotPulsar/SinglePartitionRouter.cs
+++ b/src/DotPulsar/SinglePartitionRouter.cs
@@ -49,7 +49,7 @@ namespace DotPulsar
public int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
{
var keyBytes = messageMetadata.KeyBytes;
- if (keyBytes is not null)
+ if (keyBytes is not null && keyBytes.Length > 0)
return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;
if (_partitionIndex == -1)
diff --git a/tests/DotPulsar.IntegrationTests/ProducerTests.cs b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
index 742454c..6384ed9 100644
--- a/tests/DotPulsar.IntegrationTests/ProducerTests.cs
+++ b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -69,16 +69,25 @@ namespace DotPulsar.IntegrationTests
public async Task SinglePartition_WhenSendMessages_ThenGetMessagesFromSinglePartition()
{
//Arrange
- await using var client = PulsarClient.Builder().ServiceUrl(_pulsarService.GetBrokerUri()).Build();
- string topicName = $"single-partitioned-{Guid.NewGuid():N}";
+ var serviceUrl = _pulsarService.GetBrokerUri();
const string content = "test-message";
const int partitions = 3;
const int msgCount = 3;
- var consumers = new List<IConsumer<string>>();
-
+ var topicName = $"single-partitioned-{Guid.NewGuid():N}";
await _pulsarService.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
+ await using var client = PulsarClient.Builder().ServiceUrl(serviceUrl).Build();
//Act
+ var consumers = new List<IConsumer<string>>();
+ for (var i = 0; i < partitions; ++i)
+ {
+ consumers.Add(client.NewConsumer(Schema.String)
+ .Topic($"{topicName}-partition-{i}")
+ .SubscriptionName("test-sub")
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .Create());
+ }
+
for (var i = 0; i < partitions; ++i)
{
await using var producer = client.NewProducer(Schema.String)
@@ -88,26 +97,23 @@ namespace DotPulsar.IntegrationTests
for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
{
- await producer.Send($"{content}-{i}-{msgIndex}");
- _testOutputHelper.WriteLine($"Sent a message: {content}-{i}-{msgIndex}");
+ var message = $"{content}-{i}-{msgIndex}";
+ _ = await producer.Send(message);
+ _testOutputHelper.WriteLine($"Sent a message: {message}");
}
}
+ //Assert
for (var i = 0; i < partitions; ++i)
{
- consumers.Add(client.NewConsumer(Schema.String)
- .Topic($"{topicName}-partition-{i}")
- .SubscriptionName("test-sub")
- .InitialPosition(SubscriptionInitialPosition.Earliest)
- .Create());
- }
-
- var msg = await consumers[1].GetLastMessageId();
+ var consumer = consumers[i];
- //Assert
- for (var i = 0; i < partitions; ++i)
- for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
- (await consumers[i].Receive()).Value().Should().Be($"{content}-{i}-{msgIndex}");
+ for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
+ {
+ var message = await consumer.Receive();
+ message.Value().Should().Be($"{content}-{i}-{msgIndex}");
+ }
+ }
}
[Fact]