You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2018/03/07 06:38:56 UTC

[rocketmq] branch develop updated: [ROCKETMQ-319] Improve broker register performance and reduce memory usage (#205)

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e744222  [ROCKETMQ-319] Improve broker register performance and reduce memory usage   (#205)
e744222 is described below

commit e74422252719cfae89e13b0231f52f1a6538db48
Author: fuyou001 <fu...@gmail.com>
AuthorDate: Wed Mar 7 14:38:54 2018 +0800

    [ROCKETMQ-319] Improve broker register performance and reduce memory usage   (#205)
---
 .../apache/rocketmq/broker/BrokerController.java   |  62 +++++--
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 127 ++++++++++++--
 .../broker/processor/AdminBrokerProcessor.java     |   4 +-
 .../rocketmq/broker/topic/TopicConfigManager.java  |   8 +-
 .../apache/rocketmq/broker/BrokerOuterAPITest.java | 191 +++++++++++++++++++++
 .../org/apache/rocketmq/common/BrokerConfig.java   |  25 ++-
 .../org/apache/rocketmq/common/DataVersion.java    |   9 +
 .../apache/rocketmq/common/ThreadFactoryImpl.java  |  11 +-
 .../rocketmq/common/protocol/RequestCode.java      |   2 +
 .../common/protocol/body/RegisterBrokerBody.java   | 153 +++++++++++++++++
 ...der.java => QueryDataVersionRequestHeader.java} |  16 +-
 .../namesrv/QueryDataVersionResponseHeader.java}   |  34 ++--
 .../namesrv/RegisterBrokerRequestHeader.java       |  11 +-
 .../rocketmq/common/RegisterBrokerBodyTest.java    |  51 ++++++
 .../namesrv/processor/DefaultRequestProcessor.java |  35 +++-
 .../namesrv/routeinfo/RouteInfoManager.java        |  19 +-
 16 files changed, 682 insertions(+), 76 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 409b1d0..60f287a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -729,14 +729,14 @@ public class BrokerController {
             this.filterServerManager.start();
         }
 
-        this.registerBrokerAll(true, false);
+        this.registerBrokerAll(true, false, true);
 
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
             @Override
             public void run() {
                 try {
-                    BrokerController.this.registerBrokerAll(true, false);
+                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                 } catch (Throwable e) {
                     log.error("registerBrokerAll Exception", e);
                 }
@@ -752,7 +752,7 @@ public class BrokerController {
         }
     }
 
-    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
+    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
         TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
 
         if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
@@ -767,28 +767,56 @@ public class BrokerController {
             topicConfigWrapper.setTopicConfigTable(topicConfigTable);
         }
 
-        RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
-            this.brokerConfig.getBrokerClusterName(),
+        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
             this.getBrokerAddr(),
             this.brokerConfig.getBrokerName(),
             this.brokerConfig.getBrokerId(),
-            this.getHAServerAddr(),
-            topicConfigWrapper,
-            this.filterServerManager.buildNewFilterServerList(),
-            oneway,
-            this.brokerConfig.getRegisterBrokerTimeoutMills());
-
-        if (registerBrokerResult != null) {
-            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
-                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
+            List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
+                this.brokerConfig.getBrokerClusterName(),
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId(),
+                this.getHAServerAddr(),
+                topicConfigWrapper,
+                this.filterServerManager.buildNewFilterServerList(),
+                oneway,
+                this.brokerConfig.getRegisterBrokerTimeoutMills(),
+                this.brokerConfig.isCompressedRegister());
+
+            if (registerBrokerResultList.size() > 0) {
+                RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
+                if (registerBrokerResult != null) {
+                    if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
+                        this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
+                    }
+
+                    this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+
+                    if (checkOrderConfig) {
+                        this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
+                    }
+                }
             }
+        }
+    }
 
-            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
+    private boolean needRegister(final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final int timeoutMills) {
 
-            if (checkOrderConfig) {
-                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
+        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
+        List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
+        boolean needRegister = false;
+        for (Boolean changed : changeList) {
+            if (changed) {
+                needRegister = true;
+                break;
             }
         }
+        return needRegister;
     }
 
     public TopicConfigManager getTopicConfigManager() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 87c00a3..262e2d2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -16,11 +16,19 @@
  */
 package org.apache.rocketmq.broker.out;
 
+import com.google.common.collect.Lists;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -33,6 +41,8 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
@@ -52,6 +62,8 @@ public class BrokerOuterAPI {
     private final RemotingClient remotingClient;
     private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
     private String nameSrvAddr = null;
+    private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
+        new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
 
     public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
         this(nettyClientConfig, null);
@@ -97,7 +109,7 @@ public class BrokerOuterAPI {
         this.remotingClient.updateNameServerAddressList(lst);
     }
 
-    public RegisterBrokerResult registerBrokerAll(
+    public List<RegisterBrokerResult> registerBrokerAll(
         final String clusterName,
         final String brokerAddr,
         final String brokerName,
@@ -106,27 +118,41 @@ public class BrokerOuterAPI {
         final TopicConfigSerializeWrapper topicConfigWrapper,
         final List<String> filterServerList,
         final boolean oneway,
-        final int timeoutMills) {
-        RegisterBrokerResult registerBrokerResult = null;
+        final int timeoutMills,
+        final boolean compressed) {
 
+        final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
         List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
-        if (nameServerAddressList != null) {
-            for (String namesrvAddr : nameServerAddressList) {
-                try {
-                    RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
-                        haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
-                    if (result != null) {
-                        registerBrokerResult = result;
+        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
+            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
+            for (final String namesrvAddr : nameServerAddressList) {
+                brokerOuterExecutor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            RegisterBrokerResult result = registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
+                                haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills, compressed);
+                            if (result != null) {
+                                registerBrokerResultList.add(result);
+                            }
+
+                            log.info("register broker to name server {} OK", namesrvAddr);
+                        } catch (Exception e) {
+                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
+                        } finally {
+                            countDownLatch.countDown();
+                        }
                     }
+                });
+            }
 
-                    log.info("register broker to name server {} OK", namesrvAddr);
-                } catch (Exception e) {
-                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
-                }
+            try {
+                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
             }
         }
 
-        return registerBrokerResult;
+        return registerBrokerResultList;
     }
 
     private RegisterBrokerResult registerBroker(
@@ -139,7 +165,8 @@ public class BrokerOuterAPI {
         final TopicConfigSerializeWrapper topicConfigWrapper,
         final List<String> filterServerList,
         final boolean oneway,
-        final int timeoutMills
+        final int timeoutMills,
+        final boolean compressed
     ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
         InterruptedException {
         RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
@@ -148,12 +175,13 @@ public class BrokerOuterAPI {
         requestHeader.setBrokerName(brokerName);
         requestHeader.setClusterName(clusterName);
         requestHeader.setHaServerAddr(haServerAddr);
+        requestHeader.setCompressed(compressed);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
 
         RegisterBrokerBody requestBody = new RegisterBrokerBody();
         requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
         requestBody.setFilterServerList(filterServerList);
-        request.setBody(requestBody.encode());
+        request.setBody(requestBody.encode(requestHeader.isCompressed()));
 
         if (oneway) {
             try {
@@ -231,6 +259,71 @@ public class BrokerOuterAPI {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
+    public List<Boolean> needRegister(
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final TopicConfigSerializeWrapper topicConfigWrapper,
+        final int timeoutMills) {
+        final List<Boolean> changedList = new CopyOnWriteArrayList<>();
+        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
+        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
+            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
+            for (final String namesrvAddr : nameServerAddressList) {
+                brokerOuterExecutor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
+                            requestHeader.setBrokerAddr(brokerAddr);
+                            requestHeader.setBrokerId(brokerId);
+                            requestHeader.setBrokerName(brokerName);
+                            requestHeader.setClusterName(clusterName);
+                            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
+                            request.setBody(topicConfigWrapper.getDataVersion().encode());
+                            RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
+                            DataVersion nameServerDataVersion = null;
+                            Boolean changed = false;
+                            switch (response.getCode()) {
+                                case ResponseCode.SUCCESS: {
+                                    QueryDataVersionResponseHeader queryDataVersionResponseHeader =
+                                        (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
+                                    changed = queryDataVersionResponseHeader.getChanged();
+                                    byte[] body = response.getBody();
+                                    if (body != null) {
+                                        nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
+                                        if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
+                                            changed = true;
+                                        }
+                                    }
+                                    if (changed == null || changed) {
+                                        changedList.add(Boolean.TRUE);
+                                    }
+                                }
+                                default:
+                                    break;
+                            }
+                            log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
+                        } catch (Exception e) {
+                            changedList.add(Boolean.TRUE);
+                            log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);
+                        } finally {
+                            countDownLatch.countDown();
+                        }
+                    }
+                });
+
+            }
+            try {
+                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                log.error("query dataversion from nameserver countDownLatch await Exception", e);
+            }
+        }
+        return changedList;
+    }
+
     public TopicConfigSerializeWrapper getAllTopicConfig(
         final String addr) throws RemotingConnectException, RemotingSendRequestException,
         RemotingTimeoutException, InterruptedException, MQBrokerException {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index e8be2d4..a9e54aa 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -245,7 +245,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
 
         this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
-        this.brokerController.registerBrokerAll(false, true);
+        this.brokerController.registerBrokerAll(false, true, true);
 
         return null;
     }
@@ -310,8 +310,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                     log.info("updateBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress());
                     this.brokerController.getConfiguration().update(properties);
                     if (properties.containsKey("brokerPermission")) {
-                        this.brokerController.registerBrokerAll(false, false);
                         this.brokerController.getTopicConfigManager().getDataVersion().nextVersion();
+                        this.brokerController.registerBrokerAll(false, false, true);
                     }
                 } else {
                     log.error("string2Properties error");
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 29e2280..cdae66f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -210,7 +210,7 @@ public class TopicConfigManager extends ConfigManager {
         }
 
         if (createNew) {
-            this.brokerController.registerBrokerAll(false, true);
+            this.brokerController.registerBrokerAll(false, true,true);
         }
 
         return topicConfig;
@@ -254,7 +254,7 @@ public class TopicConfigManager extends ConfigManager {
         }
 
         if (createNew) {
-            this.brokerController.registerBrokerAll(false, true);
+            this.brokerController.registerBrokerAll(false, true,true);
         }
 
         return topicConfig;
@@ -279,7 +279,7 @@ public class TopicConfigManager extends ConfigManager {
             this.dataVersion.nextVersion();
 
             this.persist();
-            this.brokerController.registerBrokerAll(false, true);
+            this.brokerController.registerBrokerAll(false, true,true);
         }
     }
 
@@ -299,7 +299,7 @@ public class TopicConfigManager extends ConfigManager {
             this.dataVersion.nextVersion();
 
             this.persist();
-            this.brokerController.registerBrokerAll(false, true);
+            this.brokerController.registerBrokerAll(false, true,true);
         }
     }
 
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
new file mode 100644
index 0000000..69e0dd3
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import org.mockito.Mock;
+import static org.mockito.Mockito.when;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BrokerOuterAPITest {
+    @Mock
+    private ChannelHandlerContext handlerContext;
+    @Spy
+    private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+    @Mock
+    private MessageStore messageStore;
+    private String clusterName = "clusterName";
+    private String brokerName = "brokerName";
+    private String brokerAddr = "brokerAddr";
+    private long brokerId = 0L;
+    private String nameserver1 = "127.0.0.1";
+    private String nameserver2 = "127.0.0.2";
+    private String nameserver3 = "127.0.0.3";
+    private int timeOut = 3000;
+
+    @Mock
+    private NettyRemotingClient nettyRemotingClient;
+
+    private BrokerOuterAPI brokerOuterAPI;
+
+    public void init() throws Exception {
+        brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), null);
+        Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
+        field.setAccessible(true);
+        field.set(brokerOuterAPI, nettyRemotingClient);
+    }
+
+    @Test
+    public void test_needRegister_normal() throws Exception {
+        init();
+        brokerOuterAPI.start();
+        final RemotingCommand response = buildResponse(Boolean.TRUE);
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+
+        when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
+        when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response);
+        List<Boolean> booleanList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut);
+        assertEquals(3, booleanList.size());
+        assertEquals(false, booleanList.contains(Boolean.FALSE));
+    }
+
+    @Test
+    public void test_needRegister_timeout() throws Exception {
+        init();
+        brokerOuterAPI.start();
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+
+        when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
+
+        when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock invocation) throws Throwable {
+                if (invocation.getArgument(0) == nameserver1) {
+                    return buildResponse(Boolean.TRUE);
+                } else if (invocation.getArgument(0) == nameserver2) {
+                    return buildResponse(Boolean.FALSE);
+                } else if (invocation.getArgument(0) == nameserver3) {
+                    TimeUnit.MILLISECONDS.sleep(timeOut + 20);
+                    return buildResponse(Boolean.TRUE);
+                }
+                return buildResponse(Boolean.TRUE);
+            }
+        });
+        List<Boolean> booleanList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut);
+        assertEquals(2, booleanList.size());
+        boolean success = Iterables.any(booleanList,
+            new Predicate<Boolean>() {
+                public boolean apply(Boolean input) {
+                    return input ? true : false;
+                }
+            });
+
+        assertEquals(true, success);
+
+    }
+
+    @Test
+    public void test_register_normal() throws Exception {
+        init();
+        brokerOuterAPI.start();
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+
+        when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
+        when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response);
+        List<RegisterBrokerResult> registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), false, timeOut, true);
+
+        assertEquals(3, registerBrokerResultList.size());
+    }
+
+    @Test
+    public void test_register_timeout() throws Exception {
+        init();
+        brokerOuterAPI.start();
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+
+        when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
+        when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenAnswer(new Answer<RemotingCommand>() {
+            @Override
+            public RemotingCommand answer(InvocationOnMock invocation) throws Throwable {
+                if (invocation.getArgument(0) == nameserver1) {
+                    return response;
+                } else if (invocation.getArgument(0) == nameserver2) {
+                    return response;
+                } else if (invocation.getArgument(0) == nameserver3) {
+                    TimeUnit.MILLISECONDS.sleep(timeOut + 20);
+                    return response;
+                }
+                return response;
+            }
+        });
+        List<RegisterBrokerResult> registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), false, timeOut, true);
+
+        assertEquals(2, registerBrokerResultList.size());
+    }
+
+    private RemotingCommand buildResponse(Boolean changed) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
+        final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        responseHeader.setChanged(changed);
+        return response;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 4d7eb46..4468b2d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.common;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
@@ -23,9 +25,6 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
 public class BrokerConfig {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
@@ -133,6 +132,10 @@ public class BrokerConfig {
     private boolean filterSupportRetry = false;
     private boolean enablePropertyFilter = false;
 
+    private boolean compressedRegister = false;
+
+    private boolean forceRegister = false;
+
     public boolean isTraceOn() {
         return traceOn;
     }
@@ -598,4 +601,20 @@ public class BrokerConfig {
     public void setEnablePropertyFilter(boolean enablePropertyFilter) {
         this.enablePropertyFilter = enablePropertyFilter;
     }
+
+    /**
+     * Returns the {@code compressedRegister} flag (defaults to {@code false}).
+     * NOTE(review): presumably selects the compressed register-broker body encoding; confirm against the broker's usage.
+     */
+    public boolean isCompressedRegister() {
+        return compressedRegister;
+    }
+
+    /**
+     * Sets the {@code compressedRegister} flag.
+     *
+     * @param compressedRegister new value of the flag
+     */
+    public void setCompressedRegister(boolean compressedRegister) {
+        this.compressedRegister = compressedRegister;
+    }
+
+    /**
+     * Returns the {@code forceRegister} flag (defaults to {@code false}).
+     * NOTE(review): presumably makes the broker register even when its data version is unchanged; confirm against the broker's usage.
+     */
+    public boolean isForceRegister() {
+        return forceRegister;
+    }
+
+    /**
+     * Sets the {@code forceRegister} flag.
+     *
+     * @param forceRegister new value of the flag
+     */
+    public void setForceRegister(boolean forceRegister) {
+        this.forceRegister = forceRegister;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
index 71b00fd..e54000d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
+++ b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
@@ -78,4 +78,13 @@ public class DataVersion extends RemotingSerializable {
         }
         return result;
     }
+
+    /**
+     * Renders this version as {@code DataVersion[timestamp=<timestamp>, counter=<counter>]}.
+     */
+    @Override
+    public String toString() {
+        return "DataVersion[" + "timestamp=" + timestamp + ", counter=" + counter + ']';
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
index 3860ec3..564d60c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
@@ -23,14 +23,21 @@ import java.util.concurrent.atomic.AtomicLong;
 public class ThreadFactoryImpl implements ThreadFactory {
     private final AtomicLong threadIndex = new AtomicLong(0);
     private final String threadNamePrefix;
+    private final boolean daemon;
 
     public ThreadFactoryImpl(final String threadNamePrefix) {
+        this(threadNamePrefix, false);
+    }
+
+    public ThreadFactoryImpl(final String threadNamePrefix, boolean daemon) {
         this.threadNamePrefix = threadNamePrefix;
+        this.daemon = daemon;
     }
 
     @Override
     public Thread newThread(Runnable r) {
-        return new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
-
+        Thread thread = new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
+        thread.setDaemon(daemon);
+        return thread;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 5900c0b..8cf2d46 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -165,4 +165,6 @@ public class RequestCode {
     public static final int SEND_BATCH_MESSAGE = 320;
 
     public static final int QUERY_CONSUME_QUEUE = 321;
+
+    public static final int QUERY_DATA_VERSION = 322;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
index c220927..2b49b6d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -17,14 +17,155 @@
 
 package org.apache.rocketmq.common.protocol.body;
 
+import com.alibaba.fastjson.JSON;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RegisterBrokerBody extends RemotingSerializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RegisterBrokerBody.class);
     private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
     private List<String> filterServerList = new ArrayList<String>();
 
+    /**
+     * Serializes this body, optionally in the compact deflate-compressed layout.
+     *
+     * <p>When {@code compress} is {@code false} this delegates to the inherited {@code encode()}.
+     * Otherwise the following records are written through a {@link DeflaterOutputStream}, each
+     * prefixed by its length as a 4-byte int: the encoded data version, then the number of topic
+     * configs (a bare 4-byte int), then every topic config's encoded string, and finally the
+     * filter server list as JSON.
+     *
+     * @param compress whether to produce the compressed layout
+     * @return the encoded bytes, or {@code null} if compression failed with an {@link IOException}
+     */
+    public byte[] encode(boolean compress) {
+
+        if (!compress) {
+            return super.encode();
+        }
+        long start = System.currentTimeMillis();
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        // A Deflater handed to DeflaterOutputStream is not released by the stream, so it is
+        // kept in a local and ended explicitly in the finally block below.
+        Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
+        DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, deflater);
+        DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
+        // Work on a snapshot so the topic count written up front matches the entries written after it.
+        ConcurrentMap<String, TopicConfig> topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
+        try {
+            byte[] buffer = dataVersion.encode();
+
+            // write data version
+            outputStream.write(convertIntToByteArray(buffer.length));
+            outputStream.write(buffer);
+
+            int topicNumber = topicConfigTable.size();
+
+            // write number of topic configs
+            outputStream.write(convertIntToByteArray(topicNumber));
+
+            // write topic config entry one by one.
+            for (ConcurrentMap.Entry<String, TopicConfig> next : topicConfigTable.entrySet()) {
+                buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET);
+                outputStream.write(convertIntToByteArray(buffer.length));
+                outputStream.write(buffer);
+            }
+
+            buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET);
+
+            // write filter server list json length
+            outputStream.write(convertIntToByteArray(buffer.length));
+
+            // write filter server list json
+            outputStream.write(buffer);
+
+            // flush the remaining compressed data into byteArrayOutputStream
+            outputStream.finish();
+            long interval = System.currentTimeMillis() - start;
+            if (interval > 50) {
+                LOGGER.info("Compressing takes {}ms", interval);
+            }
+            return byteArrayOutputStream.toByteArray();
+        } catch (IOException e) {
+            LOGGER.error("Failed to compress RegisterBrokerBody object", e);
+        } finally {
+            // release the compressor's native resources on both the success and the failure path
+            deflater.end();
+        }
+
+        return null;
+    }
+
+    /**
+     * Deserializes a register-broker body produced by {@link #encode(boolean)}.
+     *
+     * <p>When {@code compressed} is {@code false} the bytes are decoded with the inherited
+     * two-argument {@code decode}. Otherwise the data is inflated and read back in the order it
+     * was written: data version, topic config count, each topic config, filter server list JSON.
+     * If the filter server list JSON cannot be parsed, the error is logged and an empty list is used.
+     *
+     * @param data encoded bytes
+     * @param compressed whether {@code data} uses the compressed layout
+     * @return the decoded body
+     * @throws IOException if the compressed data is truncated or cannot be inflated
+     */
+    public static RegisterBrokerBody decode(byte[] data, boolean compressed) throws IOException {
+        if (!compressed) {
+            return RegisterBrokerBody.decode(data, RegisterBrokerBody.class);
+        }
+        long start = System.currentTimeMillis();
+        InflaterInputStream inflaterInputStream = new InflaterInputStream(new ByteArrayInputStream(data));
+        try {
+            int dataVersionLength = readInt(inflaterInputStream);
+            byte[] dataVersionBytes = readBytes(inflaterInputStream, dataVersionLength);
+            DataVersion dataVersion = DataVersion.decode(dataVersionBytes, DataVersion.class);
+
+            RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+            registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
+            ConcurrentMap<String, TopicConfig> topicConfigTable = registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
+
+            int topicConfigNumber = readInt(inflaterInputStream);
+            LOGGER.debug("{} topic configs to extract", topicConfigNumber);
+
+            for (int i = 0; i < topicConfigNumber; i++) {
+                int topicConfigJsonLength = readInt(inflaterInputStream);
+
+                byte[] buffer = readBytes(inflaterInputStream, topicConfigJsonLength);
+                TopicConfig topicConfig = new TopicConfig();
+                String topicConfigJson = new String(buffer, MixAll.DEFAULT_CHARSET);
+                topicConfig.decode(topicConfigJson);
+                topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+            }
+
+            int filterServerListJsonLength = readInt(inflaterInputStream);
+
+            byte[] filterServerListBuffer = readBytes(inflaterInputStream, filterServerListJsonLength);
+            String filterServerListJson = new String(filterServerListBuffer, MixAll.DEFAULT_CHARSET);
+            List<String> filterServerList = new ArrayList<String>();
+            try {
+                filterServerList = JSON.parseArray(filterServerListJson, String.class);
+            } catch (Exception e) {
+                // pass the exception as the trailing argument so its stack trace is logged too
+                LOGGER.error("Decompressing occur Exception {}", filterServerListJson, e);
+            }
+
+            registerBrokerBody.setFilterServerList(filterServerList);
+            long interval = System.currentTimeMillis() - start;
+            if (interval > 50) {
+                LOGGER.info("Decompressing takes {}ms", interval);
+            }
+            return registerBrokerBody;
+        } finally {
+            // closing the stream releases the Inflater it created, on success and on failure alike
+            inflaterInputStream.close();
+        }
+    }
+
+    /**
+     * Encodes an int as the 4 bytes written by {@link ByteBuffer#putInt(int)} on a freshly allocated buffer.
+     */
+    private static byte[] convertIntToByteArray(int n) {
+        return ByteBuffer.allocate(4).putInt(n).array();
+    }
+
+    /**
+     * Reads exactly {@code length} bytes from the inflater stream.
+     *
+     * @param inflaterInputStream stream to read from
+     * @param length number of bytes to read; taken from a length prefix inside the compressed data
+     * @return a new array holding the bytes read
+     * @throws IOException if {@code length} is negative or the stream ends before
+     *                     {@code length} bytes were read
+     */
+    private static byte[] readBytes(InflaterInputStream inflaterInputStream, int length) throws IOException {
+        // A corrupt length prefix must surface as an IOException, not as an unchecked
+        // NegativeArraySizeException from the allocation below.
+        if (length < 0) {
+            throw new IOException("Invalid negative length " + length + " in compressed data");
+        }
+        byte[] buffer = new byte[length];
+        int bytesRead = 0;
+        // read() may return fewer bytes than requested, so loop until the buffer is full
+        while (bytesRead < length) {
+            int len = inflaterInputStream.read(buffer, bytesRead, length - bytesRead);
+            if (len == -1) {
+                throw new IOException("End of compressed data has reached");
+            } else {
+                bytesRead += len;
+            }
+        }
+        return buffer;
+    }
+
+    /**
+     * Reads the next 4 bytes from the stream and interprets them with {@link ByteBuffer#getInt()}.
+     *
+     * @throws IOException if 4 bytes cannot be read
+     */
+    private static int readInt(InflaterInputStream inflaterInputStream) throws IOException {
+        return ByteBuffer.wrap(readBytes(inflaterInputStream, 4)).getInt();
+    }
+
     public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() {
         return topicConfigSerializeWrapper;
     }
@@ -40,4 +181,16 @@ public class RegisterBrokerBody extends RemotingSerializable {
     public void setFilterServerList(List<String> filterServerList) {
         this.filterServerList = filterServerList;
     }
+
+    /**
+     * Returns a new map holding the same key/value pairs as the given table.
+     * The {@link TopicConfig} values themselves are shared, not copied.
+     *
+     * @param topicConfigConcurrentMap table to copy; may be {@code null}
+     * @return a new map, empty when the argument is {@code null}
+     */
+    public static ConcurrentMap<String, TopicConfig> cloneTopicConfigTable(
+        ConcurrentMap<String, TopicConfig> topicConfigConcurrentMap) {
+        ConcurrentHashMap<String, TopicConfig> copy = new ConcurrentHashMap<String, TopicConfig>();
+        if (null == topicConfigConcurrentMap) {
+            return copy;
+        }
+        copy.putAll(topicConfigConcurrentMap);
+        return copy;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java
similarity index 83%
copy from common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java
index 45d5b6e..ac6a617 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionRequestHeader.java
@@ -15,16 +15,13 @@
  * limitations under the License.
  */
 
-/**
- * $Id: RegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
 package org.apache.rocketmq.common.protocol.header.namesrv;
 
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class RegisterBrokerRequestHeader implements CommandCustomHeader {
+public class QueryDataVersionRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String brokerName;
     @CFNotNull
@@ -32,12 +29,11 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private String clusterName;
     @CFNotNull
-    private String haServerAddr;
-    @CFNotNull
     private Long brokerId;
 
     @Override
     public void checkFields() throws RemotingCommandException {
+
     }
 
     public String getBrokerName() {
@@ -64,14 +60,6 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
         this.clusterName = clusterName;
     }
 
-    public String getHaServerAddr() {
-        return haServerAddr;
-    }
-
-    public void setHaServerAddr(String haServerAddr) {
-        this.haServerAddr = haServerAddr;
-    }
-
     public Long getBrokerId() {
         return brokerId;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java
similarity index 50%
copy from common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java
index 3860ec3..90741e5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/QueryDataVersionResponseHeader.java
@@ -15,22 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.common;
+package org.apache.rocketmq.common.protocol.header.namesrv;
 
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class ThreadFactoryImpl implements ThreadFactory {
-    private final AtomicLong threadIndex = new AtomicLong(0);
-    private final String threadNamePrefix;
+public class QueryDataVersionResponseHeader implements CommandCustomHeader {
+    @CFNotNull
+    private Boolean changed;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
 
-    public ThreadFactoryImpl(final String threadNamePrefix) {
-        this.threadNamePrefix = threadNamePrefix;
     }
 
-    @Override
-    public Thread newThread(Runnable r) {
-        return new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
+    public Boolean getChanged() {
+        return changed;
+    }
 
+    public void setChanged(Boolean changed) {
+        this.changed = changed;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("QueryDataVersionResponseHeader{");
+        sb.append("changed=").append(changed);
+        sb.append('}');
+        return sb.toString();
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
index 45d5b6e..7ed7a40 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
@@ -36,7 +36,10 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
     @CFNotNull
     private Long brokerId;
 
+    // Whether the request body uses the compressed layout; read by the name server when decoding the body.
+    private boolean compressed;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
 
@@ -79,4 +80,12 @@ public class RegisterBrokerRequestHeader implements CommandCustomHeader {
     public void setBrokerId(Long brokerId) {
         this.brokerId = brokerId;
     }
+
+    /**
+     * Returns whether the accompanying request body is marked as compressed.
+     */
+    public boolean isCompressed() {
+        return compressed;
+    }
+
+    /**
+     * Marks the accompanying request body as compressed or not.
+     *
+     * @param compressed new value of the flag
+     */
+    public void setCompressed(boolean compressed) {
+        this.compressed = compressed;
+    }
 }
diff --git a/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
new file mode 100644
index 0000000..87a0fc0
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/RegisterBrokerBodyTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+/**
+ * Round-trip test for the compressed encoding of {@link RegisterBrokerBody}.
+ */
+public class RegisterBrokerBodyTest {
+    /**
+     * Encodes a body with 10000 topic configs in compressed form, decodes it again and checks
+     * that the decoded topic table has the same size and the same topic names as the original.
+     */
+    @Test
+    public void test_encode_decode() throws IOException {
+        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+        registerBrokerBody.setTopicConfigSerializeWrapper(topicConfigSerializeWrapper);
+
+        ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
+        for (int i = 0; i < 10000; i++) {
+            topicConfigTable.put(String.valueOf(i), new TopicConfig(String.valueOf(i)));
+        }
+
+        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
+
+        byte[] compressedEncode = registerBrokerBody.encode(true);
+        RegisterBrokerBody decodeRegisterBrokerBody = RegisterBrokerBody.decode(compressedEncode, true);
+
+        ConcurrentMap<String, TopicConfig> expectedTable = registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
+        ConcurrentMap<String, TopicConfig> decodedTable = decodeRegisterBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
+
+        assertEquals(expectedTable.size(), decodedTable.size());
+        // size alone would not catch a decoder that mangles topic names
+        assertEquals(expectedTable.keySet(), decodedTable.keySet());
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 4906886..236e6a1 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext;
 import java.io.UnsupportedEncodingException;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MQVersion.Version;
 import org.apache.rocketmq.common.MixAll;
@@ -41,6 +42,8 @@ import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHea
 import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
@@ -81,6 +84,8 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
                 return this.getKVConfig(ctx, request);
             case RequestCode.DELETE_KV_CONFIG:
                 return this.deleteKVConfig(ctx, request);
+            case RequestCode.QUERY_DATA_VERSION:
+                return queryBrokerTopicConfig(ctx, request);
             case RequestCode.REGISTER_BROKER:
                 Version brokerVersion = MQVersion.value2Version(request.getVersion());
                 if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
@@ -194,7 +199,11 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
 
         if (request.getBody() != null) {
-            registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
+            try {
+                registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
+            } catch (Exception e) {
+                throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
+            }
         } else {
             registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
             registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
@@ -221,6 +230,30 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
         return response;
     }
 
+    /**
+     * Handles {@code RequestCode.QUERY_DATA_VERSION}: compares the data version sent in the
+     * request body with the one the route info manager holds for the broker address.
+     *
+     * <p>When the versions match, the broker's last-update timestamp is refreshed. The response
+     * always has code SUCCESS; its header carries the {@code changed} flag and its body carries
+     * the name server's data version when one is known for that address.
+     *
+     * @param ctx channel context (not used by this method)
+     * @param request request whose custom header is a {@link QueryDataVersionRequestHeader}
+     * @return the response command
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
+        final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
+        final QueryDataVersionRequestHeader requestHeader =
+            (QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);
+        final DataVersion brokerDataVersion = DataVersion.decode(request.getBody(), DataVersion.class);
+        final String brokerAddr = requestHeader.getBrokerAddr();
+
+        final boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(brokerAddr, brokerDataVersion);
+        if (!changed) {
+            // nothing to re-register: just record that the broker has checked in
+            this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(brokerAddr);
+        }
+
+        final DataVersion nameServerDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(brokerAddr);
+        if (nameServerDataVersion != null) {
+            response.setBody(nameServerDataVersion.encode());
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        responseHeader.setChanged(changed);
+        return response;
+    }
+
     public RemotingCommand registerBroker(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index ef02dd0..00962ef 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -183,13 +183,24 @@ public class RouteInfoManager {
         return result;
     }
 
-    private boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
+    public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
+        DataVersion prev = queryBrokerTopicConfig(brokerAddr);
+        return null == prev || !prev.equals(dataVersion);
+    }
+
+    public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
         BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
-        if (null == prev || !prev.getDataVersion().equals(dataVersion)) {
-            return true;
+        if (prev != null) {
+            return prev.getDataVersion();
         }
+        return null;
+    }
 
-        return false;
+    public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {
+        BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
+        if (prev != null) {
+            prev.setLastUpdateTimestamp(System.currentTimeMillis());
+        }
     }
 
     private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {

-- 
To stop receiving notification emails like this one, please contact
yukon@apache.org.