You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/10/08 09:40:51 UTC

[GitHub] [rocketmq] dongeforever commented on a diff in pull request #4772: [ISSUE #3870] Optimize topic route data notification

dongeforever commented on code in PR #4772:
URL: https://github.com/apache/rocketmq/pull/4772#discussion_r990617511


##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/TopicRouteNotifier.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.routeinfo;
+
+import io.netty.channel.Channel;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.UpdateTopicRouteRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * if topic route info changed, then notify client scheduled
+ */
+public class TopicRouteNotifier {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
+    private final RouteInfoManager routeInfoManager;
+
+    private final RemotingServer remotingServer;
+
+    public TopicRouteNotifier(RemotingServer remotingServer, RouteInfoManager routeInfoManager) {
+        this.routeInfoManager = routeInfoManager;
+        this.remotingServer = remotingServer;
+    }
+
+    /**
+     * if topic route info has changed in the period, then notify client
+     */
+    public void notifyClients() {
+        Map<String, Set<Channel>> topicAndChannelMap = routeInfoManager.getAndResetChangedTopicMap();
+        if (MapUtils.isEmpty(topicAndChannelMap)) {

Review Comment:
   here may need a dynamic config to control whether to notify or not.



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java:
##########
@@ -455,6 +470,10 @@ private void createAndUpdateQueueData(final String brokerName, final TopicConfig
                 log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), existedQD,
                     queueData);
                 queueDataMap.put(brokerName, queueData);
+                // if broker restart, many topic will register, for avoid hot, then ignore
+                if (singleTopicRouteChanged) {

Review Comment:
   In case of "broker restart/failure", it also needs to be notified.
   
   How about using the multi-node to improve performance and reduce CPU cost?
   
   
   
   



##########
namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/TopicRouteNotifier.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.namesrv.routeinfo;
+
+import io.netty.channel.Channel;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.namesrv.UpdateTopicRouteRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * if topic route info changed, then notify client scheduled
+ */
+public class TopicRouteNotifier {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
+    private final RouteInfoManager routeInfoManager;
+
+    private final RemotingServer remotingServer;
+
+    public TopicRouteNotifier(RemotingServer remotingServer, RouteInfoManager routeInfoManager) {
+        this.routeInfoManager = routeInfoManager;
+        this.remotingServer = remotingServer;
+    }
+
+    /**
+     * if topic route info has changed in the period, then notify client
+     */
+    public void notifyClients() {
+        Map<String, Set<Channel>> topicAndChannelMap = routeInfoManager.getAndResetChangedTopicMap();
+        if (MapUtils.isEmpty(topicAndChannelMap)) {
+            return;
+        }
+
+        for (Map.Entry<String, Set<Channel>> entry : topicAndChannelMap.entrySet()) {
+            notifyClientsByTopic(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private void notifyClientsByTopic(String topic, Set<Channel> channelSet) {
+        if (topic == null || CollectionUtils.isEmpty(channelSet)) {
+            return;
+        }
+        for (Channel channel : channelSet) {
+            RemotingCommand remotingCommand = transToCommand(topic);
+            try {
+                remotingServer.invokeOneway(channel, remotingCommand, 50);
+            } catch (Exception e) {
+                log.error("invoke client exception. topic={}, channel={}, error={}", topic, channel, e.toString());
+            }
+        }
+    }
+

Review Comment:
   Here may need to support multi-topic mode,  that is to notify the channel multi topics at one time.
   
   Maybe the getTopicRoute needs to support multi-mode too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org