You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/11/03 09:33:56 UTC
[rocketmq-clients] 01/02: Revert "Define index of LoadBalancer as static (#272)"
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit d429372ea90241d994dade58fe6955ecc4e4b8f9
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Nov 2 10:17:14 2022 +0800
Revert "Define index of LoadBalancer as static (#272)"
This reverts commit 105640858ae1d9c71e2055ce12f8bc55720c3286.
---
.../rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java | 5 +++--
.../rocketmq/client/java/impl/producer/PublishingLoadBalancer.java | 5 +++--
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
index b9b096e..012d441 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
@@ -34,13 +34,14 @@ public class SubscriptionLoadBalancer {
/**
* Index for round-robin.
*/
- private static final AtomicInteger INDEX = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
+ private final AtomicInteger index;
/**
* Message queues to receive message.
*/
private final ImmutableList<MessageQueueImpl> messageQueues;
public SubscriptionLoadBalancer(TopicRouteData topicRouteData) {
+ this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
.filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isReadable() &&
Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
@@ -52,7 +53,7 @@ public class SubscriptionLoadBalancer {
}
public MessageQueueImpl takeMessageQueue() {
- final int next = INDEX.getAndIncrement();
+ final int next = index.getAndIncrement();
return messageQueues.get(IntMath.mod(next, messageQueues.size()));
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
index 2c9597e..feb9616 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
@@ -43,13 +43,14 @@ public class PublishingLoadBalancer {
/**
* Index for round-robin.
*/
- private static final AtomicInteger INDEX = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
+ private final AtomicInteger index;
/**
* Message queues to send message.
*/
private final ImmutableList<MessageQueueImpl> messageQueues;
public PublishingLoadBalancer(TopicRouteData topicRouteData) {
+ this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
.filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isWritable() &&
Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
@@ -67,7 +68,7 @@ public class PublishingLoadBalancer {
}
public List<MessageQueueImpl> takeMessageQueues(Set<Endpoints> excluded, int count) {
- int next = INDEX.getAndIncrement();
+ int next = index.getAndIncrement();
List<MessageQueueImpl> candidates = new ArrayList<>();
Set<String> candidateBrokerNames = new HashSet<>();