You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/03/07 06:38:55 UTC

[GitHub] zhouxinyu closed pull request #205: [ROCKETMQ-319] Improve broker register performance and reduce memory usage

zhouxinyu closed pull request #205: [ROCKETMQ-319] Improve broker register performance and reduce memory usage  
URL: https://github.com/apache/rocketmq/pull/205
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 8891bd322..d08688bcf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -729,14 +729,14 @@ public void start() throws Exception {
             this.filterServerManager.start();
         }
 
-        this.registerBrokerAll(true, false);
+        this.registerBrokerAll(true, false, true);
 
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    BrokerController.this.registerBrokerAll(true, false);
+                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                 } catch (Throwable e) {
                     log.error("registerBrokerAll Exception", e);
                 }
@@ -752,7 +752,7 @@ public void run() {
         }
     }
 
-    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
+    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
         TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
 
         if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
@@ -767,28 +767,56 @@ public synchronized void registerBrokerAll(final boolean checkOrderConfig, boole
             topicConfigWrapper.setTopicConfigTable(topicConfigTable);
         }
 
-        RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
-            this.brokerConfig.getBrokerClusterName(),
+        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
             this.getBrokerAddr(),
             this.brokerConfig.getBrokerName(),
             this.brokerConfig.getBrokerId(),
-            this.getHAServerAddr(),
-            topicConfigWrapper,
-            this.filterServerManager.buildNewFilterServerList(),
-            oneway,
-            this.brokerConfig.getRegisterBrokerTimeoutMills());
-
-        if (registerBrokerResult != null) {
-            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
-                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
+            List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
+                this.brokerConfig.getBrokerClusterName(),
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId(),
+                this.getHAServerAddr(),
+                topicConfigWrapper,
+                this.filterServerManager.buildNewFilterServerList(),
+                oneway,
+                this.brokerConfig.getRegisterBrokerTimeoutMills(),
+                this.brokerConfig.isCompressedRegister());
+
+            if (registerBrokerResultList.size() > 0) {
+                RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
+                if (registerBrokerResult != null) {
+                    if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
+                        this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+                    }
+
+                    this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+
+                    if (checkOrderConfig) {
+                        this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
+                    }
+                }
             }
+        }
+    }
 
-            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+    private boolean needRegister(final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final int timeoutMills) {
 
-            if (checkOrderConfig) {
-                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
+        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
+        List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
+        boolean needRegister = false;
+        for (Boolean changed : changeList) {
+            if (changed) {
+                needRegister = true;
+                break;
             }
         }
+        return needRegister;
     }
 
     public TopicConfigManager getTopicConfigManager() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index cba70a0ee..bfb90e93e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -16,11 +16,19 @@
  */
 package org.apache.rocketmq.broker.out;
 
+import com.google.common.collect.Lists;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
@@ -31,6 +39,8 @@
 import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
@@ -52,6 +62,8 @@
     private final RemotingClient remotingClient;
     private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
     private String nameSrvAddr = null;
+    private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
+        new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
 
     public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
         this(nettyClientConfig, null);
@@ -97,7 +109,7 @@ public void updateNameServerAddressList(final String addrs) {
         this.remotingClient.updateNameServerAddressList(lst);
     }
 
-    public RegisterBrokerResult registerBrokerAll(
+    public List<RegisterBrokerResult> registerBrokerAll(
         final String clusterName,
         final String brokerAddr,
         final String brokerName,
@@ -106,27 +118,41 @@ public RegisterBrokerResult registerBrokerAll(
         final TopicConfigSerializeWrapper topicConfigWrapper,
         final List<String> filterServerList,
         final boolean oneway,
-        final int timeoutMills) {
-        RegisterBrokerResult registerBrokerResult = null;
+        final int timeoutMills,
+        final boolean compressed) {
 
+        final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
         List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
-        if (nameServerAddressList != null) {
-            for (String namesrvAddr : nameServerAddressList) {
-                try {
-                    RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
-                        haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
-                    if (result != null) {
-                        registerBrokerResult = result;
+        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
+            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
+            for (final String namesrvAddr : nameServerAddressList) {
+                brokerOuterExecutor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            RegisterBrokerResult result = registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
+                                haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills, compressed);
+                            if (result != null) {
+                                registerBrokerResultList.add(result);
+                            }
+
+                            log.info("register broker to name server {} OK", namesrvAddr);
+                        } catch (Exception e) {
+                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
+                        } finally {
+                            countDownLatch.countDown();
+                        }
                     }
+                });
+            }
 
-                    log.info("register broker to name server {} OK", namesrvAddr);
-                } catch (Exception e) {
-                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
-                }
+            try {
+                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
             }
         }
 
-        return registerBrokerResult;
+        return registerBrokerResultList;
     }
 
     private RegisterBrokerResult registerBroker(
@@ -139,7 +165,8 @@ private RegisterBrokerResult registerBroker(
         final TopicConfigSerializeWrapper topicConfigWrapper,
         final List<String> filterServerList,
         final boolean oneway,
-        final int timeoutMills
+        final int timeoutMills,
+        final boolean compressed
     ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
         InterruptedException {
         RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
@@ -148,12 +175,13 @@ private RegisterBrokerResult registerBroker(
         requestHeader.setBrokerName(brokerName);
         requestHeader.setClusterName(clusterName);
         requestHeader.setHaServerAddr(haServerAddr);
+        requestHeader.setCompressed(compressed);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
 
         RegisterBrokerBody requestBody = new RegisterBrokerBody();
         requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
         requestBody.setFilterServerList(filterServerList);
-        request.setBody(requestBody.encode());
+        request.setBody(requestBody.encode(requestHeader.isCompressed()));
 
         if (oneway) {
             try {
@@ -231,6 +259,71 @@ public void unregisterBroker(
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
+    public List<Boolean> needRegister(
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final TopicConfigSerializeWrapper topicConfigWrapper,
+        final int timeoutMills) {
+        final List<Boolean> changedList = new CopyOnWriteArrayList<>();
+        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
+        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
+            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
+            for (final String namesrvAddr : nameServerAddressList) {
+                brokerOuterExecutor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
+                            requestHeader.setBrokerAddr(brokerAddr);
+                            requestHeader.setBrokerId(brokerId);
+                            requestHeader.setBrokerName(brokerName);
+                            requestHeader.setClusterName(clusterName);
+                            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
+                            request.setBody(topicConfigWrapper.getDataVersion().encode());
+                            RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
+                            DataVersion nameServerDataVersion = null;
+                            Boolean changed = false;
+                            switch (response.getCode()) {
+                                case ResponseCode.SUCCESS: {
+                                    QueryDataVersionResponseHeader queryDataVersionResponseHeader =
+                                        (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
+                                    changed = queryDataVersionResponseHeader.getChanged();
+                                    byte[] body = response.getBody();
+                                    if (body != null) {
+                                        nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
+                                        if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
+                                            changed = true;
+                                        }
+                                    }
+                                    if (changed == null || changed) {
+                                        changedList.add(Boolean.TRUE);
+                                    }
+                                }
+                                default:
+                                    break;
+                            }
+                            log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
+                        } catch (Exception e) {
+                            changedList.add(Boolean.TRUE);
+                            log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);
+                        } finally {
+                            countDownLatch.countDown();
+                        }
+                    }
+                });
+
+            }
+            try {
+                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                log.error("query dataversion from nameserver countDownLatch await Exception", e);
+            }
+        }
+        return changedList;
+    }
+
     public TopicConfigSerializeWrapper getAllTopicConfig(
         final String addr) throws RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, InterruptedException, MQBrokerException {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index d69a78700..7b21ab47f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -245,7 +245,7 @@ private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
         topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
 
         this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
-        this.brokerController.registerBrokerAll(false, true);
+        this.brokerController.registerBrokerAll(false, true, true);
 
         return null;
     }
@@ -310,8 +310,8 @@ private RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCo
                     log.info("updateBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress());
                     this.brokerController.getConfiguration().update(properties);
                     if (properties.containsKey("brokerPermission")) {
-                        this.brokerController.registerBrokerAll(false, false);
                         this.brokerController.getTopicConfigManager().getDataVersion().nextVersion();
+                        this.brokerController.registerBrokerAll(false, false, true);
                     }
                 } else {
                     log.error("string2Properties error");
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index cd30a089b..46d161e92 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -210,7 +210,7 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri
         }
 
         if (createNew) {
-            this.brokerController.registerBrokerAll(false, true);
+            this.brokerController.registerBrokerAll(false, true,true);
         }
 
         return topicConfig;
@@ -254,7 +254,7 @@ public TopicConfig createTopicInSendMessageBackMethod(
         }
 
         if (createNew) {
-            this.brokerController.registerBrokerAll(false, true);
+            this.brokerController.registerBrokerAll(false, true,true);
         }
 
         return topicConfig;
@@ -279,7 +279,7 @@ public void updateTopicUnitFlag(final String topic, final boolean unit) {
             this.dataVersion.nextVersion();
 
             this.persist();
-            this.brokerController.registerBrokerAll(false, true);
+            this.brokerController.registerBrokerAll(false, true,true);
         }
     }
 
@@ -299,7 +299,7 @@ public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub)
             this.dataVersion.nextVersion();
 
             this.persist();
-            this.brokerController.registerBrokerAll(false, true);
+            this.brokerController.registerBrokerAll(false, true,true);
         }
     }
 
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
new file mode 100644
index 000000000..69e0dd382
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import org.mockito.Mock;
+import static org.mockito.Mockito.when;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerOuterAPITest {
+    @Mock
+    private ChannelHandlerContext handlerContext;
+    @Spy
+    private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+    @Mock
+    private MessageStore messageStore;
+    private String clusterName = "clusterName";
+    private String brokerName = "brokerName";
+    private String brokerAddr = "brokerAddr";
+    private long brokerId = 0L;
+    private String nameserver1 = "127.0.0.1";
+    private String nameserver2 = "127.0.0.2";
+    private String nameserver3 = "127.0.0.3";
+    private int timeOut = 3000;
+
+    @Mock
+    private NettyRemotingClient nettyRemotingClient;
+
+    private BrokerOuterAPI brokerOuterAPI;
+
+    public void init() throws Exception {
+        brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), null);
+        Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+        field.setAccessible(true);
+        field.set(brokerOuterAPI, nettyRemotingClient);
+    }
+
+    @Test
+    public void test_needRegister_normal() throws Exception {
+        init();
+        brokerOuterAPI.start();
+        final RemotingCommand response = buildResponse(Boolean.TRUE);
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+
+        when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
+        when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response);
+        List<Boolean> booleanList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut);
+        assertEquals(3, booleanList.size());
+        assertEquals(false, booleanList.contains(Boolean.FALSE));
+    }
+
+    @Test
+    public void test_needRegister_timeout() throws Exception {
+        init();
+        brokerOuterAPI.start();
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+
+        when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
+
+        when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock invocation) throws Throwable {
+                if (invocation.getArgument(0) == nameserver1) {
+                    return buildResponse(Boolean.TRUE);
+                } else if (invocation.getArgument(0) == nameserver2) {
+                    return buildResponse(Boolean.FALSE);
+                } else if (invocation.getArgument(0) == nameserver3) {
+                    TimeUnit.MILLISECONDS.sleep(timeOut + 20);
+                    return buildResponse(Boolean.TRUE);
+                }
+                return buildResponse(Boolean.TRUE);
+            }
+        });
+        List<Boolean> booleanList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut);
+        assertEquals(2, booleanList.size());
+        boolean success = Iterables.any(booleanList,
+            new Predicate<Boolean>() {
+                public boolean apply(Boolean input) {
+                    return input ? true : false;
+                }
+            });
+
+        assertEquals(true, success);
+
+    }
+
+    @Test
+    public void test_register_normal() throws Exception {
+        init();
+        brokerOuterAPI.start();
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+
+        when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
+        when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response);
+        List<RegisterBrokerResult> registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), false, timeOut, true);
+
+        assertEquals(3, registerBrokerResultList.size());
+    }
+
+    @Test
+    public void test_register_timeout() throws Exception {
+        init();
+        brokerOuterAPI.start();
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+
+        when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
+        when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock invocation) throws Throwable {
+                if (invocation.getArgument(0) == nameserver1) {
+                    return response;
+                } else if (invocation.getArgument(0) == nameserver2) {
+                    return response;
+                } else if (invocation.getArgument(0) == nameserver3) {
+                    TimeUnit.MILLISECONDS.sleep(timeOut + 20);
+                    return response;
+                }
+                return response;
+            }
+        });
+        List<RegisterBrokerResult> registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), false, timeOut, true);
+
+        assertEquals(2, registerBrokerResultList.size());
+    }
+
+    private RemotingCommand buildResponse(Boolean changed) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
+        final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        responseHeader.setChanged(changed);
+        return response;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index efb36b50a..a5e9d1209 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.common;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
@@ -23,9 +25,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
 public class BrokerConfig {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
@@ -133,6 +132,10 @@
     private boolean filterSupportRetry = false;
     private boolean enablePropertyFilter = false;
 
+    private boolean compressedRegister = false;
+
+    private boolean forceRegister = false;
+
     public boolean isTraceOn() {
         return traceOn;
     }
@@ -598,4 +601,20 @@ public boolean isEnablePropertyFilter() {
     public void setEnablePropertyFilter(boolean enablePropertyFilter) {
         this.enablePropertyFilter = enablePropertyFilter;
     }
+
+    public boolean isCompressedRegister() {
+        return compressedRegister;
+    }
+
+    public void setCompressedRegister(boolean compressedRegister) {
+        this.compressedRegister = compressedRegister;
+    }
+
+    public boolean isForceRegister() {
+        return forceRegister;
+    }
+
+    public void setForceRegister(boolean forceRegister) {
+        this.forceRegister = forceRegister;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
index 71b00fdd7..e54000deb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
+++ b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
@@ -78,4 +78,13 @@ public int hashCode() {
         }
         return result;
     }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("DataVersion[");
+        sb.append("timestamp=").append(timestamp);
+        sb.append(", counter=").append(counter);
+        sb.append(']');
+        return sb.toString();
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
index 3860ec3cc..564d60c54 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
@@ -23,14 +23,21 @@
 public class ThreadFactoryImpl implements ThreadFactory {
     private final AtomicLong threadIndex = new AtomicLong(0);
     private final String threadNamePrefix;
+    private final boolean daemon;
 
     public ThreadFactoryImpl(final String threadNamePrefix) {
+        this(threadNamePrefix, false);
+    }
+
+    public ThreadFactoryImpl(final String threadNamePrefix, boolean daemon) {
         this.threadNamePrefix = threadNamePrefix;
+        this.daemon = daemon;
     }
 
     @Override
     public Thread newThread(Runnable r) {
-        return new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
-
+        Thread thread = new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
+        thread.setDaemon(daemon);
+        return thread;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 5900c0b9d..8cf2d46ad 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -165,4 +165,6 @@
     public static final int SEND_BATCH_MESSAGE = 320;
 
     public static final int QUERY_CONSUME_QUEUE = 321;
+
+    public static final int QUERY_DATA_VERSION = 322;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
index c220927c2..2b49b6d14 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -17,14 +17,155 @@
 
 package org.apache.rocketmq.common.protocol.body;
 
+import com.alibaba.fastjson.JSON;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RegisterBrokerBody extends RemotingSerializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RegisterBrokerBody.class);
     private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
     private List<String> filterServerList = new ArrayList<String>();
 
+    public byte[] encode(boolean compress) {
+
+        if (!compress) {
+            return super.encode();
+        }
+        long start = System.currentTimeMillis();
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION));
+        DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
+        ConcurrentMap<String, TopicConfig> topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
+        assert topicConfigTable != null;
+        try {
+            byte[] buffer = dataVersion.encode();
+
+            // write data version
+            outputStream.write(convertIntToByteArray(buffer.length));
+            outputStream.write(buffer);
+
+            int topicNumber = topicConfigTable.size();
+
+            // write number of topic configs
+            outputStream.write(convertIntToByteArray(topicNumber));
+
+            // write topic config entry one by one.
+            for (ConcurrentMap.Entry<String, TopicConfig> next : topicConfigTable.entrySet()) {
+                buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET);
+                outputStream.write(convertIntToByteArray(buffer.length));
+                outputStream.write(buffer);
+            }
+
+            buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET);
+
+            // write filter server list json length
+            outputStream.write(convertIntToByteArray(buffer.length));
+
+            // write filter server list json
+            outputStream.write(buffer);
+
+            outputStream.finish();
+            long interval = System.currentTimeMillis() - start;
+            if (interval > 50) {
+                LOGGER.info("Compressing takes {}ms", interval);
+            }
+            return byteArrayOutputStream.toByteArray();
+        } catch (IOException e) {
+            LOGGER.error("Failed to compress RegisterBrokerBody object", e);
+        }
+
+        return null;
+    }
+
+    public static RegisterBrokerBody decode(byte[] data, boolean compressed) throws IOException {
+        if (!compressed) {
+            return RegisterBrokerBody.decode(data, RegisterBrokerBody.class);
+        }
+        long start = System.currentTimeMillis();
+        InflaterInputStream inflaterInputStream = new InflaterInputStream(new ByteArrayInputStream(data));
+        int dataVersionLength = readInt(inflaterInputStream);
+        byte[] dataVersionBytes = readBytes(inflaterInputStream, dataVersionLength);
+        DataVersion dataVersion = DataVersion.decode(dataVersionBytes, DataVersion.class);
+
+        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+        registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
+        ConcurrentMap<String, TopicConfig> topicConfigTable = registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
+
+        int topicConfigNumber = readInt(inflaterInputStream);
+        LOGGER.debug("{} topic configs to extract", topicConfigNumber);
+
+        for (int i = 0; i < topicConfigNumber; i++) {
+            int topicConfigJsonLength = readInt(inflaterInputStream);
+
+            byte[] buffer = readBytes(inflaterInputStream, topicConfigJsonLength);
+            TopicConfig topicConfig = new TopicConfig();
+            String topicConfigJson = new String(buffer, MixAll.DEFAULT_CHARSET);
+            topicConfig.decode(topicConfigJson);
+            topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+
+        int filterServerListJsonLength = readInt(inflaterInputStream);
+
+        byte[] filterServerListBuffer = readBytes(inflaterInputStream, filterServerListJsonLength);
+        String filterServerListJson = new String(filterServerListBuffer, MixAll.DEFAULT_CHARSET);
+        List<String> filterServerList = new ArrayList<String>();
+        try {
+            filterServerList = JSON.parseArray(filterServerListJson, String.class);
+        } catch (Exception e) {
+            LOGGER.error("Decompressing occur Exception {}", filterServerListJson);
+        }
+
+        registerBrokerBody.setFilterServerList(filterServerList);
+        long interval = System.currentTimeMillis() - start;
+        if (interval > 50) {
+            LOGGER.info("Decompressing takes {}ms", interval);
+        }
+        return registerBrokerBody;
+    }
+
+    private static byte[] convertIntToByteArray(int n) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(4);
+        byteBuffer.putInt(n);
+        return byteBuffer.array();
+    }
+
+    private static byte[] readBytes(InflaterInputStream inflaterInputStream, int length) throws IOException {
+        byte[] buffer = new byte[length];
+        int bytesRead = 0;
+        while (bytesRead < length) {
+            int len = inflaterInputStream.read(buffer, bytesRead, length - bytesRead);
+            if (len == -1) {
+                throw new IOException("End of compressed data has reached");
+            } else {
+                bytesRead += len;
+            }
+        }
+        return buffer;
+    }
+
+    private static int readInt(InflaterInputStream inflaterInputStream) throws IOException {
+        byte[] buffer = readBytes(inflaterInputStream, 4);
+        ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
+        return byteBuffer.getInt();
+    }
+
     public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() {
         return topicConfigSerializeWrapper;
     }
@@ -40,4 +181,16 @@ public void setTopicConfigSerializeWrapper(TopicConfigSerializeWrapper topicConf
     public void setFilterServerList(List<String> filterServerList) {
         this.filterServerList = filterServerList;
     }
+
+    public static ConcurrentMap<String, TopicConfig> cloneTopicConfigTable(
+        ConcurrentMap<String, TopicConfig> topicConfigConcurrentMap) {
+        ConcurrentHashMap<String, TopicConfig> result = new ConcurrentHashMap<String, TopicConfig>();
+        if (topicConfigConcurrentMap != null) {
+            for (Map.Entry<String, TopicConfig> entry : topicConfigConcurrentMap.entrySet()) {
+                result.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return result;
+
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java
new file mode 100644
index 000000000..ac6a617db
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class QueryDataVersionRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String brokerName;
+    @CFNotNull
+    private String brokerAddr;
+    @CFNotNull
+    private String clusterName;
+    @CFNotNull
+    private Long brokerId;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public Long getBrokerId() {
+        return brokerId;
+    }
+
+    public void setBrokerId(Long brokerId) {
+        this.brokerId = brokerId;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java
new file mode 100644
index 000000000..90741e5f5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class QueryDataVersionResponseHeader implements CommandCustomHeader {
+    @CFNotNull
+    private Boolean changed;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public Boolean getChanged() {
+        return changed;
+    }
+
+    public void setChanged(Boolean changed) {
+        this.changed = changed;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("QueryDataVersionResponseHeader{");
+        sb.append("changed=").append(changed);
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
index 45d5b6e9e..7ed7a403d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
@@ -36,7 +36,8 @@
     @CFNotNull
     private Long brokerId;
 
-    @Override
+    private boolean compressed;
+
     public void checkFields() throws RemotingCommandException {
     }
 
@@ -79,4 +80,12 @@ public Long getBrokerId() {
     public void setBrokerId(Long brokerId) {
         this.brokerId = brokerId;
     }
+
+    public boolean isCompressed() {
+        return compressed;
+    }
+
+    public void setCompressed(boolean compressed) {
+        this.compressed = compressed;
+    }
 }
diff --git a/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
new file mode 100644
index 000000000..87a0fc008
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+public class RegisterBrokerBodyTest {
+    @Test
+    public void test_encode_decode() throws IOException {
+        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+        registerBrokerBody.setTopicConfigSerializeWrapper(topicConfigSerializeWrapper);
+        
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
+        for (int i = 0; i < 10000; i++) {
+            topicConfigTable.put(String.valueOf(i), new TopicConfig(String.valueOf(i)));
+        }
+
+        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+
+        byte[] compareEncode = registerBrokerBody.encode(true);
+        byte[] encode2 = registerBrokerBody.encode(false);
+        System.out.println(compareEncode.length);
+        System.out.println(encode2.length);
+        RegisterBrokerBody decodeRegisterBrokerBody = RegisterBrokerBody.decode(compareEncode, true);
+
+        assertEquals(registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable().size(), decodeRegisterBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable().size());
+
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index ed5b20b16..d0a62696a 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -20,6 +20,7 @@
 import java.io.UnsupportedEncodingException;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MQVersion.Version;
 import org.apache.rocketmq.common.MixAll;
@@ -39,6 +40,8 @@
 import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
@@ -79,6 +82,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
                 return this.getKVConfig(ctx, request);
             case RequestCode.DELETE_KV_CONFIG:
                 return this.deleteKVConfig(ctx, request);
+            case RequestCode.QUERY_DATA_VERSION:
+                return queryBrokerTopicConfig(ctx, request);
             case RequestCode.REGISTER_BROKER:
                 Version brokerVersion = MQVersion.value2Version(request.getVersion());
                 if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
@@ -192,7 +197,11 @@ public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx,
         RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
 
         if (request.getBody() != null) {
-            registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
+            try {
+                registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
+            } catch (Exception e) {
+                throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
+            }
         } else {
             registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
             registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
@@ -219,6 +228,30 @@ public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx,
         return response;
     }
 
+    public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
+        final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
+        final QueryDataVersionRequestHeader requestHeader =
+            (QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);
+        DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class);
+
+        Boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);
+        if (!changed) {
+            this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr());
+        }
+
+        DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+
+        if (nameSeverDataVersion != null) {
+            response.setBody(nameSeverDataVersion.encode());
+        }
+        responseHeader.setChanged(changed);
+        return response;
+    }
+
     public RemotingCommand registerBroker(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 109d3e810..36647df2e 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -183,13 +183,24 @@ public RegisterBrokerResult registerBroker(
         return result;
     }
 
-    private boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
+    public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
+        DataVersion prev = queryBrokerTopicConfig(brokerAddr);
+        return null == prev || !prev.equals(dataVersion);
+    }
+
+    public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
         BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
-        if (null == prev || !prev.getDataVersion().equals(dataVersion)) {
-            return true;
+        if (prev != null) {
+            return prev.getDataVersion();
         }
+        return null;
+    }
 
-        return false;
+    public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {
+        BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
+        if (prev != null) {
+            prev.setLastUpdateTimestamp(System.currentTimeMillis());
+        }
     }
 
     private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services