You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/11/03 09:33:57 UTC

[rocketmq-clients] 02/02: Preserve index of load balancer when topic route is updated

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit cfb9daa75f2a49d2f205d223de9bf7331c002620
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Nov 2 11:11:48 2022 +0800

    Preserve index of load balancer when topic route is updated
---
 .../java/impl/consumer/SimpleConsumerImpl.java     | 39 +++++++++++-----------
 .../impl/consumer/SubscriptionLoadBalancer.java    | 10 +++++-
 .../client/java/impl/producer/ProducerImpl.java    | 28 +++++++++-------
 .../java/impl/producer/PublishingLoadBalancer.java | 10 +++++-
 4 files changed, 53 insertions(+), 34 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 1b565dd..310df59 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -27,7 +27,6 @@ import com.google.common.math.IntMath;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -68,7 +67,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     private final AtomicInteger topicIndex;
 
     private final Map<String /* topic */, FilterExpression> subscriptionExpressions;
-    private final ConcurrentMap<String /* topic */, SubscriptionLoadBalancer> subTopicRouteDataResultCache;
+    private final ConcurrentMap<String /* topic */, SubscriptionLoadBalancer> subscriptionRouteDataCache;
 
     public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
         Map<String, FilterExpression> subscriptionExpressions) {
@@ -82,7 +81,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         this.topicIndex = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
 
         this.subscriptionExpressions = subscriptionExpressions;
-        this.subTopicRouteDataResultCache = new ConcurrentHashMap<>();
+        this.subscriptionRouteDataCache = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -191,7 +190,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         }
         final String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size()));
         final FilterExpression filterExpression = copy.get(topic);
-        final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionTopicRouteResult(topic);
+        final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionLoadBalancer(topic);
         final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
             final MessageQueueImpl mq = result.takeMessageQueue();
             final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
@@ -298,25 +297,25 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         return simpleSubscriptionSettings;
     }
 
+    private SubscriptionLoadBalancer updateSubscriptionLoadBalancer(String topic, TopicRouteData topicRouteData) {
+        SubscriptionLoadBalancer subscriptionLoadBalancer = subscriptionRouteDataCache.get(topic);
+        subscriptionLoadBalancer = null == subscriptionLoadBalancer ? new SubscriptionLoadBalancer(topicRouteData) :
+            subscriptionLoadBalancer.update(topicRouteData);
+        subscriptionRouteDataCache.put(topic, subscriptionLoadBalancer);
+        return subscriptionLoadBalancer;
+    }
+
+    @Override
     public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
-        final SubscriptionLoadBalancer subscriptionLoadBalancer =
-            new SubscriptionLoadBalancer(topicRouteData);
-        subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer);
+        updateSubscriptionLoadBalancer(topic, topicRouteData);
     }
 
-    private ListenableFuture<SubscriptionLoadBalancer> getSubscriptionTopicRouteResult(final String topic) {
-        SettableFuture<SubscriptionLoadBalancer> future0 = SettableFuture.create();
-        final SubscriptionLoadBalancer result = subTopicRouteDataResultCache.get(topic);
-        if (null != result) {
-            future0.set(result);
-            return future0;
+    private ListenableFuture<SubscriptionLoadBalancer> getSubscriptionLoadBalancer(final String topic) {
+        final SubscriptionLoadBalancer loadBalancer = subscriptionRouteDataCache.get(topic);
+        if (null != loadBalancer) {
+            return Futures.immediateFuture(loadBalancer);
         }
-        final ListenableFuture<TopicRouteData> future = getRouteData(topic);
-        return Futures.transform(future, topicRouteDataResult -> {
-            final SubscriptionLoadBalancer subscriptionLoadBalancer =
-                new SubscriptionLoadBalancer(topicRouteDataResult);
-            subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer);
-            return subscriptionLoadBalancer;
-        }, MoreExecutors.directExecutor());
+        return Futures.transform(getRouteData(topic), topicRouteData -> updateSubscriptionLoadBalancer(topic,
+            topicRouteData), MoreExecutors.directExecutor());
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
index 012d441..109855d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
@@ -41,7 +41,11 @@ public class SubscriptionLoadBalancer {
     private final ImmutableList<MessageQueueImpl> messageQueues;
 
     public SubscriptionLoadBalancer(TopicRouteData topicRouteData) {
-        this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
+        this(new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)), topicRouteData);
+    }
+
+    private SubscriptionLoadBalancer(AtomicInteger index, TopicRouteData topicRouteData) {
+        this.index = index;
         final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
             .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isReadable() &&
                 Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
@@ -52,6 +56,10 @@ public class SubscriptionLoadBalancer {
         this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build();
     }
 
+    SubscriptionLoadBalancer update(TopicRouteData topicRouteData) {
+        return new SubscriptionLoadBalancer(index, topicRouteData);
+    }
+
     public MessageQueueImpl takeMessageQueue() {
         final int next = index.getAndIncrement();
         return messageQueues.get(IntMath.mod(next, messageQueues.size()));
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 307af8a..70cb6aa 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -398,7 +398,7 @@ class ProducerImpl extends ClientImpl implements Producer {
 
         this.topics.add(topic);
         // Get publishing topic route.
-        final ListenableFuture<PublishingLoadBalancer> routeFuture = getPublishingTopicRouteResult(topic);
+        final ListenableFuture<PublishingLoadBalancer> routeFuture = getPublishingLoadBalancer(topic);
         return Futures.transformAsync(routeFuture, result -> {
             // Prepare the candidate message queue(s) for retry-sending in advance.
             final List<MessageQueueImpl> candidates = null == messageGroup ? takeMessageQueues(result) :
@@ -541,21 +541,25 @@ class ProducerImpl extends ClientImpl implements Producer {
         }, clientCallbackExecutor);
     }
 
+    private PublishingLoadBalancer updatePublishingLoadBalancer(String topic, TopicRouteData topicRouteData) {
+        PublishingLoadBalancer publishingLoadBalancer = publishingRouteDataCache.get(topic);
+        publishingLoadBalancer = null == publishingLoadBalancer ? new PublishingLoadBalancer(topicRouteData) :
+            publishingLoadBalancer.update(topicRouteData);
+        publishingRouteDataCache.put(topic, publishingLoadBalancer);
+        return publishingLoadBalancer;
+    }
+
     @Override
     public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
-        final PublishingLoadBalancer publishingLoadBalancer = new PublishingLoadBalancer(topicRouteData);
-        publishingRouteDataCache.put(topic, publishingLoadBalancer);
+        updatePublishingLoadBalancer(topic, topicRouteData);
     }
 
-    private ListenableFuture<PublishingLoadBalancer> getPublishingTopicRouteResult(final String topic) {
-        final PublishingLoadBalancer result = publishingRouteDataCache.get(topic);
-        if (null != result) {
-            return Futures.immediateFuture(result);
-        }
-        return Futures.transformAsync(getRouteData(topic), topicRouteDataResult -> {
-            final PublishingLoadBalancer loadBalancer = new PublishingLoadBalancer(topicRouteDataResult);
-            publishingRouteDataCache.put(topic, loadBalancer);
+    private ListenableFuture<PublishingLoadBalancer> getPublishingLoadBalancer(final String topic) {
+        final PublishingLoadBalancer loadBalancer = publishingRouteDataCache.get(topic);
+        if (null != loadBalancer) {
             return Futures.immediateFuture(loadBalancer);
-        }, MoreExecutors.directExecutor());
+        }
+        return Futures.transform(getRouteData(topic), topicRouteData -> updatePublishingLoadBalancer(topic,
+            topicRouteData), MoreExecutors.directExecutor());
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
index feb9616..1ba9e52 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
@@ -50,7 +50,11 @@ public class PublishingLoadBalancer {
     private final ImmutableList<MessageQueueImpl> messageQueues;
 
     public PublishingLoadBalancer(TopicRouteData topicRouteData) {
-        this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
+        this(new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE)), topicRouteData);
+    }
+
+    private PublishingLoadBalancer(AtomicInteger index, TopicRouteData topicRouteData) {
+        this.index = index;
         final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
             .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isWritable() &&
                 Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
@@ -61,6 +65,10 @@ public class PublishingLoadBalancer {
         this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build();
     }
 
+    PublishingLoadBalancer update(TopicRouteData topicRouteData) {
+        return new PublishingLoadBalancer(index, topicRouteData);
+    }
+
     public MessageQueueImpl takeMessageQueueByMessageGroup(String messageGroup) {
         final long hashCode = Hashing.sipHash24().hashBytes(messageGroup.getBytes(StandardCharsets.UTF_8)).asLong();
         final int index = LongMath.mod(hashCode, messageQueues.size());