You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2021/09/07 10:33:44 UTC

[pulsar-dotpulsar] branch master updated: Only hash the key if the array is not empty

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 08a6ff8  Only hash the key if the array is not empty
08a6ff8 is described below

commit 08a6ff856ddaba9a6acc4ee4f310c4fe474447a2
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Tue Sep 7 12:33:37 2021 +0200

    Only hash the key if the array is not empty
---
 src/DotPulsar/RoundRobinPartitionRouter.cs        |  2 +-
 src/DotPulsar/SinglePartitionRouter.cs            |  2 +-
 tests/DotPulsar.IntegrationTests/ProducerTests.cs | 44 +++++++++++++----------
 3 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/src/DotPulsar/RoundRobinPartitionRouter.cs b/src/DotPulsar/RoundRobinPartitionRouter.cs
index 6176d40..8a932ec 100644
--- a/src/DotPulsar/RoundRobinPartitionRouter.cs
+++ b/src/DotPulsar/RoundRobinPartitionRouter.cs
@@ -41,7 +41,7 @@ namespace DotPulsar
         public int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
         {
             var keyBytes = messageMetadata.KeyBytes;
-            if (keyBytes is not null)
+            if (keyBytes is not null && keyBytes.Length > 0)
                 return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;
 
             return Interlocked.Increment(ref _partitionIndex) % numberOfPartitions;
diff --git a/src/DotPulsar/SinglePartitionRouter.cs b/src/DotPulsar/SinglePartitionRouter.cs
index cc2562b..e117653 100644
--- a/src/DotPulsar/SinglePartitionRouter.cs
+++ b/src/DotPulsar/SinglePartitionRouter.cs
@@ -49,7 +49,7 @@ namespace DotPulsar
         public int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
         {
             var keyBytes = messageMetadata.KeyBytes;
-            if (keyBytes is not null)
+            if (keyBytes is not null && keyBytes.Length > 0)
                 return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;
             
             if (_partitionIndex == -1)
diff --git a/tests/DotPulsar.IntegrationTests/ProducerTests.cs b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
index 742454c..6384ed9 100644
--- a/tests/DotPulsar.IntegrationTests/ProducerTests.cs
+++ b/tests/DotPulsar.IntegrationTests/ProducerTests.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -69,16 +69,25 @@ namespace DotPulsar.IntegrationTests
         public async Task SinglePartition_WhenSendMessages_ThenGetMessagesFromSinglePartition()
         {
             //Arrange
-            await using var client = PulsarClient.Builder().ServiceUrl(_pulsarService.GetBrokerUri()).Build();
-            string topicName = $"single-partitioned-{Guid.NewGuid():N}";
+            var serviceUrl = _pulsarService.GetBrokerUri();
             const string content = "test-message";
             const int partitions = 3;
             const int msgCount = 3;
-            var consumers = new List<IConsumer<string>>();
-
+            var topicName = $"single-partitioned-{Guid.NewGuid():N}";
             await _pulsarService.CreatePartitionedTopic($"persistent/public/default/{topicName}", partitions);
+            await using var client = PulsarClient.Builder().ServiceUrl(serviceUrl).Build();
 
             //Act
+            var consumers = new List<IConsumer<string>>();
+            for (var i = 0; i < partitions; ++i)
+            {
+                consumers.Add(client.NewConsumer(Schema.String)
+                    .Topic($"{topicName}-partition-{i}")
+                    .SubscriptionName("test-sub")
+                    .InitialPosition(SubscriptionInitialPosition.Earliest)
+                    .Create());
+            }
+
             for (var i = 0; i < partitions; ++i)
             {
                 await using var producer = client.NewProducer(Schema.String)
@@ -88,26 +97,23 @@ namespace DotPulsar.IntegrationTests
 
                 for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
                 {
-                    await producer.Send($"{content}-{i}-{msgIndex}");
-                    _testOutputHelper.WriteLine($"Sent a message: {content}-{i}-{msgIndex}");
+                    var message = $"{content}-{i}-{msgIndex}";
+                    _ = await producer.Send(message);
+                    _testOutputHelper.WriteLine($"Sent a message: {message}");
                 }
             }
 
+            //Assert
             for (var i = 0; i < partitions; ++i)
             {
-                consumers.Add(client.NewConsumer(Schema.String)
-                    .Topic($"{topicName}-partition-{i}")
-                    .SubscriptionName("test-sub")
-                    .InitialPosition(SubscriptionInitialPosition.Earliest)
-                    .Create());
-            }
-
-            var msg = await consumers[1].GetLastMessageId();
+                var consumer = consumers[i];
 
-            //Assert
-            for (var i = 0; i < partitions; ++i)
-            for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
-                (await consumers[i].Receive()).Value().Should().Be($"{content}-{i}-{msgIndex}");
+                for (var msgIndex = 0; msgIndex < msgCount; ++msgIndex)
+                {
+                    var message = await consumer.Receive();
+                    message.Value().Should().Be($"{content}-{i}-{msgIndex}");
+                }
+            }
         }
 
         [Fact]