You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2018/06/22 11:58:24 UTC
[rocketmq] 01/01: Optimize broker topic route registration to
relieve stress on Java GC
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 92f0e1f5abb694c90ced9468e9e1d03edcdcedfe
Author: 傅冲 <yu...@alibaba-inc.com>
AuthorDate: Fri May 25 13:37:10 2018 +0800
Optimize broker topic route registration to relieve stress on Java GC
---
.../apache/rocketmq/broker/BrokerController.java | 69 +++++++++++++++-------
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 42 +++++++------
.../broker/processor/AdminBrokerProcessor.java | 10 ++--
.../org/apache/rocketmq/common/BrokerConfig.java | 2 -
.../namesrv/RegisterBrokerRequestHeader.java | 10 ++++
.../namesrv/processor/DefaultRequestProcessor.java | 26 ++++++++
6 files changed, 107 insertions(+), 52 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 293e51e..ed85a67 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -62,6 +63,7 @@ import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
@@ -768,6 +770,24 @@ public class BrokerController {
}
}
+ public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
+ TopicConfig registerTopicConfig = topicConfig;
+ if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+ || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+ registerTopicConfig =
+ new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
+ this.brokerConfig.getBrokerPermission());
+ }
+
+ ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
+ topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+ topicConfigSerializeWrapper.setDataVersion(dataVersion);
+ topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+
+ doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
+ }
+
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
@@ -788,30 +808,35 @@ public class BrokerController {
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.brokerConfig.getRegisterBrokerTimeoutMills())) {
- List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.getHAServerAddr(),
- topicConfigWrapper,
- this.filterServerManager.buildNewFilterServerList(),
- oneway,
- this.brokerConfig.getRegisterBrokerTimeoutMills(),
- this.brokerConfig.isCompressedRegister());
-
- if (registerBrokerResultList.size() > 0) {
- RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
- if (registerBrokerResult != null) {
- if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
- this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
- }
+ doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
+ }
+ }
+
+ private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
+ TopicConfigSerializeWrapper topicConfigWrapper) {
+ List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.getHAServerAddr(),
+ topicConfigWrapper,
+ this.filterServerManager.buildNewFilterServerList(),
+ oneway,
+ this.brokerConfig.getRegisterBrokerTimeoutMills(),
+ this.brokerConfig.isCompressedRegister());
+
+ if (registerBrokerResultList.size() > 0) {
+ RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
+ if (registerBrokerResult != null) {
+ if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
+ this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+ }
- this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+ this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
- if (checkOrderConfig) {
- this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
- }
+ if (checkOrderConfig) {
+ this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 2825a34..4dee01c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -125,14 +126,28 @@ public class BrokerOuterAPI {
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
+
+ final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
+ requestHeader.setBrokerAddr(brokerAddr);
+ requestHeader.setBrokerId(brokerId);
+ requestHeader.setBrokerName(brokerName);
+ requestHeader.setClusterName(clusterName);
+ requestHeader.setHaServerAddr(haServerAddr);
+ requestHeader.setCompressed(compressed);
+
+ RegisterBrokerBody requestBody = new RegisterBrokerBody();
+ requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
+ requestBody.setFilterServerList(filterServerList);
+ final byte[] body = requestBody.encode(compressed);
+ final int bodyCrc32 = UtilAll.crc32(body);
+ requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
- RegisterBrokerResult result = registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
- haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills, compressed);
+ RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
@@ -158,31 +173,14 @@ public class BrokerOuterAPI {
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
- final boolean compressed
+ final RegisterBrokerRequestHeader requestHeader,
+ final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
- RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
- requestHeader.setBrokerAddr(brokerAddr);
- requestHeader.setBrokerId(brokerId);
- requestHeader.setBrokerName(brokerName);
- requestHeader.setClusterName(clusterName);
- requestHeader.setHaServerAddr(haServerAddr);
- requestHeader.setCompressed(compressed);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
-
- RegisterBrokerBody requestBody = new RegisterBrokerBody();
- requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
- requestBody.setFilterServerList(filterServerList);
- request.setBody(requestBody.encode(requestHeader.isCompressed()));
+ request.setBody(body);
if (oneway) {
try {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index c0a4b20..1a704a8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -212,7 +212,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return false;
}
- private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
+ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
@@ -246,14 +246,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- if (brokerController.getBrokerConfig().getRegisterNameServerPeriod() == 0) {
- this.brokerController.registerBrokerAll(false, true, true);
- }
+ this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
return null;
}
- private RemotingCommand deleteTopic(ChannelHandlerContext ctx,
+ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DeleteTopicRequestHeader requestHeader =
@@ -299,7 +297,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
- private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+ private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 7caf830..203431a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -141,8 +141,6 @@ public class BrokerConfig {
* This configurable item defines interval of topics registration of broker to name server. Allowing values are
* between 10, 000 and 60, 000 milliseconds.
*
- * If set to 0, newly created topics will be immediately reported to name servers and interval of periodical
- * registration defaults to 10, 000 in milliseconds.
*/
private int registerNameServerPeriod = 1000 * 30;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
index 7ed7a40..19175b0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
@@ -38,6 +38,8 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
private boolean compressed;
+ private Integer bodyCrc32 = 0;
+
public void checkFields() throws RemotingCommandException {
}
@@ -88,4 +90,12 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
public void setCompressed(boolean compressed) {
this.compressed = compressed;
}
+
+ public Integer getBodyCrc32() {
+ return bodyCrc32;
+ }
+
+ public void setBodyCrc32(Integer bodyCrc32) {
+ this.bodyCrc32 = bodyCrc32;
+ }
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 236e6a1..467078c 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MQVersion.Version;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
@@ -196,6 +197,12 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+ if (!checksum(ctx, request, requestHeader)) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("crc32 not match");
+ return response;
+ }
+
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) {
@@ -230,6 +237,19 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
+ private boolean checksum(ChannelHandlerContext ctx, RemotingCommand request,
+ RegisterBrokerRequestHeader requestHeader) {
+ if (requestHeader.getBodyCrc32() != 0) {
+ final int crc32 = UtilAll.crc32(request.getBody());
+ if (crc32 != requestHeader.getBodyCrc32()) {
+ log.warn(String.format("receive registerBroker request,crc32 not match,from %s",
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel())));
+ return false;
+ }
+ }
+ return true;
+ }
+
public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
@@ -261,6 +281,12 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+ if (!checksum(ctx, request, requestHeader)) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("crc32 not match");
+ return response;
+ }
+
TopicConfigSerializeWrapper topicConfigWrapper;
if (request.getBody() != null) {
topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);