You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/04/21 02:48:21 UTC

[rocketmq] branch develop updated: [ISSUE #4127] [BrokerOuterAPI] Anonymous new Runnable() can be replaced with lambda

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 519c0b8a4 [ISSUE #4127] [BrokerOuterAPI] Anonymous new Runnable() can be replaced with lambda
519c0b8a4 is described below

commit 519c0b8a46eac983ccb6848b0f0b5fbd2cf7db15
Author: 李晓双 Li Xiao Shuang <64...@qq.com>
AuthorDate: Thu Apr 21 10:48:07 2022 +0800

    [ISSUE #4127] [BrokerOuterAPI] Anonymous new Runnable() can be replaced with lambda
---
 .../apache/rocketmq/broker/BrokerController.java   |  2 +-
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 96 ++++++++++------------
 2 files changed, 46 insertions(+), 52 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 772eec6dc..a29552056 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -928,7 +928,7 @@ public class BrokerController {
 
         if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
             || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
-            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
+            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
             for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                 TopicConfig tmp =
                     new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index de7f3fce8..c5b53a777 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -63,7 +63,7 @@ public class BrokerOuterAPI {
     private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
     private String nameSrvAddr = null;
     private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
-        new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
+        new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
 
     public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
         this(nettyClientConfig, null);
@@ -142,21 +142,18 @@ public class BrokerOuterAPI {
             requestHeader.setBodyCrc32(bodyCrc32);
             final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
             for (final String namesrvAddr : nameServerAddressList) {
-                brokerOuterExecutor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
-                            if (result != null) {
-                                registerBrokerResultList.add(result);
-                            }
-
-                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
-                        } catch (Exception e) {
-                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
-                        } finally {
-                            countDownLatch.countDown();
+                brokerOuterExecutor.execute(() -> {
+                    try {
+                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
+                        if (result != null) {
+                            registerBrokerResultList.add(result);
                         }
+
+                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
+                    } catch (Exception e) {
+                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
+                    } finally {
+                        countDownLatch.countDown();
                     }
                 });
             }
@@ -269,46 +266,43 @@ public class BrokerOuterAPI {
         if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
             final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
             for (final String namesrvAddr : nameServerAddressList) {
-                brokerOuterExecutor.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
-                            requestHeader.setBrokerAddr(brokerAddr);
-                            requestHeader.setBrokerId(brokerId);
-                            requestHeader.setBrokerName(brokerName);
-                            requestHeader.setClusterName(clusterName);
-                            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
-                            request.setBody(topicConfigWrapper.getDataVersion().encode());
-                            RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
-                            DataVersion nameServerDataVersion = null;
-                            Boolean changed = false;
-                            switch (response.getCode()) {
-                                case ResponseCode.SUCCESS: {
-                                    QueryDataVersionResponseHeader queryDataVersionResponseHeader =
-                                        (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
-                                    changed = queryDataVersionResponseHeader.getChanged();
-                                    byte[] body = response.getBody();
-                                    if (body != null) {
-                                        nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
-                                        if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
-                                            changed = true;
-                                        }
-                                    }
-                                    if (changed == null || changed) {
-                                        changedList.add(Boolean.TRUE);
+                brokerOuterExecutor.execute(() -> {
+                    try {
+                        QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
+                        requestHeader.setBrokerAddr(brokerAddr);
+                        requestHeader.setBrokerId(brokerId);
+                        requestHeader.setBrokerName(brokerName);
+                        requestHeader.setClusterName(clusterName);
+                        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
+                        request.setBody(topicConfigWrapper.getDataVersion().encode());
+                        RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
+                        DataVersion nameServerDataVersion = null;
+                        Boolean changed = false;
+                        switch (response.getCode()) {
+                            case ResponseCode.SUCCESS: {
+                                QueryDataVersionResponseHeader queryDataVersionResponseHeader =
+                                    (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
+                                changed = queryDataVersionResponseHeader.getChanged();
+                                byte[] body = response.getBody();
+                                if (body != null) {
+                                    nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
+                                    if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
+                                        changed = true;
                                     }
                                 }
-                                default:
-                                    break;
+                                if (changed == null || changed) {
+                                    changedList.add(Boolean.TRUE);
+                                }
                             }
-                            log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
-                        } catch (Exception e) {
-                            changedList.add(Boolean.TRUE);
-                            log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);
-                        } finally {
-                            countDownLatch.countDown();
+                            default:
+                                break;
                         }
+                        log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
+                    } catch (Exception e) {
+                        changedList.add(Boolean.TRUE);
+                        log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);
+                    } finally {
+                        countDownLatch.countDown();
                     }
                 });