You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/15 06:59:58 UTC
[rocketmq] 02/02: Fix check style
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 1c69b64806001b97a3a279dfaeb730ada8c6aa05
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Dec 15 14:59:19 2021 +0800
Fix check style
---
.../apache/rocketmq/broker/BrokerController.java | 295 ++++++++++-----------
1 file changed, 144 insertions(+), 151 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index d0d319d..76c8007 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -157,7 +157,7 @@ public class BrokerController {
private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
private final BrokerOuterAPI brokerOuterAPI;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "BrokerControllerScheduledThread"));
+ "BrokerControllerScheduledThread"));
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> ackThreadPoolQueue;
@@ -204,10 +204,10 @@ public class BrokerController {
private long shouldStartTime;
public BrokerController(
- final BrokerConfig brokerConfig,
- final NettyServerConfig nettyServerConfig,
- final NettyClientConfig nettyClientConfig,
- final MessageStoreConfig messageStoreConfig
+ final BrokerConfig brokerConfig,
+ final NettyServerConfig nettyServerConfig,
+ final NettyClientConfig nettyClientConfig,
+ final MessageStoreConfig messageStoreConfig
) {
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
@@ -223,7 +223,7 @@ public class BrokerController {
this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this);
this.sendMessageProcessor = new SendMessageProcessor(this);
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService,
- this.popMessageProcessor);
+ this.popMessageProcessor);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
this.consumerFilterManager = new ConsumerFilterManager(this);
@@ -255,9 +255,9 @@ public class BrokerController {
this.brokerFastFailure = new BrokerFastFailure(this);
this.configuration = new Configuration(
- log,
- BrokerPathConfigHelper.getBrokerConfigPath(),
- this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
+ log,
+ BrokerPathConfigHelper.getBrokerConfigPath(),
+ this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);
}
@@ -297,8 +297,8 @@ public class BrokerController {
if (result) {
try {
this.messageStore =
- new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
- this.brokerConfig);
+ new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
+ this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
@@ -322,77 +322,76 @@ public class BrokerController {
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getSendMessageThreadPoolNums(),
- this.brokerConfig.getSendMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.sendThreadPoolQueue,
- new ThreadFactoryImpl("SendMessageThread_"));
+ this.brokerConfig.getSendMessageThreadPoolNums(),
+ this.brokerConfig.getSendMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.sendThreadPoolQueue,
+ new ThreadFactoryImpl("SendMessageThread_"));
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getPullMessageThreadPoolNums(),
- this.brokerConfig.getPullMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.pullThreadPoolQueue,
- new ThreadFactoryImpl("PullMessageThread_"));
+ this.brokerConfig.getPullMessageThreadPoolNums(),
+ this.brokerConfig.getPullMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.pullThreadPoolQueue,
+ new ThreadFactoryImpl("PullMessageThread_"));
this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getAckMessageThreadPoolNums(),
- this.brokerConfig.getAckMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.ackThreadPoolQueue,
- new ThreadFactoryImpl("AckMessageThread_"));
-
+ this.brokerConfig.getAckMessageThreadPoolNums(),
+ this.brokerConfig.getAckMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.ackThreadPoolQueue,
+ new ThreadFactoryImpl("AckMessageThread_"));
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
- this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.replyThreadPoolQueue,
- new ThreadFactoryImpl("ProcessReplyMessageThread_"));
+ this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+ this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.replyThreadPoolQueue,
+ new ThreadFactoryImpl("ProcessReplyMessageThread_"));
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getQueryMessageThreadPoolNums(),
- this.brokerConfig.getQueryMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.queryThreadPoolQueue,
- new ThreadFactoryImpl("QueryMessageThread_"));
+ this.brokerConfig.getQueryMessageThreadPoolNums(),
+ this.brokerConfig.getQueryMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.queryThreadPoolQueue,
+ new ThreadFactoryImpl("QueryMessageThread_"));
this.adminBrokerExecutor =
- Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
- "AdminBrokerThread_"));
+ Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
+ "AdminBrokerThread_"));
this.clientManageExecutor = new ThreadPoolExecutor(
- this.brokerConfig.getClientManageThreadPoolNums(),
- this.brokerConfig.getClientManageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.clientManagerThreadPoolQueue,
- new ThreadFactoryImpl("ClientManageThread_"));
+ this.brokerConfig.getClientManageThreadPoolNums(),
+ this.brokerConfig.getClientManageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.clientManagerThreadPoolQueue,
+ new ThreadFactoryImpl("ClientManageThread_"));
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getHeartbeatThreadPoolNums(),
- this.brokerConfig.getHeartbeatThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.heartbeatThreadPoolQueue,
- new ThreadFactoryImpl("HeartbeatThread_", true));
+ this.brokerConfig.getHeartbeatThreadPoolNums(),
+ this.brokerConfig.getHeartbeatThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.heartbeatThreadPoolQueue,
+ new ThreadFactoryImpl("HeartbeatThread_", true));
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getEndTransactionThreadPoolNums(),
- this.brokerConfig.getEndTransactionThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.endTransactionThreadPoolQueue,
- new ThreadFactoryImpl("EndTransactionThread_"));
+ this.brokerConfig.getEndTransactionThreadPoolNums(),
+ this.brokerConfig.getEndTransactionThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.endTransactionThreadPoolQueue,
+ new ThreadFactoryImpl("EndTransactionThread_"));
this.consumerManageExecutor =
- Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
- "ConsumerManageThread_"));
+ Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
+ "ConsumerManageThread_"));
this.registerProcessor();
@@ -466,8 +465,8 @@ public class BrokerController {
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
this.loadBalanceExecutor =
- Executors.newFixedThreadPool(this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), new ThreadFactoryImpl(
- "LoadBalanceProcessorThread_"));
+ Executors.newFixedThreadPool(this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), new ThreadFactoryImpl(
+ "LoadBalanceProcessorThread_"));
if (this.brokerConfig.getNamesrvAddr() != null) {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
@@ -494,7 +493,6 @@ public class BrokerController {
}
}, 1, 5, TimeUnit.SECONDS);
-
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
@@ -523,38 +521,38 @@ public class BrokerController {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
- new String[]{
- TlsSystemConfig.tlsServerCertPath,
- TlsSystemConfig.tlsServerKeyPath,
- TlsSystemConfig.tlsServerTrustCertPath
- },
- new FileWatchService.Listener() {
- boolean certChanged, keyChanged = false;
-
- @Override
- public void onChanged(String path) {
- if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
- log.info("The trust certificate changed, reload the ssl context");
- reloadServerSslContext();
- }
- if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
- certChanged = true;
- }
- if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
- keyChanged = true;
- }
- if (certChanged && keyChanged) {
- log.info("The certificate and private key changed, reload the ssl context");
- certChanged = keyChanged = false;
- reloadServerSslContext();
- }
+ new String[] {
+ TlsSystemConfig.tlsServerCertPath,
+ TlsSystemConfig.tlsServerKeyPath,
+ TlsSystemConfig.tlsServerTrustCertPath
+ },
+ new FileWatchService.Listener() {
+ boolean certChanged, keyChanged = false;
+
+ @Override
+ public void onChanged(String path) {
+ if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
+ log.info("The trust certificate changed, reload the ssl context");
+ reloadServerSslContext();
}
-
- private void reloadServerSslContext() {
- ((NettyRemotingServer) remotingServer).loadSslContext();
- ((NettyRemotingServer) fastRemotingServer).loadSslContext();
+ if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
+ certChanged = true;
+ }
+ if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
+ keyChanged = true;
+ }
+ if (certChanged && keyChanged) {
+ log.info("The certificate and private key changed, reload the ssl context");
+ certChanged = keyChanged = false;
+ reloadServerSslContext();
}
- });
+ }
+
+ private void reloadServerSslContext() {
+ ((NettyRemotingServer) remotingServer).loadSslContext();
+ ((NettyRemotingServer) fastRemotingServer).loadSslContext();
+ }
+ });
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
@@ -611,7 +609,6 @@ public class BrokerController {
}
}
-
private void initialRpcHooks() {
List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
@@ -623,7 +620,6 @@ public class BrokerController {
}
}
-
public String getBrokerAddrByName(String brokerName) {
return this.brokerName2AddrMap.get(brokerName);
}
@@ -962,10 +958,10 @@ public class BrokerController {
private void unregisterBrokerAll() {
this.brokerOuterAPI.unregisterBrokerAll(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId());
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId());
}
public String getBrokerAddr() {
@@ -1048,7 +1044,6 @@ public class BrokerController {
this.brokerFastFailure.start();
}
-
}
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
@@ -1064,30 +1059,30 @@ public class BrokerController {
topicConfigSerializeWrapper.setDataVersion(dataVersion);
ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigList.stream()
- .map(topicConfig -> {
- TopicConfig registerTopicConfig;
- if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
- || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
- registerTopicConfig =
- new TopicConfig(topicConfig.getTopicName(),
- topicConfig.getReadQueueNums(),
- topicConfig.getWriteQueueNums(),
- this.brokerConfig.getBrokerPermission());
- } else {
- registerTopicConfig = new TopicConfig(topicConfig);
- }
- return registerTopicConfig;
- })
- .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
+ .map(topicConfig -> {
+ TopicConfig registerTopicConfig;
+ if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+ || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+ registerTopicConfig =
+ new TopicConfig(topicConfig.getTopicName(),
+ topicConfig.getReadQueueNums(),
+ topicConfig.getWriteQueueNums(),
+ this.brokerConfig.getBrokerPermission());
+ } else {
+ registerTopicConfig = new TopicConfig(topicConfig);
+ }
+ return registerTopicConfig;
+ })
+ .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream()
- .map(TopicConfig::getTopicName)
- .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
- .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info)))
- .orElse(null))
- .filter(Objects::nonNull)
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ .map(TopicConfig::getTopicName)
+ .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
+ .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info)))
+ .orElse(null))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!topicQueueMappingInfoMap.isEmpty()) {
topicConfigSerializeWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
}
@@ -1107,39 +1102,39 @@ public class BrokerController {
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
- || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+ || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
- new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
- this.brokerConfig.getBrokerPermission());
+ new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
+ this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.brokerConfig.getRegisterBrokerTimeoutMills())) {
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.brokerConfig.getRegisterBrokerTimeoutMills())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
- TopicConfigAndMappingSerializeWrapper topicConfigWrapper) {
+ TopicConfigAndMappingSerializeWrapper topicConfigWrapper) {
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.getHAServerAddr(),
- topicConfigWrapper,
- this.filterServerManager.buildNewFilterServerList(),
- oneway,
- this.brokerConfig.getRegisterBrokerTimeoutMills(),
- this.brokerConfig.isCompressedRegister());
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.getHAServerAddr(),
+ topicConfigWrapper,
+ this.filterServerManager.buildNewFilterServerList(),
+ oneway,
+ this.brokerConfig.getRegisterBrokerTimeoutMills(),
+ this.brokerConfig.isCompressedRegister());
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
@@ -1158,10 +1153,10 @@ public class BrokerController {
}
private boolean needRegister(final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final int timeoutMills) {
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final int timeoutMills) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
@@ -1279,7 +1274,7 @@ public class BrokerController {
}
public void setTransactionalMessageCheckService(
- TransactionalMessageCheckService transactionalMessageCheckService) {
+ TransactionalMessageCheckService transactionalMessageCheckService) {
this.transactionalMessageCheckService = transactionalMessageCheckService;
}
@@ -1296,11 +1291,10 @@ public class BrokerController {
}
public void setTransactionalMessageCheckListener(
- AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
+ AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
-
public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
@@ -1367,7 +1361,6 @@ public class BrokerController {
log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}
-
public void changeToMaster(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
return;