You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/03/15 04:08:14 UTC

[rocketmq-clients] branch master updated (6766a8ff -> 513de8f3)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


    from 6766a8ff Fix code style
     new 8550102f Add more tests
     new dc01d5e6 Add the logging part of C# README.md
     new 513de8f3 Log TooManyRequest

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 csharp/README.md                           |  39 ++++++--
 csharp/rocketmq-client-csharp/Endpoints.cs |  10 +-
 csharp/rocketmq-client-csharp/Producer.cs  | 149 +++++++++++++++--------------
 csharp/tests/PublishingLoadBalancerTest.cs | 100 +++++++++++++++++++
 diff_sec.zip                               | Bin 0 -> 174 bytes
 java/README.md                             |   2 +-
 6 files changed, 221 insertions(+), 79 deletions(-)
 create mode 100644 csharp/tests/PublishingLoadBalancerTest.cs
 create mode 100644 diff_sec.zip


[rocketmq-clients] 02/03: Add the logging part of C# README.md

Posted by aa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit dc01d5e6e8b1b18c041de75753c7ce4630e4c687
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Mar 15 10:42:56 2023 +0800

    Add the logging part of C# README.md
---
 csharp/README.md | 39 ++++++++++++++++++++++++++++++++-------
 java/README.md   |  2 +-
 2 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/csharp/README.md b/csharp/README.md
index 5e353855..df85a3fb 100644
--- a/csharp/README.md
+++ b/csharp/README.md
@@ -4,13 +4,16 @@ Here is the .NET implementation of the client for [Apache RocketMQ](https://rock
 
 ## Supported .NET Versions
 
-Due to the release of .NET 5 in 2020, which unified .NET Framework and .NET Core, and has gradually become the mainstream platform for .NET development, the RocketMQ client will support .NET 5 and later versions.
+Due to the release of .NET 5 in 2020, which unified .NET Framework and .NET Core, and has gradually become the
+mainstream platform for .NET development, the RocketMQ client will support .NET 5 and later versions.
 
 See more details about .NET 5 from [Introducing .NET 5](https://devblogs.microsoft.com/dotnet/introducing-net-5/).
 
 ## Architecture
 
-The client would be developed using the protocols outlined in [rocketmq-apis](https://github.com/apache/rocketmq-apis) and built on [gRPC-dotnet](https://github.com/grpc/grpc-dotnet), leveraging Protocol Buffers for data serialization and deserialization during transmission.
+The client would be developed using the protocols outlined in [rocketmq-apis](https://github.com/apache/rocketmq-apis)
+and built on [gRPC-dotnet](https://github.com/grpc/grpc-dotnet), leveraging Protocol Buffers for data serialization and
+deserialization during transmission.
 
 ## Quickstart & Build
 
@@ -18,9 +21,14 @@ The client would be developed using the protocols outlined in [rocketmq-apis](ht
 dotnet add package RocketMQ.Client
 ```
 
-You can obtain the latest version of `RocketMQ.Client` from [NuGet Gallery](https://www.nuget.org/packages/RocketMQ.Client). To assist with getting started quickly and working with various message types and clients, we offer examples [here](./examples).
+You can obtain the latest version of `RocketMQ.Client`
+from [NuGet Gallery](https://www.nuget.org/packages/RocketMQ.Client). To assist with getting started quickly and working
+with various message types and clients, we offer examples [here](./examples).
 
-Layout of this project roughly follows [this guide](https://docs.microsoft.com/en-us/dotnet/core/tutorials/library-with-visual-studio-code?pivots=dotnet-5-0). The solution contains a class library, a unit test module and an example console module. Assuming you are at the home of this repository:
+Layout of this project roughly
+follows [this guide](https://docs.microsoft.com/en-us/dotnet/core/tutorials/library-with-visual-studio-code?pivots=dotnet-5-0)
+. The solution contains a class library, a unit test module and an example console module. Assuming you are at the home
+of this repository:
 
 ```sh
 # build the project
@@ -29,9 +37,26 @@ dotnet build
 dotnet test -l "console;verbosity=detailed"
 ```
 
+## Logging System
+
+We use [NLog](https://nlog-project.org/) as our logging implementation. Similar to the Java binding, we allow the use of
+environment variables to customize the related configuration:
+
+* `rocketmq.log.level`: Log output level, default is INFO.
+* `rocketmq.log.root`: The root directory of the log output. The default path is `$HOME/logs/rocketmq`, so the full path
+  is `$HOME/logs/rocketmq/rocketmq-client.log`.
+* `rocketmq.log.file.maxIndex`: The maximum number of log files to keep. The default is 10, and the size of a single log
+  file is limited to 64 MB. Adjustment is not supported yet.
+
+Specifically, by setting `mq.consoleAppender.enabled` to true, you can output client logs to the console simultaneously
+if you need debugging.
+
 ## Publishing Steps
 
 1. Open the command prompt, and change the directory to the project folder that you want to package.
-2. Run the `dotnet pack --configuration Release` command. This will create a NuGet package in the `bin/Release` folder of the project.
-3. To upload the package to NuGet, go to the NuGet website and sign in. Click on the "Upload" button and select the package file from the `bin/Release` folder.
-4. Follow the instructions on the website to complete the upload process. Once the package is uploaded, it will be available for others to download and use.
+2. Run the `dotnet pack --configuration Release` command. This will create a NuGet package in the `bin/Release` folder
+   of the project.
+3. To upload the package to NuGet, go to the NuGet website and sign in. Click on the "Upload" button and select the
+   package file from the `bin/Release` folder.
+4. Follow the instructions on the website to complete the upload process. Once the package is uploaded, it will be
+   available for others to download and use.
diff --git a/java/README.md b/java/README.md
index d7089ff7..b97014bf 100644
--- a/java/README.md
+++ b/java/README.md
@@ -67,4 +67,4 @@ The following logging parameters are all supported for specification by JVM syst
 * `rocketmq.log.root`: the root directory of the log output, default is `$HOME/logs/rocketmq`, so the full path is `$HOME/logs/rocketmq/rocketmq-client.log`.
 * `rocketmq.log.file.maxIndex`: the maximum number of log files to keep, default is 10 (the size of a single log file is limited to 64 MB, no adjustment is supported now).
 
-Specifically, by setting `mq.consoleAppender.enabled` to 'true,' you can output client logs to the console simultaneously if you need debugging.
+Specifically, by setting `mq.consoleAppender.enabled` to true, you can output client logs to the console simultaneously if you need debugging.


[rocketmq-clients] 01/03: Add more tests

Posted by aa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 8550102f3daf1532a5e9d8b60acdd1430472f47a
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Mar 15 10:00:42 2023 +0800

    Add more tests
---
 csharp/rocketmq-client-csharp/Endpoints.cs |  10 ++-
 csharp/tests/PublishingLoadBalancerTest.cs | 100 +++++++++++++++++++++++++++++
 2 files changed, 109 insertions(+), 1 deletion(-)

diff --git a/csharp/rocketmq-client-csharp/Endpoints.cs b/csharp/rocketmq-client-csharp/Endpoints.cs
index 8d560494..dbf9bdfb 100644
--- a/csharp/rocketmq-client-csharp/Endpoints.cs
+++ b/csharp/rocketmq-client-csharp/Endpoints.cs
@@ -30,7 +30,7 @@ namespace Org.Apache.Rocketmq
 
         private static readonly AddressListEqualityComparer AddressListComparer = new();
         private const string EndpointSeparator = ":";
-        private List<Address> Addresses { get; }
+        public List<Address> Addresses { get; }
         private AddressScheme Scheme { get; }
         private readonly int _hashCode;
 
@@ -114,6 +114,14 @@ namespace Org.Apache.Rocketmq
             var address = new Address(host, port);
             var addresses = new List<Address> { address };
             Addresses = addresses;
+
+            unchecked
+            {
+                var hash = 17;
+                hash = (hash * 31) + AddressListComparer.GetHashCode(Addresses);
+                hash = (hash * 31) + (int)Scheme;
+                _hashCode = hash;
+            }
         }
 
         public override string ToString()
diff --git a/csharp/tests/PublishingLoadBalancerTest.cs b/csharp/tests/PublishingLoadBalancerTest.cs
new file mode 100644
index 00000000..b86a0955
--- /dev/null
+++ b/csharp/tests/PublishingLoadBalancerTest.cs
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections.Generic;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.Rocketmq;
+using Proto = Apache.Rocketmq.V2;
+
+namespace tests
+{
+    [TestClass]
+    public class PublishingLoadBalancerTest
+    {
+        [TestMethod]
+        public void TestTakeMessageQueues()
+        {
+            const string host0 = "127.0.0.1";
+            const string host1 = "127.0.0.2";
+            var mqs = new List<Proto.MessageQueue>();
+            var mq0 = new Proto.MessageQueue
+            {
+                Broker = new Proto.Broker
+                {
+                    Name = "broker0",
+                    Endpoints = new Proto.Endpoints
+                    {
+                        Scheme = Proto.AddressScheme.Ipv4,
+                        Addresses =
+                        {
+                            new Proto.Address
+                            {
+                                Host = host0,
+                                Port = 80
+                            }
+                        }
+                    }
+                },
+                Id = 0,
+                Permission = Proto.Permission.ReadWrite,
+                Topic = new Proto.Resource
+                {
+                    Name = "TestTopic",
+                }
+            };
+            var mq1 = new Proto.MessageQueue
+            {
+                Broker = new Proto.Broker
+                {
+                    Name = "broker1",
+                    Endpoints = new Proto.Endpoints
+                    {
+                        Scheme = Proto.AddressScheme.Ipv4,
+                        Addresses =
+                        {
+                            new Proto.Address
+                            {
+                                Host = host1,
+                                Port = 80
+                            }
+                        }
+                    }
+                },
+                Id = 0,
+                Permission = Proto.Permission.ReadWrite,
+                Topic = new Proto.Resource
+                {
+                    Name = "TestTopic",
+                }
+            };
+            mqs.Add(mq0);
+            mqs.Add(mq1);
+            var topicRouteData = new TopicRouteData(mqs);
+            var publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
+            var endpoints0 = new Endpoints(host0);
+            var excluded0 = new HashSet<Endpoints> { endpoints0 };
+            var candidates0
+                = publishingLoadBalancer.TakeMessageQueues(excluded0, 1);
+            Assert.AreEqual(candidates0.Count, 1);
+            Assert.AreEqual(host1, candidates0[0].Broker.Endpoints.Addresses[0].Host);
+            var endpoints1 = new Endpoints(host1);
+            var excluded1 = new HashSet<Endpoints> { endpoints0, endpoints1 };
+            var candidates1 = publishingLoadBalancer.TakeMessageQueues(excluded1, 2);
+            Assert.AreEqual(2, candidates1.Count);
+        }
+    }
+}
\ No newline at end of file


[rocketmq-clients] 03/03: Log TooManyRequest

Posted by aa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 513de8f3eaa802f445f10928312614016f866b3b
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Mar 15 11:50:43 2023 +0800

    Log TooManyRequest
---
 csharp/rocketmq-client-csharp/Producer.cs | 149 ++++++++++++++++--------------
 diff_sec.zip                              | Bin 0 -> 174 bytes
 2 files changed, 79 insertions(+), 70 deletions(-)

diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs
index da7e8d03..392c50d4 100644
--- a/csharp/rocketmq-client-csharp/Producer.cs
+++ b/csharp/rocketmq-client-csharp/Producer.cs
@@ -168,39 +168,8 @@ namespace Org.Apache.Rocketmq
                 ? publishingLoadBalancer.TakeMessageQueues(new HashSet<Endpoints>(Isolated.Keys), maxAttempts)
                 : new List<MessageQueue>
                     { publishingLoadBalancer.TakeMessageQueueByMessageGroup(publishingMessage.MessageGroup) };
-            Exception exception = null;
-            for (var attempt = 1; attempt <= maxAttempts; attempt++)
-            {
-                var stopwatch = Stopwatch.StartNew();
-                try
-                {
-                    var sendReceipt = await Send0(publishingMessage, candidates, attempt, maxAttempts);
-                    return sendReceipt;
-                }
-                catch (Exception e)
-                {
-                    exception = e;
-                }
-                finally
-                {
-                    var elapsed = stopwatch.Elapsed.Milliseconds;
-                    _sendCostTimeHistogram.Record(elapsed,
-                        new KeyValuePair<string, object>(MetricConstant.Topic, message.Topic),
-                        new KeyValuePair<string, object>(MetricConstant.ClientId, ClientId),
-                        new KeyValuePair<string, object>(MetricConstant.InvocationStatus,
-                            null == exception ? MetricConstant.Success : MetricConstant.Failure));
-                    // Retry immediately if the request is not throttled.
-                    if (exception is TooManyRequestsException)
-                    {
-                        var nextAttempt = 1 + attempt;
-                        var delay = retryPolicy.GetNextAttemptDelay(nextAttempt);
-                        await Task.Delay(delay);
-                    }
-                }
-            }
-
-            throw new Exception($"Failed to send message finally, topic={message.Topic}, clientId={ClientId}",
-                exception);
+            var sendReceipt = await Send0(publishingMessage, candidates);
+            return sendReceipt;
         }
 
         public async Task<ISendReceipt> Send(Message message)
@@ -236,53 +205,93 @@ namespace Org.Apache.Rocketmq
             };
         }
 
-        private async Task<SendReceipt> Send0(PublishingMessage message, List<MessageQueue> candidates, int attempt,
-            int maxAttempts)
+        private async Task<SendReceipt> Send0(PublishingMessage message, List<MessageQueue> candidates)
         {
-            var candidateIndex = (attempt - 1) % candidates.Count;
-            var mq = candidates[candidateIndex];
-            if (PublishingSettings.IsValidateMessageType() &&
-                !mq.AcceptMessageTypes.Contains(message.MessageType))
-            {
-                throw new ArgumentException("Current message type does not match with the accept message types," +
-                                            $" topic={message.Topic}, actualMessageType={message.MessageType}" +
-                                            $" acceptMessageType={string.Join(",", mq.AcceptMessageTypes)}");
-            }
-
-            var sendMessageRequest = WrapSendMessageRequest(message, mq);
-            var endpoints = mq.Broker.Endpoints;
-            var invocation = await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
-            try
+            var retryPolicy = GetRetryPolicy();
+            var maxAttempts = retryPolicy.GetMaxAttempts();
+            Exception exception = null;
+            for (var attempt = 1; attempt <= maxAttempts; attempt++)
             {
-                var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, invocation);
+                var stopwatch = Stopwatch.StartNew();
 
-                var sendReceipt = sendReceipts.First();
-                if (attempt > 1)
+                var candidateIndex = (attempt - 1) % candidates.Count;
+                var mq = candidates[candidateIndex];
+                if (PublishingSettings.IsValidateMessageType() && !mq.AcceptMessageTypes.Contains(message.MessageType))
                 {
-                    Logger.Info(
-                        $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," +
-                        $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}");
+                    throw new ArgumentException(
+                        "Current message type does not match with the accept message types," +
+                        $" topic={message.Topic}, actualMessageType={message.MessageType}" +
+                        $" acceptMessageType={string.Join(",", mq.AcceptMessageTypes)}");
                 }
 
-                return sendReceipt;
-            }
-            catch (Exception e)
-            {
-                // Isolate current endpoints.
-                Isolated[endpoints] = true;
-                if (attempt >= maxAttempts)
+                var sendMessageRequest = WrapSendMessageRequest(message, mq);
+                var endpoints = mq.Broker.Endpoints;
+                try
                 {
-                    Logger.Error(e, "Failed to send message finally, run out of attempt times, " +
-                                    $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
-                                    $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
-                    throw;
+                    var invocation =
+                        await ClientManager.SendMessage(endpoints, sendMessageRequest, ClientConfig.RequestTimeout);
+                    var sendReceipts = SendReceipt.ProcessSendMessageResponse(mq, invocation);
+
+                    var sendReceipt = sendReceipts.First();
+                    if (attempt > 1)
+                    {
+                        Logger.Info(
+                            $"Re-send message successfully, topic={message.Topic}, messageId={sendReceipt.MessageId}," +
+                            $" maxAttempts={maxAttempts}, endpoints={endpoints}, clientId={ClientId}");
+                    }
+
+                    return sendReceipt;
                 }
+                catch (Exception e)
+                {
+                    // Isolate current endpoints.
+                    Isolated[endpoints] = true;
+                    if (attempt >= maxAttempts)
+                    {
+                        Logger.Error(e, "Failed to send message finally, run out of attempt times, " +
+                                        $"topic={message.Topic}, maxAttempt={maxAttempts}, attempt={attempt}, " +
+                                        $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
+                        throw;
+                    }
 
-                Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " +
-                               $"attempt={attempt}, endpoints={endpoints}, messageId={message.MessageId}," +
-                               $" clientId={ClientId}");
-                throw;
+                    if (MessageType.Transaction == message.MessageType)
+                    {
+                        Logger.Error(e, "Failed to send transaction message, run out of attempt times, " +
+                                        $"topic={message.Topic}, maxAttempt=1, attempt={attempt}, " +
+                                        $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
+                        throw;
+                    }
+
+                    exception = e;
+                    if (exception is not TooManyRequestsException)
+                    {
+                        // Retry immediately if the request is not throttled.
+                        Logger.Warn(e, $"Failed to send message, topic={message.Topic}, maxAttempts={maxAttempts}, " +
+                                       $"attempt={attempt}, endpoints={endpoints}, messageId={message.MessageId}," +
+                                       $" clientId={ClientId}");
+                        continue;
+                    }
+
+                    var nextAttempt = 1 + attempt;
+                    var delay = retryPolicy.GetNextAttemptDelay(nextAttempt);
+                    await Task.Delay(delay);
+                    Logger.Warn(e, "Failed to send message due to too many request, would attempt to resend " +
+                                   $"after {delay}, topic={message.Topic}, maxAttempts={maxAttempts}, attempt={attempt}, " +
+                                   $"endpoints={endpoints}, messageId={message.MessageId}, clientId={ClientId}");
+                }
+                finally
+                {
+                    var elapsed = stopwatch.Elapsed.Milliseconds;
+                    _sendCostTimeHistogram.Record(elapsed,
+                        new KeyValuePair<string, object>(MetricConstant.Topic, message.Topic),
+                        new KeyValuePair<string, object>(MetricConstant.ClientId, ClientId),
+                        new KeyValuePair<string, object>(MetricConstant.InvocationStatus,
+                            null == exception ? MetricConstant.Success : MetricConstant.Failure));
+                }
             }
+
+            throw new Exception($"Failed to send message finally, topic={message.Topic}, clientId={ClientId}",
+                exception);
         }
 
         internal override Settings GetSettings()
diff --git a/diff_sec.zip b/diff_sec.zip
new file mode 100644
index 00000000..89278e56
Binary files /dev/null and b/diff_sec.zip differ