You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/11/03 09:33:56 UTC

[rocketmq-clients] 01/02: Revert "Define index of LoadBalancer as static (#272)"

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit d429372ea90241d994dade58fe6955ecc4e4b8f9
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Nov 2 10:17:14 2022 +0800

    Revert "Define index of LoadBalancer as static (#272)"
    
    This reverts commit 105640858ae1d9c71e2055ce12f8bc55720c3286.
---
 .../rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java | 5 +++--
 .../rocketmq/client/java/impl/producer/PublishingLoadBalancer.java   | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
index b9b096e..012d441 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
@@ -34,13 +34,14 @@ public class SubscriptionLoadBalancer {
     /**
      * Index for round-robin.
      */
-    private static final AtomicInteger INDEX = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
+    private final AtomicInteger index;
     /**
      * Message queues to receive message.
      */
     private final ImmutableList<MessageQueueImpl> messageQueues;
 
     public SubscriptionLoadBalancer(TopicRouteData topicRouteData) {
+        this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
         final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
             .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isReadable() &&
                 Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
@@ -52,7 +53,7 @@ public class SubscriptionLoadBalancer {
     }
 
     public MessageQueueImpl takeMessageQueue() {
-        final int next = INDEX.getAndIncrement();
+        final int next = index.getAndIncrement();
         return messageQueues.get(IntMath.mod(next, messageQueues.size()));
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
index 2c9597e..feb9616 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
@@ -43,13 +43,14 @@ public class PublishingLoadBalancer {
     /**
      * Index for round-robin.
      */
-    private static final AtomicInteger INDEX = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
+    private final AtomicInteger index;
     /**
      * Message queues to send message.
      */
     private final ImmutableList<MessageQueueImpl> messageQueues;
 
     public PublishingLoadBalancer(TopicRouteData topicRouteData) {
+        this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
         final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
             .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isWritable() &&
                 Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
@@ -67,7 +68,7 @@ public class PublishingLoadBalancer {
     }
 
     public List<MessageQueueImpl> takeMessageQueues(Set<Endpoints> excluded, int count) {
-        int next = INDEX.getAndIncrement();
+        int next = index.getAndIncrement();
         List<MessageQueueImpl> candidates = new ArrayList<>();
         Set<String> candidateBrokerNames = new HashSet<>();