You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2018/06/22 11:58:24 UTC

[rocketmq] 01/01: Optimize broker topic route registration to relieve stress on Java GC

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 92f0e1f5abb694c90ced9468e9e1d03edcdcedfe
Author: 傅冲 <yu...@alibaba-inc.com>
AuthorDate: Fri May 25 13:37:10 2018 +0800

    Optimize broker topic route registration to relieve stress on Java GC
---
 .../apache/rocketmq/broker/BrokerController.java   | 69 +++++++++++++++-------
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 42 +++++++------
 .../broker/processor/AdminBrokerProcessor.java     | 10 ++--
 .../org/apache/rocketmq/common/BrokerConfig.java   |  2 -
 .../namesrv/RegisterBrokerRequestHeader.java       | 10 ++++
 .../namesrv/processor/DefaultRequestProcessor.java | 26 ++++++++
 6 files changed, 107 insertions(+), 52 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 293e51e..ed85a67 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -62,6 +63,7 @@ import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
@@ -768,6 +770,24 @@ public class BrokerController {
         }
     }
 
+    public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
+        TopicConfig registerTopicConfig = topicConfig;
+        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+            registerTopicConfig =
+                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
+                    this.brokerConfig.getBrokerPermission());
+        }
+
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
+        topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+        topicConfigSerializeWrapper.setDataVersion(dataVersion);
+        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+
+        doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
+    }
+
     public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
         TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
 
@@ -788,30 +808,35 @@ public class BrokerController {
             this.brokerConfig.getBrokerName(),
             this.brokerConfig.getBrokerId(),
             this.brokerConfig.getRegisterBrokerTimeoutMills())) {
-            List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
-                this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId(),
-                this.getHAServerAddr(),
-                topicConfigWrapper,
-                this.filterServerManager.buildNewFilterServerList(),
-                oneway,
-                this.brokerConfig.getRegisterBrokerTimeoutMills(),
-                this.brokerConfig.isCompressedRegister());
-
-            if (registerBrokerResultList.size() > 0) {
-                RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
-                if (registerBrokerResult != null) {
-                    if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
-                        this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
-                    }
+            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
+        }
+    }
+
+    private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
+        TopicConfigSerializeWrapper topicConfigWrapper) {
+        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
+            this.brokerConfig.getBrokerClusterName(),
+            this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(),
+            this.brokerConfig.getBrokerId(),
+            this.getHAServerAddr(),
+            topicConfigWrapper,
+            this.filterServerManager.buildNewFilterServerList(),
+            oneway,
+            this.brokerConfig.getRegisterBrokerTimeoutMills(),
+            this.brokerConfig.isCompressedRegister());
+
+        if (registerBrokerResultList.size() > 0) {
+            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
+            if (registerBrokerResult != null) {
+                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
+                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+                }
 
-                    this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
 
-                    if (checkOrderConfig) {
-                        this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
-                    }
+                if (checkOrderConfig) {
+                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                 }
             }
         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 2825a34..4dee01c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -125,14 +126,28 @@ public class BrokerOuterAPI {
         final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
         List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
         if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
+
+            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
+            requestHeader.setBrokerAddr(brokerAddr);
+            requestHeader.setBrokerId(brokerId);
+            requestHeader.setBrokerName(brokerName);
+            requestHeader.setClusterName(clusterName);
+            requestHeader.setHaServerAddr(haServerAddr);
+            requestHeader.setCompressed(compressed);
+
+            RegisterBrokerBody requestBody = new RegisterBrokerBody();
+            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
+            requestBody.setFilterServerList(filterServerList);
+            final byte[] body = requestBody.encode(compressed);
+            final int bodyCrc32 = UtilAll.crc32(body);
+            requestHeader.setBodyCrc32(bodyCrc32);
             final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
             for (final String namesrvAddr : nameServerAddressList) {
                 brokerOuterExecutor.execute(new Runnable() {
                     @Override
                     public void run() {
                         try {
-                            RegisterBrokerResult result = registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
-                                haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills, compressed);
+                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                             if (result != null) {
                                 registerBrokerResultList.add(result);
                             }
@@ -158,31 +173,14 @@ public class BrokerOuterAPI {
 
     private RegisterBrokerResult registerBroker(
         final String namesrvAddr,
-        final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final long brokerId,
-        final String haServerAddr,
-        final TopicConfigSerializeWrapper topicConfigWrapper,
-        final List<String> filterServerList,
         final boolean oneway,
         final int timeoutMills,
-        final boolean compressed
+        final RegisterBrokerRequestHeader requestHeader,
+        final byte[] body
     ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
         InterruptedException {
-        RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
-        requestHeader.setBrokerAddr(brokerAddr);
-        requestHeader.setBrokerId(brokerId);
-        requestHeader.setBrokerName(brokerName);
-        requestHeader.setClusterName(clusterName);
-        requestHeader.setHaServerAddr(haServerAddr);
-        requestHeader.setCompressed(compressed);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
-
-        RegisterBrokerBody requestBody = new RegisterBrokerBody();
-        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
-        requestBody.setFilterServerList(filterServerList);
-        request.setBody(requestBody.encode(requestHeader.isCompressed()));
+        request.setBody(body);
 
         if (oneway) {
             try {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index c0a4b20..1a704a8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -212,7 +212,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         return false;
     }
 
-    private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
+    private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final CreateTopicRequestHeader requestHeader =
@@ -246,14 +246,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
         this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
 
-        if (brokerController.getBrokerConfig().getRegisterNameServerPeriod() == 0) {
-            this.brokerController.registerBrokerAll(false, true, true);
-        }
+        this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
 
         return null;
     }
 
-    private RemotingCommand deleteTopic(ChannelHandlerContext ctx,
+    private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         DeleteTopicRequestHeader requestHeader =
@@ -299,7 +297,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         return response;
     }
 
-    private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+    private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
         log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 7caf830..203431a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -141,8 +141,6 @@ public class BrokerConfig {
      * This configurable item defines interval of topics registration of broker to name server. Allowing values are
      * between 10, 000 and 60, 000 milliseconds.
      *
-     * If set to 0, newly created topics will be immediately reported to name servers and interval of periodical
-     * registration defaults to 10, 000 in milliseconds.
      */
     private int registerNameServerPeriod = 1000 * 30;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
index 7ed7a40..19175b0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
@@ -38,6 +38,8 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
 
     private boolean compressed;
 
+    private Integer bodyCrc32 = 0;
+
     public void checkFields() throws RemotingCommandException {
     }
 
@@ -88,4 +90,12 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
     public void setCompressed(boolean compressed) {
         this.compressed = compressed;
     }
+
+    public Integer getBodyCrc32() {
+        return bodyCrc32;
+    }
+
+    public void setBodyCrc32(Integer bodyCrc32) {
+        this.bodyCrc32 = bodyCrc32;
+    }
 }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 236e6a1..467078c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MQVersion.Version;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -196,6 +197,12 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         final RegisterBrokerRequestHeader requestHeader =
             (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
 
+        if (!checksum(ctx, request, requestHeader)) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("crc32 not match");
+            return response;
+        }
+
         RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
 
         if (request.getBody() != null) {
@@ -230,6 +237,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         return response;
     }
 
+    private boolean checksum(ChannelHandlerContext ctx, RemotingCommand request,
+        RegisterBrokerRequestHeader requestHeader) {
+        if (requestHeader.getBodyCrc32() != 0) {
+            final int crc32 = UtilAll.crc32(request.getBody());
+            if (crc32 != requestHeader.getBodyCrc32()) {
+                log.warn(String.format("receive registerBroker request,crc32 not match,from %s",
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())));
+                return false;
+            }
+        }
+        return true;
+    }
+
     public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
@@ -261,6 +281,12 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         final RegisterBrokerRequestHeader requestHeader =
             (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
 
+        if (!checksum(ctx, request, requestHeader)) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("crc32 not match");
+            return response;
+        }
+
         TopicConfigSerializeWrapper topicConfigWrapper;
         if (request.getBody() != null) {
             topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);