You are viewing a plain text version of this content. The canonical link to the original was a hyperlink and is not preserved in this plain text copy.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/04/21 02:48:21 UTC
[rocketmq] branch develop updated: [ISSUE #4127] [BrokerOuterAPI] Anonymous new Runnable() can be replaced with lambda
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 519c0b8a4 [ISSUE #4127] [BrokerOuterAPI] Anonymous new Runnable() can be replaced with lambda
519c0b8a4 is described below
commit 519c0b8a46eac983ccb6848b0f0b5fbd2cf7db15
Author: 李晓双 Li Xiao Shuang <64...@qq.com>
AuthorDate: Thu Apr 21 10:48:07 2022 +0800
[ISSUE #4127] [BrokerOuterAPI] Anonymous new Runnable() can be replaced with lambda
---
.../apache/rocketmq/broker/BrokerController.java | 2 +-
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 96 ++++++++++------------
2 files changed, 46 insertions(+), 52 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 772eec6dc..a29552056 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -928,7 +928,7 @@ public class BrokerController {
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
- ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
+ ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index de7f3fce8..c5b53a777 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -63,7 +63,7 @@ public class BrokerOuterAPI {
private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
private String nameSrvAddr = null;
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
- new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
+ new ArrayBlockingQueue<>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
@@ -142,21 +142,18 @@ public class BrokerOuterAPI {
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
- brokerOuterExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
- if (result != null) {
- registerBrokerResultList.add(result);
- }
-
- log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
- } catch (Exception e) {
- log.warn("registerBroker Exception, {}", namesrvAddr, e);
- } finally {
- countDownLatch.countDown();
+ brokerOuterExecutor.execute(() -> {
+ try {
+ RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
+ if (result != null) {
+ registerBrokerResultList.add(result);
}
+
+ log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
+ } catch (Exception e) {
+ log.warn("registerBroker Exception, {}", namesrvAddr, e);
+ } finally {
+ countDownLatch.countDown();
}
});
}
@@ -269,46 +266,43 @@ public class BrokerOuterAPI {
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
- brokerOuterExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
- requestHeader.setBrokerAddr(brokerAddr);
- requestHeader.setBrokerId(brokerId);
- requestHeader.setBrokerName(brokerName);
- requestHeader.setClusterName(clusterName);
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
- request.setBody(topicConfigWrapper.getDataVersion().encode());
- RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
- DataVersion nameServerDataVersion = null;
- Boolean changed = false;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- QueryDataVersionResponseHeader queryDataVersionResponseHeader =
- (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
- changed = queryDataVersionResponseHeader.getChanged();
- byte[] body = response.getBody();
- if (body != null) {
- nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
- if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
- changed = true;
- }
- }
- if (changed == null || changed) {
- changedList.add(Boolean.TRUE);
+ brokerOuterExecutor.execute(() -> {
+ try {
+ QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
+ requestHeader.setBrokerAddr(brokerAddr);
+ requestHeader.setBrokerId(brokerId);
+ requestHeader.setBrokerName(brokerName);
+ requestHeader.setClusterName(clusterName);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
+ request.setBody(topicConfigWrapper.getDataVersion().encode());
+ RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
+ DataVersion nameServerDataVersion = null;
+ Boolean changed = false;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ QueryDataVersionResponseHeader queryDataVersionResponseHeader =
+ (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
+ changed = queryDataVersionResponseHeader.getChanged();
+ byte[] body = response.getBody();
+ if (body != null) {
+ nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
+ if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
+ changed = true;
}
}
- default:
- break;
+ if (changed == null || changed) {
+ changedList.add(Boolean.TRUE);
+ }
}
- log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
- } catch (Exception e) {
- changedList.add(Boolean.TRUE);
- log.error("Query data version from name server {} Exception, {}", namesrvAddr, e);
- } finally {
- countDownLatch.countDown();
+ default:
+ break;
}
+ log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
+ } catch (Exception e) {
+ changedList.add(Boolean.TRUE);
+ log.error("Query data version from name server {} Exception, {}", namesrvAddr, e);
+ } finally {
+ countDownLatch.countDown();
}
});