You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2018/03/07 06:38:56 UTC
[rocketmq] branch develop updated: [ROCKETMQ-319] Improve broker
register performance and reduce memory usage (#205)
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e744222 [ROCKETMQ-319] Improve broker register performance and reduce memory usage (#205)
e744222 is described below
commit e74422252719cfae89e13b0231f52f1a6538db48
Author: fuyou001 <fu...@gmail.com>
AuthorDate: Wed Mar 7 14:38:54 2018 +0800
[ROCKETMQ-319] Improve broker register performance and reduce memory usage (#205)
---
.../apache/rocketmq/broker/BrokerController.java | 62 +++++--
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 127 ++++++++++++--
.../broker/processor/AdminBrokerProcessor.java | 4 +-
.../rocketmq/broker/topic/TopicConfigManager.java | 8 +-
.../apache/rocketmq/broker/BrokerOuterAPITest.java | 191 +++++++++++++++++++++
.../org/apache/rocketmq/common/BrokerConfig.java | 25 ++-
.../org/apache/rocketmq/common/DataVersion.java | 9 +
.../apache/rocketmq/common/ThreadFactoryImpl.java | 11 +-
.../rocketmq/common/protocol/RequestCode.java | 2 +
.../common/protocol/body/RegisterBrokerBody.java | 153 +++++++++++++++++
...der.java => QueryDataVersionRequestHeader.java} | 16 +-
.../namesrv/QueryDataVersionResponseHeader.java} | 34 ++--
.../namesrv/RegisterBrokerRequestHeader.java | 11 +-
.../rocketmq/common/RegisterBrokerBodyTest.java | 51 ++++++
.../namesrv/processor/DefaultRequestProcessor.java | 35 +++-
.../namesrv/routeinfo/RouteInfoManager.java | 19 +-
16 files changed, 682 insertions(+), 76 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 409b1d0..60f287a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -729,14 +729,14 @@ public class BrokerController {
this.filterServerManager.start();
}
- this.registerBrokerAll(true, false);
+ this.registerBrokerAll(true, false, true);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
- BrokerController.this.registerBrokerAll(true, false);
+ BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
@@ -752,7 +752,7 @@ public class BrokerController {
}
}
- public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
+ public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
@@ -767,28 +767,56 @@ public class BrokerController {
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
- RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
- this.brokerConfig.getBrokerClusterName(),
+ if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
- this.getHAServerAddr(),
- topicConfigWrapper,
- this.filterServerManager.buildNewFilterServerList(),
- oneway,
- this.brokerConfig.getRegisterBrokerTimeoutMills());
-
- if (registerBrokerResult != null) {
- if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
- this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+ this.brokerConfig.getRegisterBrokerTimeoutMills())) {
+ List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.getHAServerAddr(),
+ topicConfigWrapper,
+ this.filterServerManager.buildNewFilterServerList(),
+ oneway,
+ this.brokerConfig.getRegisterBrokerTimeoutMills(),
+ this.brokerConfig.isCompressedRegister());
+
+ if (registerBrokerResultList.size() > 0) {
+ RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
+ if (registerBrokerResult != null) {
+ if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
+ this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+ }
+
+ this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+
+ if (checkOrderConfig) {
+ this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
+ }
+ }
}
+ }
+ }
- this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+ private boolean needRegister(final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final int timeoutMills) {
- if (checkOrderConfig) {
- this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
+ TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
+ List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
+ boolean needRegister = false;
+ for (Boolean changed : changeList) {
+ if (changed) {
+ needRegister = true;
+ break;
}
}
+ return needRegister;
}
public TopicConfigManager getTopicConfigManager() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 87c00a3..262e2d2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -16,11 +16,19 @@
*/
package org.apache.rocketmq.broker.out;
+import com.google.common.collect.Lists;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -33,6 +41,8 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
@@ -52,6 +62,8 @@ public class BrokerOuterAPI {
private final RemotingClient remotingClient;
private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
private String nameSrvAddr = null;
+ private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
+ new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
@@ -97,7 +109,7 @@ public class BrokerOuterAPI {
this.remotingClient.updateNameServerAddressList(lst);
}
- public RegisterBrokerResult registerBrokerAll(
+ public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
@@ -106,27 +118,41 @@ public class BrokerOuterAPI {
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
- final int timeoutMills) {
- RegisterBrokerResult registerBrokerResult = null;
+ final int timeoutMills,
+ final boolean compressed) {
+ final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
- if (nameServerAddressList != null) {
- for (String namesrvAddr : nameServerAddressList) {
- try {
- RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
- haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
- if (result != null) {
- registerBrokerResult = result;
+ if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
+ final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
+ for (final String namesrvAddr : nameServerAddressList) {
+ brokerOuterExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ RegisterBrokerResult result = registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
+ haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills, compressed);
+ if (result != null) {
+ registerBrokerResultList.add(result);
+ }
+
+ log.info("register broker to name server {} OK", namesrvAddr);
+ } catch (Exception e) {
+ log.warn("registerBroker Exception, {}", namesrvAddr, e);
+ } finally {
+ countDownLatch.countDown();
+ }
}
+ });
+ }
- log.info("register broker to name server {} OK", namesrvAddr);
- } catch (Exception e) {
- log.warn("registerBroker Exception, {}", namesrvAddr, e);
- }
+ try {
+ countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
}
}
- return registerBrokerResult;
+ return registerBrokerResultList;
}
private RegisterBrokerResult registerBroker(
@@ -139,7 +165,8 @@ public class BrokerOuterAPI {
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
- final int timeoutMills
+ final int timeoutMills,
+ final boolean compressed
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
@@ -148,12 +175,13 @@ public class BrokerOuterAPI {
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
+ requestHeader.setCompressed(compressed);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
- request.setBody(requestBody.encode());
+ request.setBody(requestBody.encode(requestHeader.isCompressed()));
if (oneway) {
try {
@@ -231,6 +259,71 @@ public class BrokerOuterAPI {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
+ public List<Boolean> needRegister(
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final int timeoutMills) {
+ final List<Boolean> changedList = new CopyOnWriteArrayList<>();
+ List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
+ if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
+ final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
+ for (final String namesrvAddr : nameServerAddressList) {
+ brokerOuterExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
+ requestHeader.setBrokerAddr(brokerAddr);
+ requestHeader.setBrokerId(brokerId);
+ requestHeader.setBrokerName(brokerName);
+ requestHeader.setClusterName(clusterName);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
+ request.setBody(topicConfigWrapper.getDataVersion().encode());
+ RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
+ DataVersion nameServerDataVersion = null;
+ Boolean changed = false;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ QueryDataVersionResponseHeader queryDataVersionResponseHeader =
+ (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
+ changed = queryDataVersionResponseHeader.getChanged();
+ byte[] body = response.getBody();
+ if (body != null) {
+ nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
+ if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
+ changed = true;
+ }
+ }
+ if (changed == null || changed) {
+ changedList.add(Boolean.TRUE);
+ }
+ }
+ default:
+ break;
+ }
+ log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
+ } catch (Exception e) {
+ changedList.add(Boolean.TRUE);
+ log.error("Query data version from name server {} Exception, {}", namesrvAddr, e);
+ } finally {
+ countDownLatch.countDown();
+ }
+ }
+ });
+
+ }
+ try {
+ countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ log.error("query dataversion from nameserver countDownLatch await Exception", e);
+ }
+ }
+ return changedList;
+ }
+
public TopicConfigSerializeWrapper getAllTopicConfig(
final String addr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index e8be2d4..a9e54aa 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -245,7 +245,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- this.brokerController.registerBrokerAll(false, true);
+ this.brokerController.registerBrokerAll(false, true, true);
return null;
}
@@ -310,8 +310,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
log.info("updateBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress());
this.brokerController.getConfiguration().update(properties);
if (properties.containsKey("brokerPermission")) {
- this.brokerController.registerBrokerAll(false, false);
this.brokerController.getTopicConfigManager().getDataVersion().nextVersion();
+ this.brokerController.registerBrokerAll(false, false, true);
}
} else {
log.error("string2Properties error");
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 29e2280..cdae66f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -210,7 +210,7 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
- this.brokerController.registerBrokerAll(false, true);
+ this.brokerController.registerBrokerAll(false, true,true);
}
return topicConfig;
@@ -254,7 +254,7 @@ public class TopicConfigManager extends ConfigManager {
}
if (createNew) {
- this.brokerController.registerBrokerAll(false, true);
+ this.brokerController.registerBrokerAll(false, true,true);
}
return topicConfig;
@@ -279,7 +279,7 @@ public class TopicConfigManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
- this.brokerController.registerBrokerAll(false, true);
+ this.brokerController.registerBrokerAll(false, true,true);
}
}
@@ -299,7 +299,7 @@ public class TopicConfigManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
- this.brokerController.registerBrokerAll(false, true);
+ this.brokerController.registerBrokerAll(false, true,true);
}
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
new file mode 100644
index 0000000..69e0dd3
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import org.mockito.Mock;
+import static org.mockito.Mockito.when;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerOuterAPITest {
+ @Mock
+ private ChannelHandlerContext handlerContext;
+ @Spy
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+ @Mock
+ private MessageStore messageStore;
+ private String clusterName = "clusterName";
+ private String brokerName = "brokerName";
+ private String brokerAddr = "brokerAddr";
+ private long brokerId = 0L;
+ private String nameserver1 = "127.0.0.1";
+ private String nameserver2 = "127.0.0.2";
+ private String nameserver3 = "127.0.0.3";
+ private int timeOut = 3000;
+
+ @Mock
+ private NettyRemotingClient nettyRemotingClient;
+
+ private BrokerOuterAPI brokerOuterAPI;
+
+ public void init() throws Exception {
+ brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), null);
+ Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+ field.setAccessible(true);
+ field.set(brokerOuterAPI, nettyRemotingClient);
+ }
+
+ @Test
+ public void test_needRegister_normal() throws Exception {
+ init();
+ brokerOuterAPI.start();
+ final RemotingCommand response = buildResponse(Boolean.TRUE);
+
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+
+ when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
+ when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response);
+ List<Boolean> booleanList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut);
+ assertEquals(3, booleanList.size());
+ assertEquals(false, booleanList.contains(Boolean.FALSE));
+ }
+
+ @Test
+ public void test_needRegister_timeout() throws Exception {
+ init();
+ brokerOuterAPI.start();
+
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+
+ when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
+
+ when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock invocation) throws Throwable {
+ if (invocation.getArgument(0) == nameserver1) {
+ return buildResponse(Boolean.TRUE);
+ } else if (invocation.getArgument(0) == nameserver2) {
+ return buildResponse(Boolean.FALSE);
+ } else if (invocation.getArgument(0) == nameserver3) {
+ TimeUnit.MILLISECONDS.sleep(timeOut + 20);
+ return buildResponse(Boolean.TRUE);
+ }
+ return buildResponse(Boolean.TRUE);
+ }
+ });
+ List<Boolean> booleanList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut);
+ assertEquals(2, booleanList.size());
+ boolean success = Iterables.any(booleanList,
+ new Predicate<Boolean>() {
+ public boolean apply(Boolean input) {
+ return input ? true : false;
+ }
+ });
+
+ assertEquals(true, success);
+
+ }
+
+ @Test
+ public void test_register_normal() throws Exception {
+ init();
+ brokerOuterAPI.start();
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+ final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+
+ when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
+ when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response);
+ List<RegisterBrokerResult> registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), false, timeOut, true);
+
+ assertEquals(3, registerBrokerResultList.size());
+ }
+
+ @Test
+ public void test_register_timeout() throws Exception {
+ init();
+ brokerOuterAPI.start();
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+
+ TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+
+ when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
+ when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenAnswer(new Answer<RemotingCommand>() {
+ @Override
+ public RemotingCommand answer(InvocationOnMock invocation) throws Throwable {
+ if (invocation.getArgument(0) == nameserver1) {
+ return response;
+ } else if (invocation.getArgument(0) == nameserver2) {
+ return response;
+ } else if (invocation.getArgument(0) == nameserver3) {
+ TimeUnit.MILLISECONDS.sleep(timeOut + 20);
+ return response;
+ }
+ return response;
+ }
+ });
+ List<RegisterBrokerResult> registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), false, timeOut, true);
+
+ assertEquals(2, registerBrokerResultList.size());
+ }
+
+ private RemotingCommand buildResponse(Boolean changed) {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
+ final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ responseHeader.setChanged(changed);
+ return response;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 4d7eb46..4468b2d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.common;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
@@ -23,9 +25,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
public class BrokerConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@@ -133,6 +132,10 @@ public class BrokerConfig {
private boolean filterSupportRetry = false;
private boolean enablePropertyFilter = false;
+ private boolean compressedRegister = false;
+
+ private boolean forceRegister = false;
+
public boolean isTraceOn() {
return traceOn;
}
@@ -598,4 +601,20 @@ public class BrokerConfig {
public void setEnablePropertyFilter(boolean enablePropertyFilter) {
this.enablePropertyFilter = enablePropertyFilter;
}
+
+ public boolean isCompressedRegister() {
+ return compressedRegister;
+ }
+
+ public void setCompressedRegister(boolean compressedRegister) {
+ this.compressedRegister = compressedRegister;
+ }
+
+ public boolean isForceRegister() {
+ return forceRegister;
+ }
+
+ public void setForceRegister(boolean forceRegister) {
+ this.forceRegister = forceRegister;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
index 71b00fd..e54000d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
+++ b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
@@ -78,4 +78,13 @@ public class DataVersion extends RemotingSerializable {
}
return result;
}
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("DataVersion[");
+ sb.append("timestamp=").append(timestamp);
+ sb.append(", counter=").append(counter);
+ sb.append(']');
+ return sb.toString();
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
index 3860ec3..564d60c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
@@ -23,14 +23,21 @@ import java.util.concurrent.atomic.AtomicLong;
public class ThreadFactoryImpl implements ThreadFactory {
private final AtomicLong threadIndex = new AtomicLong(0);
private final String threadNamePrefix;
+ private final boolean daemon;
public ThreadFactoryImpl(final String threadNamePrefix) {
+ this(threadNamePrefix, false);
+ }
+
+ public ThreadFactoryImpl(final String threadNamePrefix, boolean daemon) {
this.threadNamePrefix = threadNamePrefix;
+ this.daemon = daemon;
}
@Override
public Thread newThread(Runnable r) {
- return new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
-
+ Thread thread = new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
+ thread.setDaemon(daemon);
+ return thread;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 5900c0b..8cf2d46 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -165,4 +165,6 @@ public class RequestCode {
public static final int SEND_BATCH_MESSAGE = 320;
public static final int QUERY_CONSUME_QUEUE = 321;
+
+ public static final int QUERY_DATA_VERSION = 322;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
index c220927..2b49b6d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -17,14 +17,155 @@
package org.apache.rocketmq.common.protocol.body;
+import com.alibaba.fastjson.JSON;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RegisterBrokerBody extends RemotingSerializable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RegisterBrokerBody.class);
private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
private List<String> filterServerList = new ArrayList<String>();
+ /**
+  * Serializes this body, optionally as a deflate-compressed binary stream.
+  *
+  * <p>Compressed layout (every length/count is a 4-byte int produced by
+  * {@code convertIntToByteArray}):
+  * <ol>
+  * <li>length of the encoded data version, followed by those bytes;</li>
+  * <li>number of topic configs;</li>
+  * <li>for each topic config: length of its encoded form, followed by those bytes;</li>
+  * <li>length of the filter server list JSON, followed by the JSON bytes.</li>
+  * </ol>
+  *
+  * @param compress {@code false} to delegate to {@code super.encode()};
+  *                 {@code true} to emit the compressed layout described above
+  * @return the encoded bytes, or {@code null} when compression fails with an {@link IOException}
+  */
+ public byte[] encode(boolean compress) {
+
+     if (!compress) {
+         return super.encode();
+     }
+     long start = System.currentTimeMillis();
+     ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+     // Keep a handle on the Deflater: DeflaterOutputStream does not release a
+     // caller-supplied Deflater, so it must be ended explicitly in the finally block.
+     Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
+     DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, deflater);
+     DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
+     // Work on a copy so the iteration below is decoupled from the wrapper's own table.
+     ConcurrentMap<String, TopicConfig> topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
+     assert topicConfigTable != null;
+     try {
+         byte[] buffer = dataVersion.encode();
+
+         // write data version
+         outputStream.write(convertIntToByteArray(buffer.length));
+         outputStream.write(buffer);
+
+         int topicNumber = topicConfigTable.size();
+
+         // write number of topic configs
+         outputStream.write(convertIntToByteArray(topicNumber));
+
+         // write topic config entry one by one.
+         for (ConcurrentMap.Entry<String, TopicConfig> next : topicConfigTable.entrySet()) {
+             buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET);
+             outputStream.write(convertIntToByteArray(buffer.length));
+             outputStream.write(buffer);
+         }
+
+         buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET);
+
+         // write filter server list json length
+         outputStream.write(convertIntToByteArray(buffer.length));
+
+         // write filter server list json
+         outputStream.write(buffer);
+
+         // flush the remaining compressed data into byteArrayOutputStream
+         outputStream.finish();
+         long interval = System.currentTimeMillis() - start;
+         if (interval > 50) {
+             LOGGER.info("Compressing takes {}ms", interval);
+         }
+         return byteArrayOutputStream.toByteArray();
+     } catch (IOException e) {
+         LOGGER.error("Failed to compress RegisterBrokerBody object", e);
+     } finally {
+         // Release the compressor's native memory now instead of waiting for GC.
+         deflater.end();
+     }
+
+     return null;
+ }
+
+ /**
+  * Rebuilds a {@link RegisterBrokerBody} from bytes produced by {@link #encode(boolean)}.
+  *
+  * <p>When {@code compressed} is {@code true} the data is inflated and read in the order
+  * the compressed encoder writes it: data version, topic config count, each topic config,
+  * then the filter server list JSON. A filter server list that fails to parse is logged
+  * and replaced by an empty list; the rest of the body is still returned.
+  *
+  * @param data       the encoded body
+  * @param compressed whether {@code data} uses the compressed layout
+  * @return the decoded body
+  * @throws IOException if the compressed stream cannot be inflated or ends prematurely
+  */
+ public static RegisterBrokerBody decode(byte[] data, boolean compressed) throws IOException {
+     if (!compressed) {
+         return RegisterBrokerBody.decode(data, RegisterBrokerBody.class);
+     }
+     long start = System.currentTimeMillis();
+     InflaterInputStream inflaterInputStream = new InflaterInputStream(new ByteArrayInputStream(data));
+     try {
+         int dataVersionLength = readInt(inflaterInputStream);
+         byte[] dataVersionBytes = readBytes(inflaterInputStream, dataVersionLength);
+         DataVersion dataVersion = DataVersion.decode(dataVersionBytes, DataVersion.class);
+
+         RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+         registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
+         ConcurrentMap<String, TopicConfig> topicConfigTable = registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
+
+         int topicConfigNumber = readInt(inflaterInputStream);
+         LOGGER.debug("{} topic configs to extract", topicConfigNumber);
+
+         for (int i = 0; i < topicConfigNumber; i++) {
+             int topicConfigJsonLength = readInt(inflaterInputStream);
+
+             byte[] buffer = readBytes(inflaterInputStream, topicConfigJsonLength);
+             TopicConfig topicConfig = new TopicConfig();
+             String topicConfigJson = new String(buffer, MixAll.DEFAULT_CHARSET);
+             topicConfig.decode(topicConfigJson);
+             topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+         }
+
+         int filterServerListJsonLength = readInt(inflaterInputStream);
+
+         byte[] filterServerListBuffer = readBytes(inflaterInputStream, filterServerListJsonLength);
+         String filterServerListJson = new String(filterServerListBuffer, MixAll.DEFAULT_CHARSET);
+         List<String> filterServerList = new ArrayList<String>();
+         try {
+             filterServerList = JSON.parseArray(filterServerListJson, String.class);
+         } catch (Exception e) {
+             // Best effort: keep the empty list, but record the cause together with the payload.
+             LOGGER.error("Decompressing occur Exception {}", filterServerListJson, e);
+         }
+
+         registerBrokerBody.setFilterServerList(filterServerList);
+         long interval = System.currentTimeMillis() - start;
+         if (interval > 50) {
+             LOGGER.info("Decompressing takes {}ms", interval);
+         }
+         return registerBrokerBody;
+     } finally {
+         // Closing the stream ends its internally created Inflater and frees native memory.
+         inflaterInputStream.close();
+     }
+ }
+
+ /**
+  * Converts an int into the 4-byte array backing a freshly allocated {@link ByteBuffer}.
+  *
+  * @param n the value to convert
+  * @return a new 4-byte array holding {@code n}
+  */
+ private static byte[] convertIntToByteArray(int n) {
+     return ByteBuffer.allocate(4).putInt(n).array();
+ }
+
+ private static byte[] readBytes(InflaterInputStream inflaterInputStream, int length) throws IOException {
+ byte[] buffer = new byte[length];
+ int bytesRead = 0;
+ while (bytesRead < length) {
+ int len = inflaterInputStream.read(buffer, bytesRead, length - bytesRead);
+ if (len == -1) {
+ throw new IOException("End of compressed data has reached");
+ } else {
+ bytesRead += len;
+ }
+ }
+ return buffer;
+ }
+
+ /**
+  * Reads the next 4 bytes from the inflater stream and interprets them as an int.
+  *
+  * @param inflaterInputStream the stream to read from
+  * @return the decoded int
+  * @throws IOException if fewer than 4 bytes remain in the stream
+  */
+ private static int readInt(InflaterInputStream inflaterInputStream) throws IOException {
+     return ByteBuffer.wrap(readBytes(inflaterInputStream, 4)).getInt();
+ }
+
public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() {
return topicConfigSerializeWrapper;
}
@@ -40,4 +181,16 @@ public class RegisterBrokerBody extends RemotingSerializable {
public void setFilterServerList(List<String> filterServerList) {
this.filterServerList = filterServerList;
}
+
+ /**
+  * Creates a new map holding the same entries as the given topic config table.
+  * The copy is shallow: the {@link TopicConfig} values themselves are shared.
+  *
+  * @param topicConfigConcurrentMap the table to copy; may be {@code null}
+  * @return a new, never-{@code null} map (empty when the argument is {@code null})
+  */
+ public static ConcurrentMap<String, TopicConfig> cloneTopicConfigTable(
+     ConcurrentMap<String, TopicConfig> topicConfigConcurrentMap) {
+     ConcurrentHashMap<String, TopicConfig> copy = new ConcurrentHashMap<String, TopicConfig>();
+     if (topicConfigConcurrentMap == null) {
+         return copy;
+     }
+     copy.putAll(topicConfigConcurrentMap);
+     return copy;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java
similarity index 83%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java
index 45d5b6e..ac6a617 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java
@@ -15,16 +15,13 @@
* limitations under the License.
*/
-/**
- * $Id: RegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
package org.apache.rocketmq.common.protocol.header.namesrv;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class RegisterBrokerRequestHeader implements CommandCustomHeader {
+public class QueryDataVersionRequestHeader implements CommandCustomHeader {
@CFNotNull
private String brokerName;
@CFNotNull
@@ -32,12 +29,11 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
@CFNotNull
private String clusterName;
@CFNotNull
- private String haServerAddr;
- @CFNotNull
private Long brokerId;
@Override
public void checkFields() throws RemotingCommandException {
+
}
public String getBrokerName() {
@@ -64,14 +60,6 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
this.clusterName = clusterName;
}
- public String getHaServerAddr() {
- return haServerAddr;
- }
-
- public void setHaServerAddr(String haServerAddr) {
- this.haServerAddr = haServerAddr;
- }
-
public Long getBrokerId() {
return brokerId;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java
similarity index 50%
copy from common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java
index 3860ec3..90741e5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java
@@ -15,22 +15,34 @@
* limitations under the License.
*/
-package org.apache.rocketmq.common;
+package org.apache.rocketmq.common.protocol.header.namesrv;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class ThreadFactoryImpl implements ThreadFactory {
- private final AtomicLong threadIndex = new AtomicLong(0);
- private final String threadNamePrefix;
+public class QueryDataVersionResponseHeader implements CommandCustomHeader {
+ @CFNotNull
+ private Boolean changed;
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
- public ThreadFactoryImpl(final String threadNamePrefix) {
- this.threadNamePrefix = threadNamePrefix;
}
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
+ public Boolean getChanged() {
+ return changed;
+ }
+ public void setChanged(Boolean changed) {
+ this.changed = changed;
+ }
+
+ /**
+  * Renders this header as {@code QueryDataVersionResponseHeader{changed=<value>}}.
+  */
+ @Override
+ public String toString() {
+     return "QueryDataVersionResponseHeader{" + "changed=" + changed + '}';
 }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
index 45d5b6e..7ed7a40 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
@@ -36,7 +36,8 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
@CFNotNull
private Long brokerId;
+ /** Whether the request body uses the compressed RegisterBrokerBody encoding. */
+ private boolean compressed;
+
+ /** No field validation is performed for this header. */
 @Override
 public void checkFields() throws RemotingCommandException {
 }
@@ -79,4 +80,12 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
public void setBrokerId(Long brokerId) {
this.brokerId = brokerId;
}
+
+ public boolean isCompressed() {
+ return compressed;
+ }
+
+ public void setCompressed(boolean compressed) {
+ this.compressed = compressed;
+ }
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
new file mode 100644
index 0000000..87a0fc0
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+/**
+ * Round-trip test for the compressed encoding of {@link RegisterBrokerBody}.
+ */
+public class RegisterBrokerBodyTest {
+    /**
+     * Encodes a body with 10000 topic configs in compressed form, decodes it again and
+     * checks that the data version, the topic names and the filter server list survive,
+     * and that the compressed form is smaller than the plain encoding.
+     */
+    @Test
+    public void test_encode_decode() throws IOException {
+        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+        registerBrokerBody.setTopicConfigSerializeWrapper(topicConfigSerializeWrapper);
+
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
+        for (int i = 0; i < 10000; i++) {
+            topicConfigTable.put(String.valueOf(i), new TopicConfig(String.valueOf(i)));
+        }
+
+        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+
+        byte[] compareEncode = registerBrokerBody.encode(true);
+        byte[] encode2 = registerBrokerBody.encode(false);
+        // Assert the size reduction instead of printing both sizes to stdout.
+        org.junit.Assert.assertTrue("compressed body should be smaller than the plain encoding",
+            compareEncode.length < encode2.length);
+
+        RegisterBrokerBody decodeRegisterBrokerBody = RegisterBrokerBody.decode(compareEncode, true);
+        TopicConfigSerializeWrapper decodedWrapper = decodeRegisterBrokerBody.getTopicConfigSerializeWrapper();
+
+        assertEquals(topicConfigSerializeWrapper.getDataVersion(), decodedWrapper.getDataVersion());
+        assertEquals(topicConfigTable.size(), decodedWrapper.getTopicConfigTable().size());
+        assertEquals(topicConfigTable.keySet(), decodedWrapper.getTopicConfigTable().keySet());
+        assertEquals(registerBrokerBody.getFilterServerList(), decodeRegisterBrokerBody.getFilterServerList());
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 4906886..236e6a1 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MQVersion.Version;
import org.apache.rocketmq.common.MixAll;
@@ -41,6 +42,8 @@ import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHea
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
@@ -81,6 +84,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
+ case RequestCode.QUERY_DATA_VERSION:
+ return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
@@ -194,7 +199,11 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) {
- registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
+ try {
+ registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
+ } catch (Exception e) {
+ throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
+ }
} else {
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
@@ -221,6 +230,30 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return response;
}
+ /**
+  * Handles {@code QUERY_DATA_VERSION}: compares the data version sent in the request body
+  * with the one recorded for the broker address, refreshes the broker's last-update
+  * timestamp when nothing changed, and answers with the {@code changed} flag plus the
+  * recorded data version (when one exists) as the response body.
+  *
+  * @param ctx     the channel context of the request (not used here)
+  * @param request the request carrying a {@link QueryDataVersionRequestHeader} and an encoded {@link DataVersion}
+  * @return a {@code SUCCESS} response
+  * @throws RemotingCommandException if the request header cannot be decoded
+  */
+ public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
+     RemotingCommand request) throws RemotingCommandException {
+     final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
+     final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
+     final QueryDataVersionRequestHeader requestHeader =
+         (QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);
+     final DataVersion brokerDataVersion = DataVersion.decode(request.getBody(), DataVersion.class);
+     final String brokerAddr = requestHeader.getBrokerAddr();
+
+     final boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(brokerAddr, brokerDataVersion);
+     if (!changed) {
+         // Unchanged config: refresh the broker's last-update timestamp without a full registration.
+         this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(brokerAddr);
+     }
+     responseHeader.setChanged(changed);
+
+     final DataVersion nameServerDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(brokerAddr);
+     if (nameServerDataVersion != null) {
+         response.setBody(nameServerDataVersion.encode());
+     }
+
+     response.setCode(ResponseCode.SUCCESS);
+     response.setRemark(null);
+     return response;
+ }
+
public RemotingCommand registerBroker(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index ef02dd0..00962ef 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -183,13 +183,24 @@ public class RouteInfoManager {
return result;
}
- private boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
+ public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
+ DataVersion prev = queryBrokerTopicConfig(brokerAddr);
+ return null == prev || !prev.equals(dataVersion);
+ }
+
+ public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
- if (null == prev || !prev.getDataVersion().equals(dataVersion)) {
- return true;
+ if (prev != null) {
+ return prev.getDataVersion();
}
+ return null;
+ }
- return false;
+ public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {
+ BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
+ if (prev != null) {
+ prev.setLastUpdateTimestamp(System.currentTimeMillis());
+ }
}
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
--
To stop receiving notification emails like this one, please contact
yukon@apache.org.