You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/15 06:59:58 UTC

[rocketmq] 02/02: Fix check style

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 1c69b64806001b97a3a279dfaeb730ada8c6aa05
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Dec 15 14:59:19 2021 +0800

    Fix check style
---
 .../apache/rocketmq/broker/BrokerController.java   | 295 ++++++++++-----------
 1 file changed, 144 insertions(+), 151 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index d0d319d..76c8007 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -157,7 +157,7 @@ public class BrokerController {
     private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
     private final BrokerOuterAPI brokerOuterAPI;
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-            "BrokerControllerScheduledThread"));
+        "BrokerControllerScheduledThread"));
     private final SlaveSynchronize slaveSynchronize;
     private final BlockingQueue<Runnable> sendThreadPoolQueue;
     private final BlockingQueue<Runnable> ackThreadPoolQueue;
@@ -204,10 +204,10 @@ public class BrokerController {
     private long shouldStartTime;
 
     public BrokerController(
-            final BrokerConfig brokerConfig,
-            final NettyServerConfig nettyServerConfig,
-            final NettyClientConfig nettyClientConfig,
-            final MessageStoreConfig messageStoreConfig
+        final BrokerConfig brokerConfig,
+        final NettyServerConfig nettyServerConfig,
+        final NettyClientConfig nettyClientConfig,
+        final MessageStoreConfig messageStoreConfig
     ) {
         this.brokerConfig = brokerConfig;
         this.nettyServerConfig = nettyServerConfig;
@@ -223,7 +223,7 @@ public class BrokerController {
         this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this);
         this.sendMessageProcessor = new SendMessageProcessor(this);
         this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService,
-                this.popMessageProcessor);
+            this.popMessageProcessor);
         this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
         this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
         this.consumerFilterManager = new ConsumerFilterManager(this);
@@ -255,9 +255,9 @@ public class BrokerController {
 
         this.brokerFastFailure = new BrokerFastFailure(this);
         this.configuration = new Configuration(
-                log,
-                BrokerPathConfigHelper.getBrokerConfigPath(),
-                this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
+            log,
+            BrokerPathConfigHelper.getBrokerConfigPath(),
+            this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
         );
     }
 
@@ -297,8 +297,8 @@ public class BrokerController {
         if (result) {
             try {
                 this.messageStore =
-                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
-                                this.brokerConfig);
+                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
+                        this.brokerConfig);
                 if (messageStoreConfig.isEnableDLegerCommitLog()) {
                     DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                     ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
@@ -322,77 +322,76 @@ public class BrokerController {
             fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
             this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
             this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                    this.brokerConfig.getSendMessageThreadPoolNums(),
-                    this.brokerConfig.getSendMessageThreadPoolNums(),
-                    1000 * 60,
-                    TimeUnit.MILLISECONDS,
-                    this.sendThreadPoolQueue,
-                    new ThreadFactoryImpl("SendMessageThread_"));
+                this.brokerConfig.getSendMessageThreadPoolNums(),
+                this.brokerConfig.getSendMessageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.sendThreadPoolQueue,
+                new ThreadFactoryImpl("SendMessageThread_"));
 
             this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                    this.brokerConfig.getPullMessageThreadPoolNums(),
-                    this.brokerConfig.getPullMessageThreadPoolNums(),
-                    1000 * 60,
-                    TimeUnit.MILLISECONDS,
-                    this.pullThreadPoolQueue,
-                    new ThreadFactoryImpl("PullMessageThread_"));
+                this.brokerConfig.getPullMessageThreadPoolNums(),
+                this.brokerConfig.getPullMessageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.pullThreadPoolQueue,
+                new ThreadFactoryImpl("PullMessageThread_"));
 
             this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                    this.brokerConfig.getAckMessageThreadPoolNums(),
-                    this.brokerConfig.getAckMessageThreadPoolNums(),
-                    1000 * 60,
-                    TimeUnit.MILLISECONDS,
-                    this.ackThreadPoolQueue,
-                    new ThreadFactoryImpl("AckMessageThread_"));
-
+                this.brokerConfig.getAckMessageThreadPoolNums(),
+                this.brokerConfig.getAckMessageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.ackThreadPoolQueue,
+                new ThreadFactoryImpl("AckMessageThread_"));
 
             this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                    this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
-                    this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
-                    1000 * 60,
-                    TimeUnit.MILLISECONDS,
-                    this.replyThreadPoolQueue,
-                    new ThreadFactoryImpl("ProcessReplyMessageThread_"));
+                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.replyThreadPoolQueue,
+                new ThreadFactoryImpl("ProcessReplyMessageThread_"));
 
             this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                    this.brokerConfig.getQueryMessageThreadPoolNums(),
-                    this.brokerConfig.getQueryMessageThreadPoolNums(),
-                    1000 * 60,
-                    TimeUnit.MILLISECONDS,
-                    this.queryThreadPoolQueue,
-                    new ThreadFactoryImpl("QueryMessageThread_"));
+                this.brokerConfig.getQueryMessageThreadPoolNums(),
+                this.brokerConfig.getQueryMessageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.queryThreadPoolQueue,
+                new ThreadFactoryImpl("QueryMessageThread_"));
 
             this.adminBrokerExecutor =
-                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
-                            "AdminBrokerThread_"));
+                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
+                    "AdminBrokerThread_"));
 
             this.clientManageExecutor = new ThreadPoolExecutor(
-                    this.brokerConfig.getClientManageThreadPoolNums(),
-                    this.brokerConfig.getClientManageThreadPoolNums(),
-                    1000 * 60,
-                    TimeUnit.MILLISECONDS,
-                    this.clientManagerThreadPoolQueue,
-                    new ThreadFactoryImpl("ClientManageThread_"));
+                this.brokerConfig.getClientManageThreadPoolNums(),
+                this.brokerConfig.getClientManageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.clientManagerThreadPoolQueue,
+                new ThreadFactoryImpl("ClientManageThread_"));
 
             this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
-                    this.brokerConfig.getHeartbeatThreadPoolNums(),
-                    this.brokerConfig.getHeartbeatThreadPoolNums(),
-                    1000 * 60,
-                    TimeUnit.MILLISECONDS,
-                    this.heartbeatThreadPoolQueue,
-                    new ThreadFactoryImpl("HeartbeatThread_", true));
+                this.brokerConfig.getHeartbeatThreadPoolNums(),
+                this.brokerConfig.getHeartbeatThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.heartbeatThreadPoolQueue,
+                new ThreadFactoryImpl("HeartbeatThread_", true));
 
             this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
-                    this.brokerConfig.getEndTransactionThreadPoolNums(),
-                    this.brokerConfig.getEndTransactionThreadPoolNums(),
-                    1000 * 60,
-                    TimeUnit.MILLISECONDS,
-                    this.endTransactionThreadPoolQueue,
-                    new ThreadFactoryImpl("EndTransactionThread_"));
+                this.brokerConfig.getEndTransactionThreadPoolNums(),
+                this.brokerConfig.getEndTransactionThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.endTransactionThreadPoolQueue,
+                new ThreadFactoryImpl("EndTransactionThread_"));
 
             this.consumerManageExecutor =
-                    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
-                            "ConsumerManageThread_"));
+                Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
+                    "ConsumerManageThread_"));
 
             this.registerProcessor();
 
@@ -466,8 +465,8 @@ public class BrokerController {
             }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
 
             this.loadBalanceExecutor =
-                    Executors.newFixedThreadPool(this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), new ThreadFactoryImpl(
-                            "LoadBalanceProcessorThread_"));
+                Executors.newFixedThreadPool(this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), new ThreadFactoryImpl(
+                    "LoadBalanceProcessorThread_"));
 
             if (this.brokerConfig.getNamesrvAddr() != null) {
                 this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
@@ -494,7 +493,6 @@ public class BrokerController {
                 }
             }, 1, 5, TimeUnit.SECONDS);
 
-
             if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                     if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
@@ -523,38 +521,38 @@ public class BrokerController {
                 // Register a listener to reload SslContext
                 try {
                     fileWatchService = new FileWatchService(
-                            new String[]{
-                                TlsSystemConfig.tlsServerCertPath,
-                                TlsSystemConfig.tlsServerKeyPath,
-                                TlsSystemConfig.tlsServerTrustCertPath
-                            },
-                            new FileWatchService.Listener() {
-                                boolean certChanged, keyChanged = false;
-
-                                @Override
-                                public void onChanged(String path) {
-                                    if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
-                                        log.info("The trust certificate changed, reload the ssl context");
-                                        reloadServerSslContext();
-                                    }
-                                    if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
-                                        certChanged = true;
-                                    }
-                                    if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
-                                        keyChanged = true;
-                                    }
-                                    if (certChanged && keyChanged) {
-                                        log.info("The certificate and private key changed, reload the ssl context");
-                                        certChanged = keyChanged = false;
-                                        reloadServerSslContext();
-                                    }
+                        new String[] {
+                            TlsSystemConfig.tlsServerCertPath,
+                            TlsSystemConfig.tlsServerKeyPath,
+                            TlsSystemConfig.tlsServerTrustCertPath
+                        },
+                        new FileWatchService.Listener() {
+                            boolean certChanged, keyChanged = false;
+
+                            @Override
+                            public void onChanged(String path) {
+                                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
+                                    log.info("The trust certificate changed, reload the ssl context");
+                                    reloadServerSslContext();
                                 }
-
-                                private void reloadServerSslContext() {
-                                    ((NettyRemotingServer) remotingServer).loadSslContext();
-                                    ((NettyRemotingServer) fastRemotingServer).loadSslContext();
+                                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
+                                    certChanged = true;
+                                }
+                                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
+                                    keyChanged = true;
+                                }
+                                if (certChanged && keyChanged) {
+                                    log.info("The certificate and private key changed, reload the ssl context");
+                                    certChanged = keyChanged = false;
+                                    reloadServerSslContext();
                                 }
-                            });
+                            }
+
+                            private void reloadServerSslContext() {
+                                ((NettyRemotingServer) remotingServer).loadSslContext();
+                                ((NettyRemotingServer) fastRemotingServer).loadSslContext();
+                            }
+                        });
                 } catch (Exception e) {
                     log.warn("FileWatchService created error, can't load the certificate dynamically");
                 }
@@ -611,7 +609,6 @@ public class BrokerController {
         }
     }
 
-
     private void initialRpcHooks() {
 
         List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
@@ -623,7 +620,6 @@ public class BrokerController {
         }
     }
 
-
     public String getBrokerAddrByName(String brokerName) {
         return this.brokerName2AddrMap.get(brokerName);
     }
@@ -962,10 +958,10 @@ public class BrokerController {
 
     private void unregisterBrokerAll() {
         this.brokerOuterAPI.unregisterBrokerAll(
-                this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId());
+            this.brokerConfig.getBrokerClusterName(),
+            this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(),
+            this.brokerConfig.getBrokerId());
     }
 
     public String getBrokerAddr() {
@@ -1048,7 +1044,6 @@ public class BrokerController {
             this.brokerFastFailure.start();
         }
 
-
     }
 
     public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
@@ -1064,30 +1059,30 @@ public class BrokerController {
         topicConfigSerializeWrapper.setDataVersion(dataVersion);
 
         ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigList.stream()
-                .map(topicConfig -> {
-                    TopicConfig registerTopicConfig;
-                    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
-                            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
-                        registerTopicConfig =
-                                new TopicConfig(topicConfig.getTopicName(),
-                                        topicConfig.getReadQueueNums(),
-                                        topicConfig.getWriteQueueNums(),
-                                        this.brokerConfig.getBrokerPermission());
-                    } else {
-                        registerTopicConfig = new TopicConfig(topicConfig);
-                    }
-                    return registerTopicConfig;
-                })
-                .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
+            .map(topicConfig -> {
+                TopicConfig registerTopicConfig;
+                if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+                    || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+                    registerTopicConfig =
+                        new TopicConfig(topicConfig.getTopicName(),
+                            topicConfig.getReadQueueNums(),
+                            topicConfig.getWriteQueueNums(),
+                            this.brokerConfig.getBrokerPermission());
+                } else {
+                    registerTopicConfig = new TopicConfig(topicConfig);
+                }
+                return registerTopicConfig;
+            })
+            .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
         topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
 
         Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream()
-                .map(TopicConfig::getTopicName)
-                .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
-                        .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info)))
-                        .orElse(null))
-                .filter(Objects::nonNull)
-                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            .map(TopicConfig::getTopicName)
+            .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
+                .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info)))
+                .orElse(null))
+            .filter(Objects::nonNull)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
         if (!topicQueueMappingInfoMap.isEmpty()) {
             topicConfigSerializeWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
         }
@@ -1107,39 +1102,39 @@ public class BrokerController {
         ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
 
         if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
-                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
             ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
             for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                 TopicConfig tmp =
-                        new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
-                                this.brokerConfig.getBrokerPermission());
+                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
+                        this.brokerConfig.getBrokerPermission());
                 topicConfigTable.put(topicConfig.getTopicName(), tmp);
             }
             topicConfigWrapper.setTopicConfigTable(topicConfigTable);
         }
 
         if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId(),
-                this.brokerConfig.getRegisterBrokerTimeoutMills())) {
+            this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(),
+            this.brokerConfig.getBrokerId(),
+            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
             doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
         }
     }
 
     private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
-                                     TopicConfigAndMappingSerializeWrapper topicConfigWrapper) {
+        TopicConfigAndMappingSerializeWrapper topicConfigWrapper) {
         List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
-                this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId(),
-                this.getHAServerAddr(),
-                topicConfigWrapper,
-                this.filterServerManager.buildNewFilterServerList(),
-                oneway,
-                this.brokerConfig.getRegisterBrokerTimeoutMills(),
-                this.brokerConfig.isCompressedRegister());
+            this.brokerConfig.getBrokerClusterName(),
+            this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(),
+            this.brokerConfig.getBrokerId(),
+            this.getHAServerAddr(),
+            topicConfigWrapper,
+            this.filterServerManager.buildNewFilterServerList(),
+            oneway,
+            this.brokerConfig.getRegisterBrokerTimeoutMills(),
+            this.brokerConfig.isCompressedRegister());
 
         if (registerBrokerResultList.size() > 0) {
             RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
@@ -1158,10 +1153,10 @@ public class BrokerController {
     }
 
     private boolean needRegister(final String clusterName,
-                                 final String brokerAddr,
-                                 final String brokerName,
-                                 final long brokerId,
-                                 final int timeoutMills) {
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final int timeoutMills) {
 
         TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
         List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
@@ -1279,7 +1274,7 @@ public class BrokerController {
     }
 
     public void setTransactionalMessageCheckService(
-            TransactionalMessageCheckService transactionalMessageCheckService) {
+        TransactionalMessageCheckService transactionalMessageCheckService) {
         this.transactionalMessageCheckService = transactionalMessageCheckService;
     }
 
@@ -1296,11 +1291,10 @@ public class BrokerController {
     }
 
     public void setTransactionalMessageCheckListener(
-            AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
+        AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
         this.transactionalMessageCheckListener = transactionalMessageCheckListener;
     }
 
-
     public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
         return endTransactionThreadPoolQueue;
 
@@ -1367,7 +1361,6 @@ public class BrokerController {
         log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
     }
 
-
     public void changeToMaster(BrokerRole role) {
         if (role == BrokerRole.SLAVE) {
             return;