You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/11/03 09:33:57 UTC
[rocketmq-clients] 02/02: Reserve index of load balancer when topic route is updated
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit cfb9daa75f2a49d2f205d223de9bf7331c002620
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Nov 2 11:11:48 2022 +0800
Reserve index of load balancer when topic route is updated
---
.../java/impl/consumer/SimpleConsumerImpl.java | 39 +++++++++++-----------
.../impl/consumer/SubscriptionLoadBalancer.java | 10 +++++-
.../client/java/impl/producer/ProducerImpl.java | 28 +++++++++-------
.../java/impl/producer/PublishingLoadBalancer.java | 10 +++++-
4 files changed, 53 insertions(+), 34 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 1b565dd..310df59 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -27,7 +27,6 @@ import com.google.common.math.IntMath;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
@@ -68,7 +67,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
private final AtomicInteger topicIndex;
private final Map<String /* topic */, FilterExpression> subscriptionExpressions;
- private final ConcurrentMap<String /* topic */, SubscriptionLoadBalancer> subTopicRouteDataResultCache;
+ private final ConcurrentMap<String /* topic */, SubscriptionLoadBalancer> subscriptionRouteDataCache;
public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
Map<String, FilterExpression> subscriptionExpressions) {
@@ -82,7 +81,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
this.topicIndex = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
this.subscriptionExpressions = subscriptionExpressions;
- this.subTopicRouteDataResultCache = new ConcurrentHashMap<>();
+ this.subscriptionRouteDataCache = new ConcurrentHashMap<>();
}
@Override
@@ -191,7 +190,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
}
final String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size()));
final FilterExpression filterExpression = copy.get(topic);
- final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionTopicRouteResult(topic);
+ final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionLoadBalancer(topic);
final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
final MessageQueueImpl mq = result.takeMessageQueue();
final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
@@ -298,25 +297,25 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
return simpleSubscriptionSettings;
}
+ private SubscriptionLoadBalancer updateSubscriptionLoadBalancer(String topic, TopicRouteData topicRouteData) {
+ SubscriptionLoadBalancer subscriptionLoadBalancer = subscriptionRouteDataCache.get(topic);
+ subscriptionLoadBalancer = null == subscriptionLoadBalancer ? new SubscriptionLoadBalancer(topicRouteData) :
+ subscriptionLoadBalancer.update(topicRouteData);
+ subscriptionRouteDataCache.put(topic, subscriptionLoadBalancer);
+ return subscriptionLoadBalancer;
+ }
+
+ @Override
public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
- final SubscriptionLoadBalancer subscriptionLoadBalancer =
- new SubscriptionLoadBalancer(topicRouteData);
- subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer);
+ updateSubscriptionLoadBalancer(topic, topicRouteData);
}
- private ListenableFuture<SubscriptionLoadBalancer> getSubscriptionTopicRouteResult(final String topic) {
- SettableFuture<SubscriptionLoadBalancer> future0 = SettableFuture.create();
- final SubscriptionLoadBalancer result = subTopicRouteDataResultCache.get(topic);
- if (null != result) {
- future0.set(result);
- return future0;
+ private ListenableFuture<SubscriptionLoadBalancer> getSubscriptionLoadBalancer(final String topic) {
+ final SubscriptionLoadBalancer loadBalancer = subscriptionRouteDataCache.get(topic);
+ if (null != loadBalancer) {
+ return Futures.immediateFuture(loadBalancer);
}
- final ListenableFuture<TopicRouteData> future = getRouteData(topic);
- return Futures.transform(future, topicRouteDataResult -> {
- final SubscriptionLoadBalancer subscriptionLoadBalancer =
- new SubscriptionLoadBalancer(topicRouteDataResult);
- subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer);
- return subscriptionLoadBalancer;
- }, MoreExecutors.directExecutor());
+ return Futures.transform(getRouteData(topic), topicRouteData -> updateSubscriptionLoadBalancer(topic,
+ topicRouteData), MoreExecutors.directExecutor());
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
index 012d441..109855d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
@@ -41,7 +41,11 @@ public class SubscriptionLoadBalancer {
private final ImmutableList<MessageQueueImpl> messageQueues;
public SubscriptionLoadBalancer(TopicRouteData topicRouteData) {
- this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
+ this(new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)), topicRouteData);
+ }
+
+ private SubscriptionLoadBalancer(AtomicInteger index, TopicRouteData topicRouteData) {
+ this.index = index;
final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
.filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isReadable() &&
Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
@@ -52,6 +56,10 @@ public class SubscriptionLoadBalancer {
this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build();
}
+ SubscriptionLoadBalancer update(TopicRouteData topicRouteData) {
+ return new SubscriptionLoadBalancer(index, topicRouteData);
+ }
+
public MessageQueueImpl takeMessageQueue() {
final int next = index.getAndIncrement();
return messageQueues.get(IntMath.mod(next, messageQueues.size()));
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 307af8a..70cb6aa 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -398,7 +398,7 @@ class ProducerImpl extends ClientImpl implements Producer {
this.topics.add(topic);
// Get publishing topic route.
- final ListenableFuture<PublishingLoadBalancer> routeFuture = getPublishingTopicRouteResult(topic);
+ final ListenableFuture<PublishingLoadBalancer> routeFuture = getPublishingLoadBalancer(topic);
return Futures.transformAsync(routeFuture, result -> {
// Prepare the candidate message queue(s) for retry-sending in advance.
final List<MessageQueueImpl> candidates = null == messageGroup ? takeMessageQueues(result) :
@@ -541,21 +541,25 @@ class ProducerImpl extends ClientImpl implements Producer {
}, clientCallbackExecutor);
}
+ private PublishingLoadBalancer updatePublishingLoadBalancer(String topic, TopicRouteData topicRouteData) {
+ PublishingLoadBalancer publishingLoadBalancer = publishingRouteDataCache.get(topic);
+ publishingLoadBalancer = null == publishingLoadBalancer ? new PublishingLoadBalancer(topicRouteData) :
+ publishingLoadBalancer.update(topicRouteData);
+ publishingRouteDataCache.put(topic, publishingLoadBalancer);
+ return publishingLoadBalancer;
+ }
+
@Override
public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
- final PublishingLoadBalancer publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
- publishingRouteDataCache.put(topic, publishingLoadBalancer);
+ updatePublishingLoadBalancer(topic, topicRouteData);
}
- private ListenableFuture<PublishingLoadBalancer> getPublishingTopicRouteResult(final String topic) {
- final PublishingLoadBalancer result = publishingRouteDataCache.get(topic);
- if (null != result) {
- return Futures.immediateFuture(result);
- }
- return Futures.transformAsync(getRouteData(topic), topicRouteDataResult -> {
- final PublishingLoadBalancer loadBalancer = new PublishingLoadBalancer(topicRouteDataResult);
- publishingRouteDataCache.put(topic, loadBalancer);
+ private ListenableFuture<PublishingLoadBalancer> getPublishingLoadBalancer(final String topic) {
+ final PublishingLoadBalancer loadBalancer = publishingRouteDataCache.get(topic);
+ if (null != loadBalancer) {
return Futures.immediateFuture(loadBalancer);
- }, MoreExecutors.directExecutor());
+ }
+ return Futures.transform(getRouteData(topic), topicRouteData -> updatePublishingLoadBalancer(topic,
+ topicRouteData), MoreExecutors.directExecutor());
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
index feb9616..1ba9e52 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
@@ -50,7 +50,11 @@ public class PublishingLoadBalancer {
private final ImmutableList<MessageQueueImpl> messageQueues;
public PublishingLoadBalancer(TopicRouteData topicRouteData) {
- this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
+ this(new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)), topicRouteData);
+ }
+
+ private PublishingLoadBalancer(AtomicInteger index, TopicRouteData topicRouteData) {
+ this.index = index;
final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
.filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isWritable() &&
Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
@@ -61,6 +65,10 @@ public class PublishingLoadBalancer {
this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build();
}
+ PublishingLoadBalancer update(TopicRouteData topicRouteData) {
+ return new PublishingLoadBalancer(index, topicRouteData);
+ }
+
public MessageQueueImpl takeMessageQueueByMessageGroup(String messageGroup) {
final long hashCode = Hashing.sipHash24().hashBytes(messageGroup.getBytes(StandardCharsets.UTF_8)).asLong();
final int index = LongMath.mod(hashCode, messageQueues.size());