You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2018/06/22 11:58:23 UTC

[rocketmq] branch develop updated (3ddc96f -> 92f0e1f)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


    omit 3ddc96f  Fix checkstyle
    omit c0138f4  #325 refactor method
    omit 63927d5  #325 register nameserver  add crc32 check
    omit 1e2aefe  #325 register nameserver  add crc32 check
    omit 283c539  #325 register nameserver  add crc32 check
    omit 130a56e  #325 register nameserver  add crc32 check
    omit e8f1020  #325 all nameserer share topicroute Data
    omit b978ff1  #issues-311 refactor new topic register logic
     new 92f0e1f  Optimize broker topic route registration to relieve stress on Java GC

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (3ddc96f)
            \
             N -- N -- N   refs/heads/develop (92f0e1f)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:


[rocketmq] 01/01: Optimize broker topic route registration to relieve stress on Java GC

Posted by li...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 92f0e1f5abb694c90ced9468e9e1d03edcdcedfe
Author: 傅冲 <yu...@alibaba-inc.com>
AuthorDate: Fri May 25 13:37:10 2018 +0800

    Optimize broker topic route registration to relieve stress on Java GC
---
 .../apache/rocketmq/broker/BrokerController.java   | 69 +++++++++++++++-------
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 42 +++++++------
 .../broker/processor/AdminBrokerProcessor.java     | 10 ++--
 .../org/apache/rocketmq/common/BrokerConfig.java   |  2 -
 .../namesrv/RegisterBrokerRequestHeader.java       | 10 ++++
 .../namesrv/processor/DefaultRequestProcessor.java | 26 ++++++++
 6 files changed, 107 insertions(+), 52 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 293e51e..ed85a67 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -62,6 +63,7 @@ import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
@@ -768,6 +770,24 @@ public class BrokerController {
         }
     }
 
+    public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
+        TopicConfig registerTopicConfig = topicConfig;
+        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+            registerTopicConfig =
+                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
+                    this.brokerConfig.getBrokerPermission());
+        }
+
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
+        topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+        topicConfigSerializeWrapper.setDataVersion(dataVersion);
+        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+
+        doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
+    }
+
     public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
         TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
 
@@ -788,30 +808,35 @@ public class BrokerController {
             this.brokerConfig.getBrokerName(),
             this.brokerConfig.getBrokerId(),
             this.brokerConfig.getRegisterBrokerTimeoutMills())) {
-            List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
-                this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId(),
-                this.getHAServerAddr(),
-                topicConfigWrapper,
-                this.filterServerManager.buildNewFilterServerList(),
-                oneway,
-                this.brokerConfig.getRegisterBrokerTimeoutMills(),
-                this.brokerConfig.isCompressedRegister());
-
-            if (registerBrokerResultList.size() > 0) {
-                RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
-                if (registerBrokerResult != null) {
-                    if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
-                        this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
-                    }
+            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
+        }
+    }
+
+    private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
+        TopicConfigSerializeWrapper topicConfigWrapper) {
+        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
+            this.brokerConfig.getBrokerClusterName(),
+            this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(),
+            this.brokerConfig.getBrokerId(),
+            this.getHAServerAddr(),
+            topicConfigWrapper,
+            this.filterServerManager.buildNewFilterServerList(),
+            oneway,
+            this.brokerConfig.getRegisterBrokerTimeoutMills(),
+            this.brokerConfig.isCompressedRegister());
+
+        if (registerBrokerResultList.size() > 0) {
+            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
+            if (registerBrokerResult != null) {
+                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
+                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+                }
 
-                    this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
 
-                    if (checkOrderConfig) {
-                        this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
-                    }
+                if (checkOrderConfig) {
+                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                 }
             }
         }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 2825a34..4dee01c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -125,14 +126,28 @@ public class BrokerOuterAPI {
         final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
         List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
         if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
+
+            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
+            requestHeader.setBrokerAddr(brokerAddr);
+            requestHeader.setBrokerId(brokerId);
+            requestHeader.setBrokerName(brokerName);
+            requestHeader.setClusterName(clusterName);
+            requestHeader.setHaServerAddr(haServerAddr);
+            requestHeader.setCompressed(compressed);
+
+            RegisterBrokerBody requestBody = new RegisterBrokerBody();
+            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
+            requestBody.setFilterServerList(filterServerList);
+            final byte[] body = requestBody.encode(compressed);
+            final int bodyCrc32 = UtilAll.crc32(body);
+            requestHeader.setBodyCrc32(bodyCrc32);
             final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
             for (final String namesrvAddr : nameServerAddressList) {
                 brokerOuterExecutor.execute(new Runnable() {
                     @Override
                     public void run() {
                         try {
-                            RegisterBrokerResult result = registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
-                                haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills, compressed);
+                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                             if (result != null) {
                                 registerBrokerResultList.add(result);
                             }
@@ -158,31 +173,14 @@ public class BrokerOuterAPI {
 
     private RegisterBrokerResult registerBroker(
         final String namesrvAddr,
-        final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final long brokerId,
-        final String haServerAddr,
-        final TopicConfigSerializeWrapper topicConfigWrapper,
-        final List<String> filterServerList,
         final boolean oneway,
         final int timeoutMills,
-        final boolean compressed
+        final RegisterBrokerRequestHeader requestHeader,
+        final byte[] body
     ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
         InterruptedException {
-        RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
-        requestHeader.setBrokerAddr(brokerAddr);
-        requestHeader.setBrokerId(brokerId);
-        requestHeader.setBrokerName(brokerName);
-        requestHeader.setClusterName(clusterName);
-        requestHeader.setHaServerAddr(haServerAddr);
-        requestHeader.setCompressed(compressed);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
-
-        RegisterBrokerBody requestBody = new RegisterBrokerBody();
-        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
-        requestBody.setFilterServerList(filterServerList);
-        request.setBody(requestBody.encode(requestHeader.isCompressed()));
+        request.setBody(body);
 
         if (oneway) {
             try {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index c0a4b20..1a704a8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -212,7 +212,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         return false;
     }
 
-    private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
+    private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final CreateTopicRequestHeader requestHeader =
@@ -246,14 +246,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
         this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
 
-        if (brokerController.getBrokerConfig().getRegisterNameServerPeriod() == 0) {
-            this.brokerController.registerBrokerAll(false, true, true);
-        }
+        this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
 
         return null;
     }
 
-    private RemotingCommand deleteTopic(ChannelHandlerContext ctx,
+    private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         DeleteTopicRequestHeader requestHeader =
@@ -299,7 +297,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         return response;
     }
 
-    private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+    private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
         log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 7caf830..203431a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -141,8 +141,6 @@ public class BrokerConfig {
      * This configurable item defines interval of topics registration of broker to name server. Allowing values are
      * between 10, 000 and 60, 000 milliseconds.
      *
-     * If set to 0, newly created topics will be immediately reported to name servers and interval of periodical
-     * registration defaults to 10, 000 in milliseconds.
      */
     private int registerNameServerPeriod = 1000 * 30;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
index 7ed7a40..19175b0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
@@ -38,6 +38,8 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
 
     private boolean compressed;
 
+    private Integer bodyCrc32 = 0;
+
     public void checkFields() throws RemotingCommandException {
     }
 
@@ -88,4 +90,12 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
     public void setCompressed(boolean compressed) {
         this.compressed = compressed;
     }
+
+    public Integer getBodyCrc32() {
+        return bodyCrc32;
+    }
+
+    public void setBodyCrc32(Integer bodyCrc32) {
+        this.bodyCrc32 = bodyCrc32;
+    }
 }
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 236e6a1..467078c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MQVersion.Version;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -196,6 +197,12 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         final RegisterBrokerRequestHeader requestHeader =
             (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
 
+        if (!checksum(ctx, request, requestHeader)) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("crc32 not match");
+            return response;
+        }
+
         RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
 
         if (request.getBody() != null) {
@@ -230,6 +237,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         return response;
     }
 
+    private boolean checksum(ChannelHandlerContext ctx, RemotingCommand request,
+        RegisterBrokerRequestHeader requestHeader) {
+        if (requestHeader.getBodyCrc32() != 0) {
+            final int crc32 = UtilAll.crc32(request.getBody());
+            if (crc32 != requestHeader.getBodyCrc32()) {
+                log.warn(String.format("receive registerBroker request,crc32 not match,from %s",
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())));
+                return false;
+            }
+        }
+        return true;
+    }
+
     public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
@@ -261,6 +281,12 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         final RegisterBrokerRequestHeader requestHeader =
             (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
 
+        if (!checksum(ctx, request, requestHeader)) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("crc32 not match");
+            return response;
+        }
+
         TopicConfigSerializeWrapper topicConfigWrapper;
         if (request.getBody() != null) {
             topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);