Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/25 11:49:16 UTC

[GitHub] [pulsar-dotpulsar] blankensteiner commented on a change in pull request #48: Producer for Partitioned topics

blankensteiner commented on a change in pull request #48:
URL: https://github.com/apache/pulsar-dotpulsar/pull/48#discussion_r476374969



##########
File path: src/DotPulsar/Abstractions/IMessageRouter.cs
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    using DotPulsar.Internal;

Review comment:
       Our public abstractions should never expose things from DotPulsar.Internal.

##########
File path: src/DotPulsar/PulsarClient.cs
##########
@@ -12,15 +12,22 @@
  * limitations under the License.
  */
 
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("DotPulsar.Tests")]

Review comment:
       Please remove this and rely on the "Internal" namespace convention instead, which lets us test "internal" stuff :-)

##########
File path: src/DotPulsar/Internal/MathUtils.cs
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    public static class MathUtils

Review comment:
       If this is only used by certain MessageRouter implementations, maybe we should create an abstract MessageRouter class and have the method there instead?
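
   A rough sketch of what that could look like. The diff does not show the body of MathUtils, so this assumes it holds a sign-safe modulo helper; the names MessageRouter, SignSafeMod and RoundRobinRouter are hypothetical, and the router takes only a partition count to keep the sketch self-contained.

```csharp
using System.Threading;

// Hypothetical base class: the helper lives next to the routers that use it.
public abstract class MessageRouter
{
    public abstract int ChoosePartition(int numberOfPartitions);

    // Modulo that never returns a negative value, even for a negative dividend.
    protected static int SignSafeMod(long dividend, int divisor)
    {
        var mod = (int) (dividend % divisor);
        return mod < 0 ? mod + divisor : mod;
    }
}

// Hypothetical router that cycles through the partitions in order.
public sealed class RoundRobinRouter : MessageRouter
{
    private long _partitionIndex = -1;

    public override int ChoosePartition(int numberOfPartitions)
        => SignSafeMod(Interlocked.Increment(ref _partitionIndex), numberOfPartitions);
}
```

   The helper stays out of the public surface (it is protected), and routers that don't need it simply ignore it.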

##########
File path: src/DotPulsar/Abstractions/IMessageRouter.cs
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    using DotPulsar.Internal;
+    public interface IMessageRouter
+    {
+        /// <summary>
+        /// Choose a partition.
+        /// </summary>
+        int ChoosePartition(MessageMetadata? message, PartitionedTopicMetadata partitionedTopic);

Review comment:
       Let's rename "message" to "metadata" or "messageMetadata". Are we sure we want it to be nullable?
   Do we need "PartitionedTopicMetadata", or do we just need to know the number of partitions? Do we want it injected per call to ChoosePartition or just once in the constructor?
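
   One possible answer to these questions, as a sketch only: the parameter is renamed and non-nullable, and the router receives just the partition count on every call. The MessageMetadata class below is a stand-in stub and KeyedRouter is a hypothetical implementation; neither is the PR's code.

```csharp
#nullable enable

// Stand-in for the real metadata type; only the key matters for this sketch.
public sealed class MessageMetadata
{
    public string? Key { get; set; }
}

/// <summary>
/// A message routing abstraction.
/// </summary>
public interface IMessageRouter
{
    /// <summary>
    /// Choose a partition in the range [0, numberOfPartitions).
    /// </summary>
    int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions);
}

// Hypothetical router: keyed messages stick to one partition, keyless ones go to partition 0.
public sealed class KeyedRouter : IMessageRouter
{
    public int ChoosePartition(MessageMetadata messageMetadata, int numberOfPartitions)
    {
        if (messageMetadata.Key is null)
            return 0;

        // string.GetHashCode is only stable within one process; a real router
        // would need a stable hash so that all producers agree on the partition.
        return (messageMetadata.Key.GetHashCode() & int.MaxValue) % numberOfPartitions;
    }
}
```

   Passing the count per call keeps the router stateless with respect to the topic, so a change in the number of partitions needs no new router instance.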

##########
File path: src/DotPulsar/Abstractions/IMessageRouter.cs
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+    using DotPulsar.Internal;
+    public interface IMessageRouter

Review comment:
       Let's have a blank line between the using directive(s) and the declaration of the interface.
   Also, please add some documentation for the interface, like "A message routing abstraction".

##########
File path: src/DotPulsar/Abstractions/IPulsarClient.cs
##########
@@ -35,5 +38,10 @@ public interface IPulsarClient : IAsyncDisposable
         /// Create a reader.
         /// </summary>
         IReader CreateReader(ReaderOptions options);
+
+        /// <summary>
+        /// Get the partition topic metadata for a given topic.
+        /// </summary>
+        public Task<PartitionedTopicMetadata> GetPartitionTopicMetadata(string topic, CancellationToken cancellationToken = default);

Review comment:
       I don't think we need to make this a public method. It's only used internally, right?
   If it should be public, please do use ValueTask.
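
   For illustration, a sketch of why ValueTask fits a lookup like this: once the answer is known, the call can complete synchronously without allocating a Task. TopicPartitionLookup, its cache, and the hard-coded result are all hypothetical; the delay stands in for the real broker round trip.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper that looks up the partition count of one topic.
public sealed class TopicPartitionLookup
{
    private int? _cachedPartitions;

    public ValueTask<int> GetNumberOfPartitions(CancellationToken cancellationToken = default)
    {
        // Fast path: the result is already known, so no Task is allocated.
        if (_cachedPartitions.HasValue)
            return new ValueTask<int>(_cachedPartitions.Value);

        // Slow path: wrap the asynchronous lookup.
        return new ValueTask<int>(Fetch(cancellationToken));
    }

    private async Task<int> Fetch(CancellationToken cancellationToken)
    {
        // Placeholder for the real lookup against the broker.
        await Task.Delay(10, cancellationToken).ConfigureAwait(false);
        _cachedPartitions = 3;
        return 3;
    }
}
```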

##########
File path: tests/DotPulsar.Tests/RoundRobinPartitonRouterTests.cs
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests
+{
+    using DotPulsar.Internal;
+    using Xunit;
+    public class RoundRobinPartitonRouterTests

Review comment:
       Let's have a blank line between the using directives and the class declaration

##########
File path: src/DotPulsar/Internal/PartitionedTopicMetadata.cs
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    public sealed class PartitionedTopicMetadata

Review comment:
       I wonder if we need a class just to pass an integer?

##########
File path: src/DotPulsar/PulsarClient.cs
##########
@@ -12,15 +12,22 @@
  * limitations under the License.
  */
 
+using System.Runtime.CompilerServices;

Review comment:
       Please remove this

##########
File path: src/DotPulsar/ProducerOptions.cs
##########
@@ -41,5 +50,21 @@ public ProducerOptions(string topic)
         /// Set the topic for this producer. This is required.
         /// </summary>
         public string Topic { get; set; }
+
+        /// <summary>
+        /// Set the message router. The default router is Round Robind routing mode.
+        /// </summary>
+        public IMessageRouter MessageRouter { get; set; } = new RoundRobinPartitionRouter();

Review comment:
       Let's keep all initialization in the constructor.
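
   Something like the following, as a sketch: the default router is assigned in the constructor next to the other defaults instead of in a property initializer. IMessageRouter and RoundRobinPartitionRouter are reduced to empty stubs here so the snippet stands on its own.

```csharp
// Empty stubs so the sketch compiles on its own.
public interface IMessageRouter { }
public sealed class RoundRobinPartitionRouter : IMessageRouter { }

public sealed class ProducerOptions
{
    internal const ulong DefaultInitialSequenceId = 0;

    public ProducerOptions(string topic)
    {
        // All defaults are set in one place.
        InitialSequenceId = DefaultInitialSequenceId;
        Topic = topic;
        MessageRouter = new RoundRobinPartitionRouter();
    }

    public ulong InitialSequenceId { get; set; }

    public string Topic { get; set; }

    public IMessageRouter MessageRouter { get; set; }
}
```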

##########
File path: src/DotPulsar/SinglePartitionRouter.cs
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using System;
+
+    public class SinglePartitionRouter : IMessageRouter
+    {
+        private int? _partitionIndex;
+        public SinglePartitionRouter(int? partitionIndex = null)
+        {
+            _partitionIndex = partitionIndex;
+        }
+        public int ChoosePartition(MessageMetadata? message, PartitionedTopicMetadata partitionedTopic)

Review comment:
       Let's have a blank line between methods

##########
File path: src/DotPulsar/SinglePartitionRouter.cs
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using System;
+
+    public class SinglePartitionRouter : IMessageRouter
+    {
+        private int? _partitionIndex;
+        public SinglePartitionRouter(int? partitionIndex = null)

Review comment:
       Let's have a blank line between fields and methods.

##########
File path: tests/DotPulsar.Tests/SinglePartitionRouterTests.cs
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests
+{
+    using DotPulsar.Internal;
+    using Xunit;
+    public class SinglePartitionRouterTests

Review comment:
       Let's have a blank line between the using directives and the class declaration

##########
File path: src/DotPulsar/Abstractions/IProducerBuilder.cs
##########
@@ -34,6 +34,28 @@ public interface IProducerBuilder
         /// </summary>
         IProducerBuilder Topic(string topic);
 
+        /// <summary>
+        /// Set the message router for this producer. This is optional.
+        /// </summary>
+        IProducerBuilder MessageRouter(IMessageRouter router);
+
+        /// <summary>
+        /// If enabled, partitioned producer will automatically discover new partitions at runtime.
+        /// Only for the partitioned producer implementation.
+        /// Default is true.
+        /// </summary>
+        /// <param name="autoUpdate">Whether to auto discover the partition configuration changes</param>
+        /// <returns>The producer builder instance</returns>
+        IProducerBuilder AutoUpdatePartitions(bool autoUpdate);
+
+        /// <summary>
+        /// Set the interval of updating partitions. This is in second.
+        /// Only for the partitioned producer implementation. 
+        /// </summary>
+        /// <param name="interval">The interval of updating partitions</param>
+        /// <returns>The producer builder instance</returns>
+        IProducerBuilder AutoUpdatePartitionsInterval(int interval);

Review comment:
       "interval" should be a TimeSpan.

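   A sketch of the TimeSpan variant of that setter. ProducerBuilderSketch is a hypothetical stand-in for the real builder, the validation is an added assumption, and the 60-second default mirrors DefaultAutoUpdatePartitionsInterval from the PR.

```csharp
using System;

// Hypothetical stand-in for the producer builder.
public sealed class ProducerBuilderSketch
{
    public TimeSpan UpdateInterval { get; private set; } = TimeSpan.FromSeconds(60);

    // The unit is carried by the type, so the docs no longer need "this is in seconds".
    public ProducerBuilderSketch AutoUpdatePartitionsInterval(TimeSpan interval)
    {
        if (interval <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(interval), "The interval must be positive.");

        UpdateInterval = interval;
        return this;
    }
}
```

   Callers then write AutoUpdatePartitionsInterval(TimeSpan.FromMinutes(5)) instead of passing a bare 300.
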
##########
File path: src/DotPulsar/RoundRobinPartitionRouter.cs
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using System.Threading;
+
+    public class RoundRobinPartitionRouter : IMessageRouter
+    {
+        private long _partitionIndex = -1;

Review comment:
       Let's have a blank line between fields and methods.

##########
File path: src/DotPulsar/Abstractions/IPulsarClient.cs
##########
@@ -14,7 +14,10 @@
 
 namespace DotPulsar.Abstractions
 {
+    using DotPulsar.Internal;

Review comment:
       Our public abstractions should never expose things from DotPulsar.Internal.

##########
File path: src/DotPulsar/Internal/Abstractions/ITimer.cs
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Abstractions
+{
+    using System;
+    public interface ITimer : IDisposable

Review comment:
       Instead of ITimer and its implementation, I think we should keep it simple and just loop with an await Task.Delay(..)
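
   Roughly like this, as a minimal sketch. PartitionUpdater and the updatePartitions callback are hypothetical names; the callback stands in for whatever refreshes the partition list.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PartitionUpdater
{
    // Waits for the interval, runs the callback, and repeats until cancelled.
    public static async Task Run(
        Func<CancellationToken, Task> updatePartitions,
        TimeSpan interval,
        CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(interval, cancellationToken).ConfigureAwait(false);
                await updatePartitions(cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Cancellation is the normal way to stop the loop.
                break;
            }
        }
    }
}
```

   Stopping the loop is then just a matter of cancelling the token, with no timer object to dispose.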

##########
File path: src/DotPulsar/ProducerOptions.cs
##########
@@ -14,19 +14,28 @@
 
 namespace DotPulsar
 {
+    using DotPulsar.Abstractions;
     /// <summary>
     /// The producer building options.
     /// </summary>
     public sealed class ProducerOptions
     {
         internal const ulong DefaultInitialSequenceId = 0;
+        internal const int DefaultAutoUpdatePartitionsInterval = 60;
 
         public ProducerOptions(string topic)
         {
             InitialSequenceId = DefaultInitialSequenceId;
             Topic = topic;
         }
 
+        public ProducerOptions(ProducerOptions previousOptions)

Review comment:
       Not sure I like this. If you want easy copying then maybe just create a "Clone()" method?
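
   A sketch of the Clone() alternative, assuming a shallow copy is enough. The class is cut down to two properties here; MemberwiseClone copies every field, so properties added later are picked up without touching the method.

```csharp
public sealed class ProducerOptions
{
    internal const ulong DefaultInitialSequenceId = 0;

    public ProducerOptions(string topic)
    {
        InitialSequenceId = DefaultInitialSequenceId;
        Topic = topic;
    }

    public ulong InitialSequenceId { get; set; }

    public string Topic { get; set; }

    // Shallow copy: reference-typed properties would be shared with the original.
    public ProducerOptions Clone() => (ProducerOptions) MemberwiseClone();
}
```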

##########
File path: src/DotPulsar/Internal/Murmur3_32Hash.cs
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using System.IO;
+    using System.Text;
+
+    /// <summary>
+    /// Implementation of the MurmurHash3 non-cryptographic hash function.
+    /// </summary>
+    public class Murmur3_32Hash

Review comment:
       Let's go with the standard naming convention and not use an underscore.
   Let's also make the class sealed and not a singleton (just create an instance if you need it and avoid the method call to the getter every time you need it).
   My main concern is the performance compared to the many other murmur implementations out there. Have you done any performance testing?
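
   To make the shape concrete, here is a sketch of a sealed, non-singleton class without the underscore. It follows the public MurmurHash3 x86 32-bit algorithm and works on a span to avoid stream allocations; the name Murmur3Hash32 is hypothetical, it has not been benchmarked, and it is not claimed to produce the same values as the PR's class or any other Pulsar client.

```csharp
using System;
using System.Buffers.Binary;

public sealed class Murmur3Hash32
{
    private const uint C1 = 0xcc9e2d51;
    private const uint C2 = 0x1b873593;

    private readonly uint _seed;

    public Murmur3Hash32(uint seed = 0) => _seed = seed;

    public uint Hash(ReadOnlySpan<byte> data)
    {
        unchecked
        {
            var hash = _seed;
            var blockCount = data.Length / 4;

            // Body: consume the input four bytes at a time, little-endian.
            for (var i = 0; i < blockCount; i++)
            {
                var block = BinaryPrimitives.ReadUInt32LittleEndian(data.Slice(i * 4, 4));
                hash ^= Mix(block);
                hash = RotateLeft(hash, 13) * 5 + 0xe6546b64;
            }

            // Tail: fold in the remaining one to three bytes, if any.
            uint tail = 0;
            var offset = blockCount * 4;
            switch (data.Length & 3)
            {
                case 3: tail ^= (uint) data[offset + 2] << 16; goto case 2;
                case 2: tail ^= (uint) data[offset + 1] << 8; goto case 1;
                case 1: tail ^= data[offset]; hash ^= Mix(tail); break;
            }

            // Finalization: mix in the length and avalanche the bits.
            hash ^= (uint) data.Length;
            hash ^= hash >> 16;
            hash *= 0x85ebca6b;
            hash ^= hash >> 13;
            hash *= 0xc2b2ae35;
            hash ^= hash >> 16;
            return hash;
        }
    }

    private static uint Mix(uint value)
    {
        unchecked
        {
            value *= C1;
            value = RotateLeft(value, 15);
            value *= C2;
            return value;
        }
    }

    private static uint RotateLeft(uint value, int count) => (value << count) | (value >> (32 - count));
}
```

   A router would create one instance and keep it in a field, which avoids going through a singleton getter on every message.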
   

##########
File path: tests/DotPulsar.Tests/PulsarClientTests.cs
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using DotPulsar.Internal.Abstractions;
+    using DotPulsar.Internal.PulsarApi;
+    using Moq;
+    using System.Threading;
+    using Xunit;
+    public class PulsarClientTests

Review comment:
       Let's have a blank line between the using directives and the class declaration

##########
File path: src/DotPulsar/RoundRobinPartitionRouter.cs
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using System.Threading;
+
+    public class RoundRobinPartitionRouter : IMessageRouter

Review comment:
       Mark the class as sealed

##########
File path: src/DotPulsar/SinglePartitionRouter.cs
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using System;
+
+    public class SinglePartitionRouter : IMessageRouter

Review comment:
       Mark the class as sealed

##########
File path: tests/DotPulsar.Tests/Internal/PartitionedProducerTests.cs
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using Moq;
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Threading;
+    using Xunit;
+    public class PartitionedProducerTests

Review comment:
       Let's have a blank line between the using directives and the class declaration

##########
File path: tests/DotPulsar.Tests/DotPulsar.Tests.csproj
##########
@@ -7,6 +7,7 @@
 
   <ItemGroup>
     <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.1" />
+    <PackageReference Include="Moq" Version="4.14.5" />

Review comment:
       We can add mocking and AutoFixture, but then it should be packages like these:
   <PackageReference Include="AutoFixture" Version="4.13.0" />
   <PackageReference Include="AutoFixture.AutoNSubstitute" Version="4.13.0" />
   <PackageReference Include="AutoFixture.Xunit2" Version="4.13.0" />
   <PackageReference Include="NSubstitute" Version="4.2.2" />
   <PackageReference Include="FluentAssertions" Version="5.10.3" />




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org