You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/10/15 14:39:56 UTC

[GitHub] duhengforever closed pull request #445: improved HTTP functionality for broker,nameServer and java client

duhengforever closed pull request #445: improved HTTP functionality for broker,nameServer and java client
URL: https://github.com/apache/rocketmq/pull/445
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/broker/pom.xml b/broker/pom.xml
index f10ae5373..ebf258f48 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -15,7 +15,8 @@
   limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index f45674d6e..50067225a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -100,11 +100,12 @@
 import org.apache.rocketmq.store.stats.BrokerStats;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
-
 public class BrokerController {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
-    private static final InternalLogger LOG_WATER_MARK = InternalLoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
+    private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory
+        .getLogger(LoggerName.PROTECTION_LOGGER_NAME);
+    private static final InternalLogger LOG_WATER_MARK = InternalLoggerFactory
+        .getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
     private final BrokerConfig brokerConfig;
     private final NettyServerConfig nettyServerConfig;
     private final NettyClientConfig nettyClientConfig;
@@ -122,8 +123,8 @@
     private final ConsumerIdsChangeListener consumerIdsChangeListener;
     private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
     private final BrokerOuterAPI brokerOuterAPI;
-    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-        "BrokerControllerScheduledThread"));
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("BrokerControllerScheduledThread"));
     private final SlaveSynchronize slaveSynchronize;
     private final BlockingQueue<Runnable> sendThreadPoolQueue;
     private final BlockingQueue<Runnable> pullThreadPoolQueue;
@@ -156,12 +157,8 @@
     private TransactionalMessageService transactionalMessageService;
     private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
 
-    public BrokerController(
-        final BrokerConfig brokerConfig,
-        final NettyServerConfig nettyServerConfig,
-        final NettyClientConfig nettyClientConfig,
-        final MessageStoreConfig messageStoreConfig
-    ) {
+    public BrokerController(final BrokerConfig brokerConfig, final NettyServerConfig nettyServerConfig,
+        final NettyClientConfig nettyClientConfig, final MessageStoreConfig messageStoreConfig) {
         this.brokerConfig = brokerConfig;
         this.nettyServerConfig = nettyServerConfig;
         this.nettyClientConfig = nettyClientConfig;
@@ -183,22 +180,26 @@ public BrokerController(
 
         this.slaveSynchronize = new SlaveSynchronize(this);
 
-        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
-        this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
-        this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
-        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
-        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
-        this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
+        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
+            this.brokerConfig.getSendThreadPoolQueueCapacity());
+        this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
+            this.brokerConfig.getPullThreadPoolQueueCapacity());
+        this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
+            this.brokerConfig.getQueryThreadPoolQueueCapacity());
+        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
+            this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
+        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
+            this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
+        this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(
+            this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
 
         this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
-        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
+        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(),
+            this.getNettyServerConfig().getListenPort()));
 
         this.brokerFastFailure = new BrokerFastFailure(this);
-        this.configuration = new Configuration(
-            log,
-            BrokerPathConfigHelper.getBrokerConfigPath(),
-            this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
-        );
+        this.configuration = new Configuration(log, BrokerPathConfigHelper.getBrokerConfigPath(), this.brokerConfig,
+            this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig);
     }
 
     public BrokerConfig getBrokerConfig() {
@@ -226,14 +227,15 @@ public boolean initialize() throws CloneNotSupportedException {
 
         if (result) {
             try {
-                this.messageStore =
-                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
-                        this.brokerConfig);
+                this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
+                    this.messageArrivingListener, this.brokerConfig);
                 this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
-                //load plugin
-                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
+                // load plugin
+                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig,
+                    brokerStatsManager, messageArrivingListener, brokerConfig);
                 this.messageStore = MessageStoreFactory.build(context, this.messageStore);
-                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
+                this.messageStore.getDispatcherList()
+                    .addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
             } catch (IOException e) {
                 result = false;
                 log.error("Failed to initialize", e);
@@ -246,55 +248,37 @@ public boolean initialize() throws CloneNotSupportedException {
             this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
             NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
             fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
+            fastConfig.setHttpListenPort(nettyServerConfig.getHttpListenPort() - 2);
             this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
             this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                this.brokerConfig.getSendMessageThreadPoolNums(),
-                this.brokerConfig.getSendMessageThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.sendThreadPoolQueue,
+                this.brokerConfig.getSendMessageThreadPoolNums(), this.brokerConfig.getSendMessageThreadPoolNums(),
+                1000 * 60, TimeUnit.MILLISECONDS, this.sendThreadPoolQueue,
                 new ThreadFactoryImpl("SendMessageThread_"));
 
             this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                this.brokerConfig.getPullMessageThreadPoolNums(),
-                this.brokerConfig.getPullMessageThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.pullThreadPoolQueue,
+                this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(),
+                1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue,
                 new ThreadFactoryImpl("PullMessageThread_"));
 
             this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                 this.brokerConfig.getQueryMessageThreadPoolNums(),
-                this.brokerConfig.getQueryMessageThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.queryThreadPoolQueue,
-                new ThreadFactoryImpl("QueryMessageThread_"));
-
-            this.adminBrokerExecutor =
-                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
-                    "AdminBrokerThread_"));
-
-            this.clientManageExecutor = new ThreadPoolExecutor(
-                this.brokerConfig.getClientManageThreadPoolNums(),
-                this.brokerConfig.getClientManageThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.clientManagerThreadPoolQueue,
-                new ThreadFactoryImpl("ClientManageThread_"));
-
-            this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
-                this.brokerConfig.getHeartbeatThreadPoolNums(),
-                this.brokerConfig.getHeartbeatThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.heartbeatThreadPoolQueue,
-                new ThreadFactoryImpl("HeartbeatThread_",true));
-
-
-            this.consumerManageExecutor =
-                Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
-                    "ConsumerManageThread_"));
+                this.brokerConfig.getQueryMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS,
+                this.queryThreadPoolQueue, new ThreadFactoryImpl("QueryMessageThread_"));
+
+            this.adminBrokerExecutor = Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(),
+                new ThreadFactoryImpl("AdminBrokerThread_"));
+
+            this.clientManageExecutor = new ThreadPoolExecutor(this.brokerConfig.getClientManageThreadPoolNums(),
+                this.brokerConfig.getClientManageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS,
+                this.clientManagerThreadPoolQueue, new ThreadFactoryImpl("ClientManageThread_"));
+
+            this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(this.brokerConfig.getHeartbeatThreadPoolNums(),
+                this.brokerConfig.getHeartbeatThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS,
+                this.heartbeatThreadPoolQueue, new ThreadFactoryImpl("HeartbeatThread_", true));
+
+            this.consumerManageExecutor = Executors.newFixedThreadPool(
+                this.brokerConfig.getConsumerManageThreadPoolNums(),
+                new ThreadFactoryImpl("ConsumerManageThread_"));
 
             this.registerProcessor();
 
@@ -360,7 +344,8 @@ public void run() {
                 @Override
                 public void run() {
                     try {
-                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
+                        log.info("dispatch behind commit log {} bytes",
+                            BrokerController.this.getMessageStore().dispatchBehindBytes());
                     } catch (Throwable e) {
                         log.error("schedule dispatchBehindBytes error.", e);
                     }
@@ -370,6 +355,10 @@ public void run() {
             if (this.brokerConfig.getNamesrvAddr() != null) {
                 this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                 log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
+            }
+            if (this.brokerConfig.getNamesrvHttpAddr() != null) {
+                this.brokerOuterAPI.updateNameServerHttpAddressList(this.brokerConfig.getNamesrvHttpAddr());
+                log.info("Set user specified name server http address: {}", this.brokerConfig.getNamesrvHttpAddr());
             } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
@@ -385,7 +374,8 @@ public void run() {
             }
 
             if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
-                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
+                if (this.messageStoreConfig.getHaMasterAddress() != null
+                    && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                     this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                     this.updateMasterHAServerAddrPeriodically = false;
                 } else {
@@ -420,12 +410,9 @@ public void run() {
             if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                 // Register a listener to reload SslContext
                 try {
-                    fileWatchService = new FileWatchService(
-                        new String[] {
-                            TlsSystemConfig.tlsServerCertPath,
-                            TlsSystemConfig.tlsServerKeyPath,
-                            TlsSystemConfig.tlsServerTrustCertPath
-                        },
+                    fileWatchService = new FileWatchService(new String[] {
+                        TlsSystemConfig.tlsServerCertPath,
+                        TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath},
                         new FileWatchService.Listener() {
                             boolean certChanged, keyChanged = false;
 
@@ -463,15 +450,20 @@ private void reloadServerSslContext() {
     }
 
     private void initialTransaction() {
-        this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
+        this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID,
+            TransactionalMessageService.class);
         if (null == this.transactionalMessageService) {
-            this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
-            log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
+            this.transactionalMessageService = new TransactionalMessageServiceImpl(
+                new TransactionalMessageBridge(this, this.getMessageStore()));
+            log.warn("Load default transaction message hook service: {}",
+                TransactionalMessageServiceImpl.class.getSimpleName());
         }
-        this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
+        this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID,
+            AbstractTransactionalMessageCheckListener.class);
         if (null == this.transactionalMessageCheckListener) {
             this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
-            log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
+            log.warn("Load default discard message hook service: {}",
+                DefaultTransactionalMessageCheckListener.class.getSimpleName());
         }
         this.transactionalMessageCheckListener.setBrokerController(this);
         this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
@@ -488,15 +480,19 @@ public void registerProcessor() {
         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
         this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,
+            this.sendMessageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor,
+            this.sendMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor,
+            this.sendMessageExecutor);
         /**
          * PullMessageProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor,
+            this.pullMessageExecutor);
         this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
 
         /**
@@ -504,40 +500,54 @@ public void registerProcessor() {
          */
         NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
         this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor,
+            this.queryMessageExecutor);
 
         this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor,
+            this.queryMessageExecutor);
 
         /**
          * ClientManageProcessor
          */
         ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
         this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
-        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor,
+            this.clientManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor,
+            this.clientManageExecutor);
 
         this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor,
+            this.clientManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor,
+            this.clientManageExecutor);
 
         /**
          * ConsumerManageProcessor
          */
         ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
-        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
-
-        this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor,
+            this.consumerManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor,
+            this.consumerManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor,
+            this.consumerManageExecutor);
+
+        this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor,
+            this.consumerManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor,
+            this.consumerManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor,
+            this.consumerManageExecutor);
 
         /**
          * EndTransactionProcessor
          */
-        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
-        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this),
+            this.sendMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this),
+            this.sendMessageExecutor);
 
         /**
          * Default
@@ -557,14 +567,16 @@ public void setBrokerStats(BrokerStats brokerStats) {
 
     public void protectBroker() {
         if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
-            final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();
+            final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager
+                .getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();
             while (it.hasNext()) {
                 final Map.Entry<String, MomentStatsItem> next = it.next();
                 final long fallBehindBytes = next.getValue().getValue().get();
                 if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {
                     final String[] split = next.getValue().getStatsKey().split("@");
                     final String group = split[2];
-                    LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);
+                    LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group,
+                        fallBehindBytes);
                     this.subscriptionGroupManager.disableConsume(group);
                 }
             }
@@ -599,9 +611,12 @@ public long headSlowTimeMills4QueryThreadPoolQueue() {
     }
 
     public void printWaterMark() {
-        LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
-        LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
-        LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
+        LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(),
+            headSlowTimeMills4SendThreadPoolQueue());
+        LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(),
+            headSlowTimeMills4PullThreadPoolQueue());
+        LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(),
+            headSlowTimeMills4QueryThreadPoolQueue());
     }
 
     public MessageStore getMessageStore() {
@@ -744,17 +759,18 @@ public void shutdown() {
     }
 
     private void unregisterBrokerAll() {
-        this.brokerOuterAPI.unregisterBrokerAll(
-            this.brokerConfig.getBrokerClusterName(),
-            this.getBrokerAddr(),
-            this.brokerConfig.getBrokerName(),
-            this.brokerConfig.getBrokerId());
+        this.brokerOuterAPI.unregisterBrokerAll(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId());
     }
 
     public String getBrokerAddr() {
         return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort();
     }
 
+    public String getBrokerHttpAddr() {
+        return "@" + this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getHttpListenPort();
+    }
+
     public void start() throws Exception {
         if (this.messageStore != null) {
             this.messageStore.start();
@@ -822,9 +838,8 @@ public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, Da
         TopicConfig registerTopicConfig = topicConfig;
         if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
             || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
-            registerTopicConfig =
-                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
-                    this.brokerConfig.getBrokerPermission());
+            registerTopicConfig = new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(),
+                topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission());
         }
 
         ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
@@ -837,24 +852,22 @@ public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, Da
     }
 
     public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
-        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
+        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager()
+            .buildTopicConfigSerializeWrapper();
 
         if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
             || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
             ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
             for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
-                TopicConfig tmp =
-                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
-                        this.brokerConfig.getBrokerPermission());
+                TopicConfig tmp = new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(),
+                    topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission());
                 topicConfigTable.put(topicConfig.getTopicName(), tmp);
             }
             topicConfigWrapper.setTopicConfigTable(topicConfigTable);
         }
 
-        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
-            this.getBrokerAddr(),
-            this.brokerConfig.getBrokerName(),
-            this.brokerConfig.getBrokerId(),
+        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(),
             this.brokerConfig.getRegisterBrokerTimeoutMills())) {
             doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
         }
@@ -863,16 +876,10 @@ public synchronized void registerBrokerAll(final boolean checkOrderConfig, boole
     private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
         TopicConfigSerializeWrapper topicConfigWrapper) {
         List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
-            this.brokerConfig.getBrokerClusterName(),
-            this.getBrokerAddr(),
-            this.brokerConfig.getBrokerName(),
-            this.brokerConfig.getBrokerId(),
-            this.getHAServerAddr(),
-            topicConfigWrapper,
-            this.filterServerManager.buildNewFilterServerList(),
-            oneway,
-            this.brokerConfig.getRegisterBrokerTimeoutMills(),
-            this.brokerConfig.isCompressedRegister());
+            this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.getBrokerHttpAddr(), this.brokerConfig.getBrokerName(),
+            this.brokerConfig.getBrokerId(), this.getHAServerAddr(), topicConfigWrapper,
+            this.filterServerManager.buildNewFilterServerList(), oneway,
+            this.brokerConfig.getRegisterBrokerTimeoutMills(), this.brokerConfig.isCompressedRegister());
 
         if (registerBrokerResultList.size() > 0) {
             RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
@@ -890,14 +897,13 @@ private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
         }
     }
 
-    private boolean needRegister(final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final long brokerId,
-        final int timeoutMills) {
+    private boolean needRegister(final String clusterName, final String brokerAddr, final String brokerName,
+        final long brokerId, final int timeoutMills) {
 
-        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
-        List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
+        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager()
+            .buildTopicConfigSerializeWrapper();
+        List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId,
+            topicConfigWrapper, timeoutMills);
         boolean needRegister = false;
         for (Boolean changed : changeList) {
             if (changed) {
@@ -1006,8 +1012,7 @@ public TransactionalMessageCheckService getTransactionalMessageCheckService() {
         return transactionalMessageCheckService;
     }
 
-    public void setTransactionalMessageCheckService(
-        TransactionalMessageCheckService transactionalMessageCheckService) {
+    public void setTransactionalMessageCheckService(TransactionalMessageCheckService transactionalMessageCheckService) {
         this.transactionalMessageCheckService = transactionalMessageCheckService;
     }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 4b986c0d2..1e9ef751a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -162,6 +162,21 @@ public static BrokerController createBrokerController(String[] args) {
                 }
             }
 
+            String namesrvHttpAddr = brokerConfig.getNamesrvHttpAddr(); // optional; validated only when configured
+            if (null != namesrvHttpAddr) {
+                try {
+                    String[] addrArray = namesrvHttpAddr.split(";");
+                    for (String addr : addrArray) {
+                        RemotingUtil.string2SocketAddress(addr); // parsed only to validate; result is discarded
+                    }
+                } catch (Exception e) {
+                    System.out.printf(
+                        "The Name Server HTTP Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
+                        namesrvHttpAddr); // report the value that failed validation, not the non-HTTP namesrvAddr
+                    System.exit(-3);
+                }
+            }
+
             switch (messageStoreConfig.getBrokerRole()) {
                 case ASYNC_MASTER:
                 case SYNC_MASTER:
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 61ceae538..ebc3c2e16 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -44,6 +44,7 @@
     private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
         new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
     private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
+
     public ProducerManager() {
     }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
index 4c409f225..8da0603f5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
@@ -75,7 +75,7 @@ public void checkProducerTransactionState(
     }
 
     public RemotingCommand callClient(final Channel channel,
-                                      final RemotingCommand request
+        final RemotingCommand request
     ) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
     }
@@ -105,7 +105,7 @@ public RemotingCommand resetOffset(String topic, String group, long timeStamp, b
     }
 
     public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
-                                       boolean isC) {
+        boolean isC) {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
index 3f4d24d06..2fb2d37e1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.broker.filtersrv;
 
-
 import org.apache.rocketmq.logging.InternalLogger;
 
 public class FilterServerUtil {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 0a8beca2c..1ecb1987d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -29,8 +29,8 @@
 import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
 
 /**
- * BrokerFastFailure will cover {@link BrokerController#sendThreadPoolQueue} and
- * {@link BrokerController#pullThreadPoolQueue}
+ * BrokerFastFailure will cover {@link BrokerController#sendThreadPoolQueue} and {@link
+ * BrokerController#pullThreadPoolQueue}
  */
 public class BrokerFastFailure {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 4dee01cbf..84c697c08 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -111,9 +111,20 @@ public void updateNameServerAddressList(final String addrs) {
         this.remotingClient.updateNameServerAddressList(lst);
     }
 
+    public void updateNameServerHttpAddressList(final String addrs) {
+        List<String> lst = new ArrayList<String>();
+        String[] addrArray = addrs.split(";");
+        for (String addr : addrArray) {
+            lst.add("@" + addr);
+        }
+
+        this.remotingClient.updateNameServerHttpAddressList(lst);
+    }
+
     public List<RegisterBrokerResult> registerBrokerAll(
         final String clusterName,
         final String brokerAddr,
+        final String brokerHttpAddr,
         final String brokerName,
         final long brokerId,
         final String haServerAddr,
@@ -134,6 +145,7 @@ public void updateNameServerAddressList(final String addrs) {
             requestHeader.setClusterName(clusterName);
             requestHeader.setHaServerAddr(haServerAddr);
             requestHeader.setCompressed(compressed);
+            requestHeader.setBrokerHttpAddr(brokerHttpAddr);
 
             RegisterBrokerBody requestBody = new RegisterBrokerBody();
             requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
@@ -147,7 +159,7 @@ public void updateNameServerAddressList(final String addrs) {
                     @Override
                     public void run() {
                         try {
-                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
+                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                             if (result != null) {
                                 registerBrokerResultList.add(result);
                             }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 1a704a8c6..e797c3d52 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -246,7 +246,7 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
 
         this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
 
-        this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
+        this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
 
         return null;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index c9e85ed1a..8b3fad0fe 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -54,7 +54,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
         RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final EndTransactionRequestHeader requestHeader =
-            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
+            (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
         LOGGER.info("Transaction request:{}", requestHeader);
         if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
             response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index b7e7a6187..5f1c2f1a3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -63,7 +63,7 @@ public SendMessageProcessor(final BrokerController brokerController) {
 
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx,
-                                          RemotingCommand request) throws RemotingCommandException {
+        RemotingCommand request) throws RemotingCommandException {
         SendMessageContext mqtraceContext;
         switch (request.getCode()) {
             case RequestCode.CONSUMER_SEND_MSG_BACK:
@@ -99,7 +99,7 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin
         throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final ConsumerSendMsgBackRequestHeader requestHeader =
-            (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
+            (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
 
         if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
 
@@ -249,8 +249,8 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin
     }
 
     private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
-                                      RemotingCommand request,
-                                      MessageExt msg, TopicConfig topicConfig) {
+        RemotingCommand request,
+        MessageExt msg, TopicConfig topicConfig) {
         String newTopic = requestHeader.getTopic();
         if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
             String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
@@ -293,12 +293,12 @@ private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, Remoti
     }
 
     private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
-                                        final RemotingCommand request,
-                                        final SendMessageContext sendMessageContext,
-                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+        final RemotingCommand request,
+        final SendMessageContext sendMessageContext,
+        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
 
         final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
-        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
+        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
 
         response.setOpaque(request.getOpaque());
 
@@ -366,9 +366,9 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
     }
 
     private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
-                                                   RemotingCommand request, MessageExt msg,
-                                                   SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
-                                                   int queueIdInt) {
+        RemotingCommand request, MessageExt msg,
+        SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
+        int queueIdInt) {
         if (putMessageResult == null) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("store putMessage return null");
@@ -448,7 +448,7 @@ private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult
 
                 int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                 int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
-                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
 
                 sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
                 sendMessageContext.setCommercialSendTimes(incValue);
@@ -459,7 +459,7 @@ private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult
         } else {
             if (hasSendMessageHook()) {
                 int wroteSize = request.getBody().length;
-                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
 
                 sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                 sendMessageContext.setCommercialSendTimes(incValue);
@@ -471,12 +471,12 @@ private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult
     }
 
     private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
-                                             final RemotingCommand request,
-                                             final SendMessageContext sendMessageContext,
-                                             final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+        final RemotingCommand request,
+        final SendMessageContext sendMessageContext,
+        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
 
         final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
-        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
+        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
 
         response.setOpaque(request.getOpaque());
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 59c7895eb..a6a98ddde 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -210,7 +210,7 @@ public TopicConfig createTopicInSendMessageMethod(final String topic, final Stri
         }
 
         if (createNew) {
-            this.brokerController.registerBrokerAll(false, true,true);
+            this.brokerController.registerBrokerAll(false, true, true);
         }
 
         return topicConfig;
@@ -254,7 +254,7 @@ public TopicConfig createTopicInSendMessageBackMethod(
         }
 
         if (createNew) {
-            this.brokerController.registerBrokerAll(false, true,true);
+            this.brokerController.registerBrokerAll(false, true, true);
         }
 
         return topicConfig;
@@ -279,7 +279,7 @@ public void updateTopicUnitFlag(final String topic, final boolean unit) {
             this.dataVersion.nextVersion();
 
             this.persist();
-            this.brokerController.registerBrokerAll(false, true,true);
+            this.brokerController.registerBrokerAll(false, true, true);
         }
     }
 
@@ -299,7 +299,7 @@ public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub)
             this.dataVersion.nextVersion();
 
             this.persist();
-            this.brokerController.registerBrokerAll(false, true,true);
+            this.brokerController.registerBrokerAll(false, true, true);
         }
     }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/PositiveAtomicCounter.java b/broker/src/main/java/org/apache/rocketmq/broker/util/PositiveAtomicCounter.java
index 8d92f4308..794517c25 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/util/PositiveAtomicCounter.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/util/PositiveAtomicCounter.java
@@ -22,18 +22,15 @@
     private static final int MASK = 0x7FFFFFFF;
     private final AtomicInteger atom;
 
-
     public PositiveAtomicCounter() {
         atom = new AtomicInteger(0);
     }
 
-
     public final int incrementAndGet() {
         final int rt = atom.incrementAndGet();
         return rt & MASK;
     }
 
-
     public int intValue() {
         return atom.intValue();
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
index 59be7a7e2..7e7f66f30 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
@@ -1,10 +1,13 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to
- * You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License.  You may obtain a copy of the License at
  * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
 package org.apache.rocketmq.broker.util;
@@ -108,7 +111,7 @@ protected static InputStream getResourceAsStream(ClassLoader loader, String name
                         names.add(serviceName);
                     }
 
-                    services.add((T)initService(getContextClassLoader(), serviceName, clazz));
+                    services.add((T) initService(getContextClassLoader(), serviceName, clazz));
 
                     serviceName = reader.readLine();
                 }
@@ -160,10 +163,11 @@ protected static InputStream getResourceAsStream(ClassLoader loader, String name
                         // This indicates a problem with the ClassLoader tree. An incompatible ClassLoader was used to load the implementation.
                         LOG.error(
                             "Class {} loaded from classloader {} does not extend {} as loaded by this classloader.",
-                            new Object[] {serviceClazz.getName(),
+                            new Object[] {
+                                serviceClazz.getName(),
                                 objectId(serviceClazz.getClassLoader()), clazz.getName()});
                     }
-                    return (T)serviceClazz.newInstance();
+                    return (T) serviceClazz.newInstance();
                 } catch (ClassNotFoundException ex) {
                     if (classLoader == thisClassLoader) {
                         // Nothing more to try, onwards.
@@ -186,6 +190,6 @@ protected static InputStream getResourceAsStream(ClassLoader loader, String name
         } catch (Exception e) {
             LOG.error("Unable to init service.", e);
         }
-        return (T)serviceClazz;
+        return (T) serviceClazz;
     }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
index 68d58ef44..e10e49700 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -63,6 +63,7 @@
     private String clusterName = "clusterName";
     private String brokerName = "brokerName";
     private String brokerAddr = "brokerAddr";
+    private String brokerHttpAddr = "brokerAddr";
     private long brokerId = 0L;
     private String nameserver1 = "127.0.0.1";
     private String nameserver2 = "127.0.0.2";
@@ -146,7 +147,7 @@ public void test_register_normal() throws Exception {
 
         when(nettyRemotingClient.getNameServerAddressList()).thenReturn(Lists.asList(nameserver1, nameserver2, new String[] {nameserver3}));
         when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response);
-        List<RegisterBrokerResult> registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), false, timeOut, true);
+        List<RegisterBrokerResult> registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerHttpAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), false, timeOut, true);
 
         assertTrue(registerBrokerResultList.size() > 0);
     }
@@ -177,7 +178,7 @@ public RemotingCommand answer(InvocationOnMock invocation) throws Throwable {
                 return response;
             }
         });
-        List<RegisterBrokerResult> registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), false, timeOut, true);
+        List<RegisterBrokerResult> registerBrokerResultList = brokerOuterAPI.registerBrokerAll(clusterName, brokerAddr, brokerHttpAddr, brokerName, brokerId, "hasServerAddr", topicConfigSerializeWrapper, Lists.<String>newArrayList(), false, timeOut, true);
 
         assertEquals(2, registerBrokerResultList.size());
     }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
index e544d90a1..280cee32c 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
@@ -120,7 +120,7 @@ public MessageExtBrokerInner buildMessage() {
     }
 
     public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
-                                               boolean enableCqExt, int cqExtFileSize) {
+        boolean enableCqExt, int cqExtFileSize) {
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
         messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
@@ -150,7 +150,7 @@ protected DefaultMessageStore gen(ConsumerFilterManager filterManager) throws Ex
             new MessageArrivingListener() {
                 @Override
                 public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
-                                     long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
+                    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
                 }
             }
             , brokerConfig);
@@ -174,7 +174,7 @@ public void dispatch(DispatchRequest request) {
     }
 
     protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount,
-                                                 int msgCountPerTopic) throws Exception {
+        int msgCountPerTopic) throws Exception {
         List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>();
         for (int i = 0; i < topicCount; i++) {
             String realTopic = topic + i;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
index 019cc6a31..32a633238 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
@@ -76,7 +76,7 @@ public void init() {
         endTransactionProcessor = new EndTransactionProcessor(brokerController);
     }
 
-    private OperationResult createResponse(int status){
+    private OperationResult createResponse(int status) {
         OperationResult response = new OperationResult();
         response.setPrepareMessage(createDefaultMessageExt());
         response.setResponseCode(status);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 792fd0fa2..80c196497 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -207,6 +207,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
         assertThat(response[0].getCode()).isEqualTo(ResponseCode.SUCCESS);
 
     }
+
     private RemotingCommand createSendTransactionMsgCommand(int requestCode) {
         SendMessageRequestHeader header = createSendMsgRequestHeader();
         int sysFlag = header.getSysFlag();
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java
index 17bf00b43..ba3f17e8b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java
@@ -40,7 +40,6 @@
     private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
         new NettyClientConfig(), new MessageStoreConfig());
 
-
     @Before
     public void init() {
         listener = new DefaultTransactionalMessageCheckListener();
@@ -53,21 +52,21 @@ public void testResolveHalfMsg() {
     }
 
     @Test
-    public void testSendCheckMessage() throws Exception{
+    public void testSendCheckMessage() throws Exception {
         MessageExt messageExt = createMessageExt();
         listener.sendCheckMessage(messageExt);
     }
 
     @Test
-    public void sendCheckMessage(){
+    public void sendCheckMessage() {
         listener.resolveDiscardMsg(createMessageExt());
     }
 
     private MessageExtBrokerInner createMessageExt() {
         MessageExtBrokerInner inner = new MessageExtBrokerInner();
-        MessageAccessor.putProperty(inner,MessageConst.PROPERTY_REAL_QUEUE_ID,"1");
-        MessageAccessor.putProperty(inner,MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,"1234255");
-        MessageAccessor.putProperty(inner,MessageConst.PROPERTY_REAL_TOPIC,"realTopic");
+        MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_QUEUE_ID, "1");
+        MessageAccessor.putProperty(inner, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "1234255");
+        MessageAccessor.putProperty(inner, MessageConst.PROPERTY_REAL_TOPIC, "realTopic");
         inner.setTransactionId(inner.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
         inner.setBody("check".getBytes());
         inner.setMsgId("12344567890");
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java
index b1c669c99..5f59d2d79 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java
@@ -110,14 +110,14 @@ public void updateConsumeOffset() {
 
     @Test
     public void testGetHalfMessage() {
-        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(),  ArgumentMatchers.nullable(MessageFilter.class))).thenReturn(createGetMessageResult(GetMessageStatus.NO_MESSAGE_IN_QUEUE));
+        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))).thenReturn(createGetMessageResult(GetMessageStatus.NO_MESSAGE_IN_QUEUE));
         PullResult result = transactionBridge.getHalfMessage(0, 0, 1);
         assertThat(result.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
     }
 
     @Test
     public void testGetOpMessage() {
-        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(),  ArgumentMatchers.nullable(MessageFilter.class))).thenReturn(createGetMessageResult(GetMessageStatus.NO_MESSAGE_IN_QUEUE));
+        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class))).thenReturn(createGetMessageResult(GetMessageStatus.NO_MESSAGE_IN_QUEUE));
         PullResult result = transactionBridge.getOpMessage(0, 0, 1);
         assertThat(result.getPullStatus()).isEqualTo(PullStatus.NO_NEW_MSG);
     }
@@ -143,25 +143,24 @@ public void testRenewImmunityHalfMessageInner() {
         MessageExt messageExt = createMessageBrokerInner();
         final String offset = "123456789";
         MessageExtBrokerInner msgInner = transactionBridge.renewImmunityHalfMessageInner(messageExt);
-        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET,offset);
+        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET, offset);
         assertThat(msgInner).isNotNull();
-        Map<String,String> properties = msgInner.getProperties();
+        Map<String, String> properties = msgInner.getProperties();
         assertThat(properties).isNotNull();
         String resOffset = properties.get(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
         assertThat(resOffset).isEqualTo(offset);
     }
 
-
     @Test
     public void testRenewHalfMessageInner() {
         MessageExt messageExt = new MessageExt();
         long bornTimeStamp = messageExt.getBornTimestamp();
         MessageExt messageExtRes = transactionBridge.renewHalfMessageInner(messageExt);
-        assertThat( messageExtRes.getBornTimestamp()).isEqualTo(bornTimeStamp);
+        assertThat(messageExtRes.getBornTimestamp()).isEqualTo(bornTimeStamp);
     }
 
     @Test
-    public void testLookMessageByOffset(){
+    public void testLookMessageByOffset() {
         when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
         MessageExt messageExt = transactionBridge.lookMessageByOffset(123);
         assertThat(messageExt).isNotNull();
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java b/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java
index 3fedf7722..0e99544a3 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/util/TransactionalMessageServiceImpl.java
@@ -51,7 +51,8 @@ public OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader
     }
 
     @Override
-    public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {
+    public void check(long transactionTimeout, int transactionCheckMax,
+        AbstractTransactionalMessageCheckListener listener) {
         log.warn("check check!");
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index d798164cd..3dc2fe290 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -49,6 +49,8 @@
 
     private boolean useTLS = TlsSystemConfig.tlsEnable;
 
+    private boolean useHttp = false;
+
     private LanguageCode language = LanguageCode.JAVA;
 
     public String buildMQClientId() {
@@ -99,6 +101,7 @@ public void resetClientConfig(final ClientConfig cc) {
         this.unitName = cc.unitName;
         this.vipChannelEnabled = cc.vipChannelEnabled;
         this.useTLS = cc.useTLS;
+        this.useHttp = cc.useHttp;
         this.language = cc.language;
     }
 
@@ -115,6 +118,7 @@ public ClientConfig cloneClientConfig() {
         cc.unitName = unitName;
         cc.vipChannelEnabled = vipChannelEnabled;
         cc.useTLS = useTLS;
+        cc.useHttp = useHttp;
         cc.language = language;
         return cc;
     }
@@ -191,6 +195,14 @@ public void setUseTLS(boolean useTLS) {
         this.useTLS = useTLS;
     }
 
+    public boolean isUseHttp() {
+        return useHttp;
+    }
+
+    public void setUseHttp(boolean useHttp) {
+        this.useHttp = useHttp;
+    }
+
     public LanguageCode getLanguage() {
         return language;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 1b3885629..fb47274b5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -98,16 +98,16 @@
     private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
     private final Lock lockNamesrv = new ReentrantLock();
     private final Lock lockHeartbeat = new ReentrantLock();
-    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
-        new ConcurrentHashMap<String, HashMap<Long, String>>();
-    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
-        new ConcurrentHashMap<String, HashMap<String, Integer>>();
-    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-        @Override
-        public Thread newThread(Runnable r) {
-            return new Thread(r, "MQClientFactoryScheduledThread");
-        }
-    });
+    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
+    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerHttpAddrTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
+    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable = new ConcurrentHashMap<String, HashMap<String, Integer>>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+        .newSingleThreadScheduledExecutor(new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "MQClientFactoryScheduledThread");
+            }
+        });
     private final ClientRemotingProcessor clientRemotingProcessor;
     private final PullMessageService pullMessageService;
     private final RebalanceService rebalanceService;
@@ -129,7 +129,8 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli
         this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
         this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
         this.clientRemotingProcessor = new ClientRemotingProcessor(this);
-        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
+        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook,
+            clientConfig);
 
         if (this.clientConfig.getNamesrvAddr() != null) {
             this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
@@ -149,11 +150,11 @@ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String cli
 
         this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
 
-        log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
-            this.instanceIndex,
-            this.clientId,
-            this.clientConfig,
-            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
+        log.info(
+            "Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
+            this.instanceIndex, this.clientId, this.clientConfig,
+            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION),
+            RemotingCommand.getSerializeTypeConfigInThisServer());
     }
 
     public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
@@ -248,7 +249,8 @@ public void start() throws MQClientException {
                 case SHUTDOWN_ALREADY:
                     break;
                 case START_FAILED:
-                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
+                    throw new MQClientException(
+                        "The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                 default:
                     break;
             }
@@ -371,7 +373,8 @@ private void cleanOfflineBroker() {
                 try {
                     ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
 
-                    Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
+                    Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet()
+                        .iterator();
                     while (itBrokerTable.hasNext()) {
                         Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
                         String brokerName = entry.getKey();
@@ -429,17 +432,18 @@ public void checkClientInBroker() throws MQClientException {
 
                 if (addr != null) {
                     try {
-                        this.getMQClientAPIImpl().checkClientInBroker(
-                            addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000
-                        );
+                        this.getMQClientAPIImpl().checkClientInBroker(addr, entry.getKey(), this.clientId,
+                            subscriptionData, 3 * 1000);
                     } catch (Exception e) {
                         if (e instanceof MQClientException) {
                             throw (MQClientException) e;
                         } else {
                             throw new MQClientException("Check client in broker error, maybe because you use "
-                                + subscriptionData.getExpressionType() + " to filter message, but server has not been upgraded to support!"
-                                + "This error would not affect the launch of consumer, but may has impact on message receiving if you " +
-                                "have use the new features which are not supported by server, please check the log!", e);
+                                + subscriptionData.getExpressionType()
+                                + " to filter message, but server has not been upgraded to support!"
+                                + "This error would not affect the launch of consumer, but may has impact on message receiving if you "
+                                + "have use the new features which are not supported by server, please check the log!",
+                                e);
                         }
                     }
                 }
@@ -550,8 +554,9 @@ private void sendHeartbeatToAllBroker() {
                                 if (this.isBrokerInNameServer(addr)) {
                                     log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
                                 } else {
-                                    log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
-                                        id, addr);
+                                    log.info(
+                                        "send heart beat to broker[{} {} {}] exception, because the broker not up, forget it",
+                                        brokerName, id, addr);
                                 }
                             }
                         }
@@ -592,11 +597,12 @@ public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean is
                 try {
                     TopicRouteData topicRouteData;
                     if (isDefault && defaultMQProducer != null) {
-                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
-                            1000 * 3);
+                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(
+                            defaultMQProducer.getCreateTopicKey(), 1000 * 3);
                         if (topicRouteData != null) {
                             for (QueueData data : topicRouteData.getQueueDatas()) {
-                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
+                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(),
+                                    data.getReadQueueNums());
                                 data.setReadQueueNums(queueNums);
                                 data.setWriteQueueNums(queueNums);
                             }
@@ -618,6 +624,9 @@ public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean is
 
                             for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                 this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
+                                if (bd.getBrokerHttpAddrs() != null && !bd.getBrokerHttpAddrs().isEmpty()) {
+                                    this.brokerHttpAddrTable.put(bd.getBrokerName(), bd.getBrokerHttpAddrs());
+                                }
                             }
 
                             // Update Pub info
@@ -636,7 +645,8 @@ public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean is
 
                             // Update sub info
                             {
-                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
+                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic,
+                                    topicRouteData);
                                 Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                 while (it.hasNext()) {
                                     Entry<String, MQConsumerInner> entry = it.next();
@@ -651,10 +661,13 @@ public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean is
                             return true;
                         }
                     } else {
-                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
+                        log.warn(
+                            "updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}",
+                            topic);
                     }
                 } catch (Exception e) {
-                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
+                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
+                        && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                         log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                     }
                 } finally {
@@ -722,33 +735,31 @@ private boolean isBrokerInNameServer(final String brokerAddr) {
     }
 
     private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,
-        final String topic,
-        final String filterClassSource) throws UnsupportedEncodingException {
+        final String topic, final String filterClassSource) throws UnsupportedEncodingException {
         byte[] classBody = null;
         int classCRC = 0;
         try {
             classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET);
             classCRC = UtilAll.crc32(classBody);
         } catch (Exception e1) {
-            log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}",
-                fullClassName,
+            log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", fullClassName,
                 RemotingHelper.exceptionSimpleDesc(e1));
         }
 
         TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
-        if (topicRouteData != null
-            && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
+        if (topicRouteData != null && topicRouteData.getFilterServerTable() != null
+            && !topicRouteData.getFilterServerTable().isEmpty()) {
             Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator();
             while (it.hasNext()) {
                 Entry<String, List<String>> next = it.next();
                 List<String> value = next.getValue();
                 for (final String fsAddr : value) {
                     try {
-                        this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody,
-                            5000);
+                        this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName,
+                            classCRC, classBody, 5000);
 
-                        log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", fsAddr, consumerGroup,
-                            topic, fullClassName);
+                        log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}",
+                            fsAddr, consumerGroup, topic, fullClassName);
 
                     } catch (Exception e) {
                         log.error("uploadFilterClassToAllFilterServer Exception", e);
@@ -756,7 +767,8 @@ private void uploadFilterClassToAllFilterServer(final String consumerGroup, fina
                 }
             }
         } else {
-            log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}",
+            log.warn(
+                "register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}",
                 consumerGroup, topic, fullClassName);
         }
     }
@@ -891,8 +903,10 @@ private void unregisterClient(final String producerGroup, final String consumerG
                     String addr = entry1.getValue();
                     if (addr != null) {
                         try {
-                            this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
-                            log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
+                            this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup,
+                                3000);
+                            log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success",
+                                producerGroup, consumerGroup, brokerName, entry1.getKey(), addr);
                         } catch (RemotingException e) {
                             log.error("unregister client exception from broker: " + addr, e);
                         } catch (InterruptedException e) {
@@ -999,19 +1013,16 @@ public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
     }
 
     public String findBrokerAddressInPublish(final String brokerName) {
-        HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
+        ConcurrentMap<String, HashMap<Long, String>> table = clientConfig.isUseHttp() ? brokerHttpAddrTable : brokerAddrTable;
+        HashMap<Long/* brokerId */, String/* address */> map = table.get(brokerName);
         if (map != null && !map.isEmpty()) {
             return map.get(MixAll.MASTER_ID);
         }
-
         return null;
     }
 
-    public FindBrokerResult findBrokerAddressInSubscribe(
-        final String brokerName,
-        final long brokerId,
-        final boolean onlyThisBroker
-    ) {
+    public FindBrokerResult findBrokerAddressInSubscribe(final String brokerName, final long brokerId,
+        final boolean onlyThisBroker) {
         String brokerAddr = null;
         boolean slave = false;
         boolean found = false;
@@ -1090,7 +1101,8 @@ public void resetOffset(String topic, String group, Map<MessageQueue, Long> offs
             }
             consumer.suspend();
 
-            ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
+            ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl()
+                .getProcessQueueTable();
             for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
                 MessageQueue mq = entry.getKey();
                 if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
@@ -1171,14 +1183,14 @@ public DefaultMQProducer getDefaultMQProducer() {
         return topicRouteTable;
     }
 
-    public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg,
-        final String consumerGroup,
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String consumerGroup,
         final String brokerName) {
         MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
         if (null != mqConsumerInner) {
             DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
 
-            ConsumeMessageDirectlyResult result = consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);
+            ConsumeMessageDirectlyResult result = consumer.getConsumeMessageService().consumeMessageDirectly(msg,
+                brokerName);
             return result;
         }
 
@@ -1201,7 +1213,8 @@ public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) {
 
         String nsAddr = strBuilder.toString();
         consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_NAMESERVER_ADDR, nsAddr);
-        consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CONSUME_TYPE, mqConsumerInner.consumeType().name());
+        consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CONSUME_TYPE,
+            mqConsumerInner.consumeType().name());
         consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CLIENT_VERSION,
             MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 442f456aa..bb752c1ab 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -31,6 +31,9 @@
     private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
     @ImportantField
     private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+
+    private String namesrvHttpAddr;
+
     @ImportantField
     private String brokerIP1 = RemotingUtil.getLocalAddress();
     private String brokerIP2 = RemotingUtil.getLocalAddress();
@@ -63,7 +66,7 @@
     private int adminBrokerThreadPoolNums = 16;
     private int clientManageThreadPoolNums = 32;
     private int consumerManageThreadPoolNums = 32;
-    private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors());
+    private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
 
     private int flushConsumerOffsetInterval = 1000 * 5;
 
@@ -406,6 +409,14 @@ public void setNamesrvAddr(String namesrvAddr) {
         this.namesrvAddr = namesrvAddr;
     }
 
+    public String getNamesrvHttpAddr() {
+        return namesrvHttpAddr;
+    }
+
+    public void setNamesrvHttpAddr(String namesrvHttpAddr) {
+        this.namesrvHttpAddr = namesrvHttpAddr;
+    }
+
     public long getBrokerId() {
         return brokerId;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
index 19175b04b..ae845e8c4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java
@@ -29,6 +29,9 @@
     private String brokerName;
     @CFNotNull
     private String brokerAddr;
+
+    private String brokerHttpAddr;
+
     @CFNotNull
     private String clusterName;
     @CFNotNull
@@ -59,6 +62,14 @@ public void setBrokerAddr(String brokerAddr) {
         this.brokerAddr = brokerAddr;
     }
 
+    public String getBrokerHttpAddr() {
+        return brokerHttpAddr;
+    }
+
+    public void setBrokerHttpAddr(String brokerHttpAddr) {
+        this.brokerHttpAddr = brokerHttpAddr;
+    }
+
     public String getClusterName() {
         return clusterName;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
index 36599fbc8..5f7cf51a9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
@@ -28,16 +28,19 @@
     private String brokerName;
     private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
 
+    private HashMap<Long/*brokerId */ , String/*broker http address*/> brokerHttpAddrs;
+    
     private final Random random = new Random();
 
     public BrokerData() {
 
     }
 
-    public BrokerData(String cluster, String brokerName, HashMap<Long, String> brokerAddrs) {
+    public BrokerData(String cluster, String brokerName, HashMap<Long, String> brokerAddrs ,HashMap<Long, String> brokerHttpAddrs) {
         this.cluster = cluster;
         this.brokerName = brokerName;
         this.brokerAddrs = brokerAddrs;
+        this.brokerHttpAddrs = brokerHttpAddrs;
     }
 
     /**
@@ -64,6 +67,14 @@ public String selectBrokerAddr() {
     public void setBrokerAddrs(HashMap<Long, String> brokerAddrs) {
         this.brokerAddrs = brokerAddrs;
     }
+    
+    public HashMap<Long, String> getBrokerHttpAddrs() {
+        return brokerHttpAddrs;
+    }
+
+    public void setBrokerHttpAddrs(HashMap<Long, String> brokerHttpAddrs) {
+        this.brokerHttpAddrs = brokerHttpAddrs;
+    }
 
     public String getCluster() {
         return cluster;
@@ -78,6 +89,7 @@ public int hashCode() {
         final int prime = 31;
         int result = 1;
         result = prime * result + ((brokerAddrs == null) ? 0 : brokerAddrs.hashCode());
+        result = prime * result + ((brokerHttpAddrs == null) ? 0 : brokerHttpAddrs.hashCode());
         result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
         return result;
     }
@@ -96,6 +108,11 @@ public boolean equals(Object obj) {
                 return false;
         } else if (!brokerAddrs.equals(other.brokerAddrs))
             return false;
+        if (brokerHttpAddrs == null) {
+            if (other.brokerHttpAddrs != null)
+                return false;
+        } else if (!brokerHttpAddrs.equals(other.brokerHttpAddrs))
+            return false;
         if (brokerName == null) {
             if (other.brokerName != null)
                 return false;
@@ -106,7 +123,7 @@ public boolean equals(Object obj) {
 
     @Override
     public String toString() {
-        return "BrokerData [brokerName=" + brokerName + ", brokerAddrs=" + brokerAddrs + "]";
+        return "BrokerData [brokerName=" + brokerName + ", brokerAddrs=" + brokerAddrs + ", brokerHttpAddrs=" + brokerHttpAddrs + "]";
     }
 
     @Override
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
index e8f54b8d7..8dbe89511 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
@@ -29,6 +29,7 @@
     private String orderTopicConf;
     private List<QueueData> queueDatas;
     private List<BrokerData> brokerDatas;
+
     private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
 
     public TopicRouteData cloneTopicRouteData() {
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
index 24e7c263d..ebfd73e9a 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
@@ -82,6 +82,7 @@ public static NamesrvController createNamesrvController(String[] args) throws IO
         final NamesrvConfig namesrvConfig = new NamesrvConfig();
         final NettyServerConfig nettyServerConfig = new NettyServerConfig();
         nettyServerConfig.setListenPort(9876);
+        nettyServerConfig.setHttpListenPort(29999);
         if (commandLine.hasOption('c')) {
             String file = commandLine.getOptionValue('c');
             if (file != null) {
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 467078c44..260e8ce8d 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -77,7 +77,6 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
                 request);
         }
 
-
         switch (request.getCode()) {
             case RequestCode.PUT_KV_CONFIG:
                 return this.putKVConfig(ctx, request);
@@ -219,6 +218,7 @@ public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx,
         RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
             requestHeader.getClusterName(),
             requestHeader.getBrokerAddr(),
+            requestHeader.getBrokerHttpAddr(),
             requestHeader.getBrokerName(),
             requestHeader.getBrokerId(),
             requestHeader.getHaServerAddr(),
@@ -299,6 +299,7 @@ public RemotingCommand registerBroker(ChannelHandlerContext ctx,
         RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
             requestHeader.getClusterName(),
             requestHeader.getBrokerAddr(),
+            requestHeader.getBrokerHttpAddr(),
             requestHeader.getBrokerName(),
             requestHeader.getBrokerId(),
             requestHeader.getHaServerAddr(),
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 00962ef24..00209cb32 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -102,6 +102,7 @@ public void deleteTopic(final String topic) {
     public RegisterBrokerResult registerBroker(
         final String clusterName,
         final String brokerAddr,
+        final String brokerHttpAddr,
         final String brokerName,
         final long brokerId,
         final String haServerAddr,
@@ -125,10 +126,11 @@ public RegisterBrokerResult registerBroker(
                 BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                 if (null == brokerData) {
                     registerFirst = true;
-                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
+                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>(), new HashMap<Long, String>());
                     this.brokerAddrTable.put(brokerName, brokerData);
                 }
                 String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
+                brokerData.getBrokerHttpAddrs().put(brokerId, brokerHttpAddr);
                 registerFirst = registerFirst || (null == oldAddr);
 
                 if (null != topicConfigWrapper
@@ -389,7 +391,7 @@ public TopicRouteData pickupTopicRouteData(final String topic) {
                         BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                         if (null != brokerData) {
                             BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
-                                .getBrokerAddrs().clone());
+                                .getBrokerAddrs().clone(), (HashMap<Long, String>) brokerData.getBrokerHttpAddrs().clone());
                             brokerDataList.add(brokerDataClone);
                             foundBrokerData = true;
                             for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
index d4a2f66f9..a2e08b230 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
@@ -16,16 +16,18 @@
  */
 package org.apache.rocketmq.namesrv.processor;
 
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.namesrv.NamesrvConfig;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.common.protocol.RequestCode;
@@ -37,6 +39,7 @@
 import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -46,9 +49,8 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 
 public class DefaultRequestProcessorTest {
     private DefaultRequestProcessor defaultRequestProcessor;
@@ -179,6 +181,7 @@ public void testProcessRequest_RegisterBroker() throws RemotingCommandException,
         BrokerData broker = new BrokerData();
         broker.setBrokerName("broker");
         broker.setBrokerAddrs((HashMap) Maps.newHashMap(new Long(2333), "10.10.1.1"));
+        broker.setBrokerHttpAddrs((HashMap) Maps.newHashMap(new Long(2333), "10.10.1.1") );
 
         assertThat((Map) brokerAddrTable.get(routes))
             .contains(new HashMap.SimpleEntry("broker", broker));
@@ -206,7 +209,8 @@ public void testProcessRequest_RegisterBrokerWithFilterServer() throws RemotingC
 
         BrokerData broker = new BrokerData();
         broker.setBrokerName("broker");
-        broker.setBrokerAddrs((HashMap) Maps.newHashMap(new Long(2333), "10.10.1.1"));
+        broker.setBrokerAddrs( (HashMap) Maps.newHashMap(new Long(2333), "10.10.1.1") );
+        broker.setBrokerHttpAddrs((HashMap) Maps.newHashMap(new Long(2333), "10.10.1.1") );
 
         assertThat((Map) brokerAddrTable.get(routes))
             .contains(new HashMap.SimpleEntry("broker", broker));
@@ -242,6 +246,7 @@ private static RemotingCommand genSampleRegisterCmd(boolean reg) {
             reg ? RequestCode.REGISTER_BROKER : RequestCode.UNREGISTER_BROKER, header);
         request.addExtField("brokerName", "broker");
         request.addExtField("brokerAddr", "10.10.1.1");
+        request.addExtField("brokerHttpAddr", "10.10.1.1");
         request.addExtField("clusterName", "cluster");
         request.addExtField("haServerAddr", "10.10.2.1");
         request.addExtField("brokerId", "2333");
@@ -268,7 +273,7 @@ private void registerRouteInfoManager() {
         topicConfigConcurrentHashMap.put("unit-test", topicConfig);
         topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
         Channel channel = mock(Channel.class);
-        RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "default-broker", 1234, "127.0.0.1:1001",
+        RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "127.0.0.1:19999", "default-broker", 1234, "127.0.0.1:1001",
             topicConfigSerializeWrapper, new ArrayList<String>(), channel);
 
     }
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
index 5ab77be1e..3729c2e19 100644
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
+++ b/namesrv/src/test/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManagerTest.java
@@ -73,7 +73,7 @@ public void testRegisterBroker() {
         topicConfigConcurrentHashMap.put("unit-test", topicConfig);
         topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
         Channel channel = mock(Channel.class);
-        RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "default-broker", 1234, "127.0.0.1:1001",
+        RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "127.0.0.1:19999", "default-broker", 1234, "127.0.0.1:1001",
             topicConfigSerializeWrapper, new ArrayList<String>(), channel);
         assertThat(registerBrokerResult).isNotNull();
     }
diff --git a/pom.xml b/pom.xml
index 1f71cd4b7..67d76a9db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,8 +15,8 @@
   See the License for the specific language governing permissions and
   limitations under the License.
   -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
     <parent>
         <groupId>org.apache</groupId>
@@ -157,7 +157,7 @@
                 </executions>
                 <configuration>
                     <rules>
-                        <banCircularDependencies />
+                        <banCircularDependencies/>
                     </rules>
                     <fail>true</fail>
                 </configuration>
@@ -209,7 +209,7 @@
             </plugin>
             <plugin>
                 <artifactId>maven-help-plugin</artifactId>
-                <version>2.2</version>
+                <version>3.0.1</version>
                 <executions>
                     <execution>
                         <id>generate-effective-dependencies-pom</id>
@@ -607,4 +607,4 @@
             </dependency>
         </dependencies>
     </dependencyManagement>
-</project>
+</project>
\ No newline at end of file
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 55d92f370..0bdc44048 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -15,7 +15,8 @@
   limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index c0754db63..2db58daba 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -29,6 +29,8 @@
 
     void updateNameServerAddressList(final List<String> addrs);
 
+    void updateNameServerHttpAddressList(final List<String> addrs);
+
     List<String> getNameServerAddressList();
 
     RemotingCommand invokeSync(final String addr, final RemotingCommand request,
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
index 7a383e443..a894c2be5 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.remoting.common;
 
-
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/TlsMode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/TlsMode.java
index 996ef0dd2..74c0faa94 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/TlsMode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/TlsMode.java
@@ -20,9 +20,11 @@
 /**
  * For server, three SSL modes are supported: disabled, permissive and enforcing.
  * <ol>
- *     <li><strong>disabled:</strong> SSL is not supported; any incoming SSL handshake will be rejected, causing connection closed.</li>
- *     <li><strong>permissive:</strong> SSL is optional, aka, server in this mode can serve client connections with or without SSL;</li>
- *     <li><strong>enforcing:</strong> SSL is required, aka, non SSL connection will be rejected.</li>
+ * <li><strong>disabled:</strong> SSL is not supported; any incoming SSL handshake will be rejected, causing connection
+ * closed.</li>
+ * <li><strong>permissive:</strong> SSL is optional, aka, server in this mode can serve client connections with or
+ * without SSL;</li>
+ * <li><strong>enforcing:</strong> SSL is required, aka, non SSL connection will be rejected.</li>
  * </ol>
  */
 public enum TlsMode {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
index 2bd15aea3..a22c74e68 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
@@ -29,9 +29,8 @@
 
 /**
  * <p>
- *     By default, file region are directly transferred to socket channel which is known as zero copy. In case we need
- *     to encrypt transmission, data being sent should go through the {@link SslHandler}. This encoder ensures this
- *     process.
+ * By default, file region are directly transferred to socket channel which is known as zero copy. In case we need to
+ * encrypt transmission, data being sent should go through the {@link SslHandler}. This encoder ensures this process.
  * </p>
  */
 public class FileRegionEncoder extends MessageToByteEncoder<FileRegion> {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index fbc071b28..4bf37963c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -28,8 +28,8 @@
     private long channelNotActiveInterval = 1000 * 60;
 
     /**
-     * IdleStateEvent will be triggered when neither read nor write was performed for
-     * the specified period of this time. Specify {@code 0} to disable
+     * IdleStateEvent will be triggered when neither read nor write was performed for the specified period of this time.
+     * Specify {@code 0} to disable
      */
     private int clientChannelMaxIdleTimeSeconds = 120;
 
@@ -40,6 +40,8 @@
 
     private boolean useTLS;
 
+    private boolean useHttp = false;
+
     public boolean isClientCloseSocketIfTimeout() {
         return clientCloseSocketIfTimeout;
     }
@@ -135,4 +137,13 @@ public boolean isUseTLS() {
     public void setUseTLS(boolean useTLS) {
         this.useTLS = useTLS;
     }
+
+    public boolean isUseHttp() {
+        return useHttp;
+    }
+
+    public void setUseHttp(boolean useHttp) {
+        this.useHttp = useHttp;
+    }
+
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
index 4b4e86e68..f5283c47d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.remoting.netty;
 
-
 import io.netty.util.internal.logging.InternalLogLevel;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -27,7 +26,7 @@
 public class NettyLogger {
 
     private static AtomicBoolean nettyLoggerSeted = new AtomicBoolean(false);
-    
+
     private static InternalLogLevel nettyLogLevel = InternalLogLevel.ERROR;
 
     public static void initNettyLogger() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 8dccebc04..3ed3f9e43 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -70,15 +70,15 @@
     /**
      * This map caches all on-going requests.
      */
-    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
-        new ConcurrentHashMap<Integer, ResponseFuture>(256);
+    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable = new ConcurrentHashMap<Integer, ResponseFuture>(
+        256);
 
     /**
      * This container holds all processors per request code, aka, for each incoming request, we may look up the
      * responding processor in this map to handle the request.
      */
-    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
-        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
+    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(
+        64);
 
     /**
      * Executor to feed netty events to user defined {@link ChannelEventListener}.
@@ -86,7 +86,8 @@
     protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
 
     /**
-     * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
+     * The default request processor to use in case there is no exact match in {@link #processorTable} per request
+     * code.
      */
     protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
 
@@ -130,8 +131,7 @@ public void putNettyEvent(final NettyEvent event) {
      * Entry of incoming command processing.
      *
      * <p>
-     * <strong>Note:</strong>
-     * The incoming remoting command may be
+     * <strong>Note:</strong> The incoming remoting command may be
      * <ul>
      * <li>An inquiry request from a remote peer component;</li>
      * <li>A response to a previous request issued by this very participant.</li>
@@ -166,7 +166,8 @@ public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand ms
      */
     public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
         final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
-        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
+        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor
+            : matched;
         final int opaque = cmd.getOpaque();
 
         if (pair != null) {
@@ -181,7 +182,8 @@ public void run() {
 
                         final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                         if (rpcHook != null) {
-                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
+                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd,
+                                response);
                         }
 
                         if (!cmd.isOnewayRPC()) {
@@ -204,8 +206,8 @@ public void run() {
                         log.error(cmd.toString());
 
                         if (!cmd.isOnewayRPC()) {
-                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
-                                RemotingHelper.exceptionSimpleDesc(e));
+                            final RemotingCommand response = RemotingCommand.createResponseCommand(
+                                RemotingSysResponseCode.SYSTEM_ERROR, RemotingHelper.exceptionSimpleDesc(e));
                             response.setOpaque(opaque);
                             ctx.writeAndFlush(response);
                         }
@@ -214,7 +216,8 @@ public void run() {
             };
 
             if (pair.getObject1().rejectRequest()) {
-                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
+                final RemotingCommand response = RemotingCommand.createResponseCommand(
+                    RemotingSysResponseCode.SYSTEM_BUSY,
                     "[REJECTREQUEST]system busy, start flow control for a while");
                 response.setOpaque(opaque);
                 ctx.writeAndFlush(response);
@@ -228,12 +231,12 @@ public void run() {
                 if ((System.currentTimeMillis() % 10000) == 0) {
                     log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                         + ", too many requests and system thread pool busy, RejectedExecutionException "
-                        + pair.getObject2().toString()
-                        + " request code: " + cmd.getCode());
+                        + pair.getObject2().toString() + " request code: " + cmd.getCode());
                 }
 
                 if (!cmd.isOnewayRPC()) {
-                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
+                    final RemotingCommand response = RemotingCommand.createResponseCommand(
+                        RemotingSysResponseCode.SYSTEM_BUSY,
                         "[OVERLOAD]system busy, start flow control for a while");
                     response.setOpaque(opaque);
                     ctx.writeAndFlush(response);
@@ -241,8 +244,8 @@ public void run() {
             }
         } else {
             String error = " request type " + cmd.getCode() + " not supported";
-            final RemotingCommand response =
-                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
+            final RemotingCommand response = RemotingCommand
+                .createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
             response.setOpaque(opaque);
             ctx.writeAndFlush(response);
             log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
@@ -270,7 +273,8 @@ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cm
                 responseFuture.release();
             }
         } else {
-            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+            log.warn("receive response, but not matched any request, "
+                + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
             log.warn(cmd.toString());
         }
     }
@@ -390,7 +394,8 @@ public void operationComplete(ChannelFuture f) throws Exception {
                     throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                         responseFuture.getCause());
                 } else {
-                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
+                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr),
+                        responseFuture.getCause());
                 }
             }
 
@@ -401,8 +406,8 @@ public void operationComplete(ChannelFuture f) throws Exception {
     }
 
     public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
-        final InvokeCallback invokeCallback)
-        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException,
+        RemotingTimeoutException, RemotingSendRequestException {
         long beginStartTime = System.currentTimeMillis();
         final int opaque = request.getOpaque();
         boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
@@ -413,7 +418,8 @@ public void invokeAsyncImpl(final Channel channel, final RemotingCommand request
                 throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
             }
 
-            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
+            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime,
+                invokeCallback, once);
             this.responseTable.put(opaque, responseFuture);
             try {
                 channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@@ -424,24 +430,23 @@ public void operationComplete(ChannelFuture f) throws Exception {
                             return;
                         }
                         requestFail(opaque);
-                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
+                        log.warn("send a request command to channel <{}> failed.",
+                            RemotingHelper.parseChannelRemoteAddr(channel));
                     }
                 });
             } catch (Exception e) {
                 responseFuture.release();
-                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
+                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel)
+                    + "> Exception", e);
                 throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
             }
         } else {
             if (timeoutMillis <= 0) {
                 throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
             } else {
-                String info =
-                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
-                        timeoutMillis,
-                        this.semaphoreAsync.getQueueLength(),
-                        this.semaphoreAsync.availablePermits()
-                    );
+                String info = String.format(
+                    "invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
+                    timeoutMillis, this.semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits());
                 log.warn(info);
                 throw new RemotingTimeoutException(info);
             }
@@ -465,6 +470,7 @@ private void requestFail(final int opaque) {
 
     /**
      * mark the request of the specified channel as fail and to invoke fail callback immediately
+     *
      * @param channel the channel which is close already
      */
     protected void failFast(final Channel channel) {
@@ -481,7 +487,8 @@ protected void failFast(final Channel channel) {
     }
 
     public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
-        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
+        RemotingSendRequestException {
         request.markOnewayRPC();
         boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
         if (acquired) {
@@ -507,10 +514,7 @@ public void operationComplete(ChannelFuture f) throws Exception {
             } else {
                 String info = String.format(
                     "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
-                    timeoutMillis,
-                    this.semaphoreOneway.getQueueLength(),
-                    this.semaphoreOneway.availablePermits()
-                );
+                    timeoutMillis, this.semaphoreOneway.getQueueLength(), this.semaphoreOneway.availablePermits());
                 log.warn(info);
                 throw new RemotingTimeoutException(info);
             }
@@ -525,7 +529,8 @@ public void putNettyEvent(final NettyEvent event) {
             if (this.eventQueue.size() <= maxSize) {
                 this.eventQueue.add(event);
             } else {
-                log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
+                log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(),
+                    event.toString());
             }
         }
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 33c2eed8d..edff33444 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -16,24 +16,6 @@
  */
 package org.apache.rocketmq.remoting.netty;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.security.cert.CertificateException;
@@ -53,6 +35,9 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -64,10 +49,31 @@
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.http.HttpClientHandler;
+import org.apache.rocketmq.remoting.netty.http.NettyHTTPRequestEncoder;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpResponseDecoder;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+
 public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
 
@@ -75,6 +81,7 @@
 
     private final NettyClientConfig nettyClientConfig;
     private final Bootstrap bootstrap = new Bootstrap();
+    private final Bootstrap httpBootstrap = new Bootstrap();
     private final EventLoopGroup eventLoopGroupWorker;
     private final Lock lockChannelTables = new ReentrantLock();
     private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
@@ -82,6 +89,8 @@
     private final Timer timer = new Timer("ClientHouseKeepingService", true);
 
     private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
+    private final AtomicReference<List<String>> namesrvHttpAddrList = new AtomicReference<List<String>>();
+
     private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
     private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
     private final Lock lockNamesrvChannel = new ReentrantLock();
@@ -190,6 +199,27 @@ public void initChannel(SocketChannel ch) throws Exception {
                 }
             });
 
+        Bootstrap httpHandler = this.httpBootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
+            .option(ChannelOption.TCP_NODELAY, true)
+            .option(ChannelOption.SO_KEEPALIVE, false)
+            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
+            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
+            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
+            .handler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception {
+                    ChannelPipeline pipeline = ch.pipeline();
+                    // Inbound order matters: the decoder must run before the aggregator, which only merges HttpObjects.
+                    pipeline.addLast(
+                        defaultEventExecutorGroup,
+                        new NettyHTTPRequestEncoder(),
+                        new HttpResponseDecoder(),
+                        new HttpObjectAggregator(65536),
+                        new NettyConnectManageHandler(),
+                        new HttpClientHandler(new NettyClientHandler()));
+                }
+            });
+
         this.timer.scheduleAtFixedRate(new TimerTask() {
             @Override
             public void run() {
@@ -357,6 +387,32 @@ public void updateNameServerAddressList(List<String> addrs) {
         }
     }
 
+    /**
+     * Replaces the cached name server HTTP address list when {@code addrs} differs from it, that is,
+     * when nothing is cached yet, when the sizes differ, or when {@code addrs} holds an address that
+     * the cached list lacks.
+     * A null or empty {@code addrs} leaves the cached list untouched.
+     *
+     * @param addrs candidate addresses; a shuffled private copy is stored, the argument is never modified
+     */
+    @Override
+    public void updateNameServerHttpAddressList(List<String> addrs) {
+        if (null == addrs || addrs.isEmpty()) {
+            return;
+        }
+
+        List<String> old = this.namesrvHttpAddrList.get();
+        boolean update = null == old || addrs.size() != old.size() || !old.containsAll(addrs);
+
+        if (update) {
+            // Shuffle a copy: the caller's list may be shared with other code or unmodifiable.
+            List<String> shuffled = new java.util.ArrayList<String>(addrs);
+            Collections.shuffle(shuffled);
+            log.info("name server http address updated. NEW : {} , OLD: {}", shuffled, old);
+            this.namesrvHttpAddrList.set(shuffled);
+        }
+    }
+
     @Override
     public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
         throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
@@ -455,7 +511,11 @@ private Channel getAndCreateNameserverChannel() throws InterruptedException {
     }
 
     private Channel createChannel(final String addr) throws InterruptedException {
+        if (addr.charAt(0) == '@') {
+            return this.httpBootstrap.connect(RemotingHelper.string2SocketAddress(addr.substring(1))).sync().channel();
+        }
         ChannelWrapper cw = this.channelTables.get(addr);
+
         if (cw != null && cw.isOK()) {
             cw.getChannel().close();
             channelTables.remove(addr);
@@ -584,7 +644,9 @@ public boolean isChannelWritable(String addr) {
 
     @Override
     public List<String> getNameServerAddressList() {
-        return this.namesrvAddrList.get();
+        // Serve the HTTP address list when the client config selects HTTP, otherwise the regular list.
+        // Both references are created without an initial value, so this returns null until one is set.
+        return nettyClientConfig.isUseHttp() ? this.namesrvHttpAddrList.get() : this.namesrvAddrList.get();
     }
 
     @Override
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 198484251..61672deef 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -16,27 +16,6 @@
  */
 package org.apache.rocketmq.remoting.netty;
 
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.cert.CertificateException;
@@ -47,6 +26,9 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -58,13 +40,39 @@
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.http.HttpServerHandler;
+import org.apache.rocketmq.remoting.netty.http.NettyHTTPResponseEncoder;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+
 public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
     private final ServerBootstrap serverBootstrap;
+    private final ServerBootstrap httpServerBootstrap;
     private final EventLoopGroup eventLoopGroupSelector;
     private final EventLoopGroup eventLoopGroupBoss;
     private final NettyServerConfig nettyServerConfig;
@@ -91,6 +99,7 @@ public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
         final ChannelEventListener channelEventListener) {
         super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
         this.serverBootstrap = new ServerBootstrap();
+        this.httpServerBootstrap = new ServerBootstrap();
         this.nettyServerConfig = nettyServerConfig;
         this.channelEventListener = channelEventListener;
 
@@ -204,6 +213,33 @@ public void initChannel(SocketChannel ch) throws Exception {
                     }
                 });
 
+        ServerBootstrap httpChildHandler =
+            this.httpServerBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
+                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
+                .option(ChannelOption.SO_BACKLOG, 4096)
+                .option(ChannelOption.SO_REUSEADDR, true)
+                .option(ChannelOption.SO_KEEPALIVE, false)
+                .childOption(ChannelOption.TCP_NODELAY, true)
+                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
+                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
+                .localAddress(new InetSocketAddress(this.nettyServerConfig.getHttpListenPort()))
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(SocketChannel ch) throws Exception {
+                        // Requests are decoded and aggregated, then HttpServerHandler passes them to NettyServerHandler.
+                        ch.pipeline()
+                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
+                                new HandshakeHandler(TlsSystemConfig.tlsMode))
+                            .addLast(defaultEventExecutorGroup,
+                                new HttpRequestDecoder(),
+                                new HttpObjectAggregator(65536),
+                                new NettyHTTPResponseEncoder(),
+                                new ChunkedWriteHandler(),
+                                new HttpServerHandler(new NettyServerHandler())
+                            );
+                    }
+                });
+
         if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
             childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
         }
@@ -212,6 +248,9 @@ public void initChannel(SocketChannel ch) throws Exception {
             ChannelFuture sync = this.serverBootstrap.bind().sync();
             InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
             this.port = addr.getPort();
+
+            this.httpServerBootstrap.bind().sync();
+
         } catch (InterruptedException e1) {
             throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
         }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index a5e2a232d..b8e0c2140 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -18,6 +18,9 @@
 
 public class NettyServerConfig implements Cloneable {
     private int listenPort = 8888;
+
+    private int httpListenPort = 19999;
+
     private int serverWorkerThreads = 8;
     private int serverCallbackExecutorThreads = 0;
     private int serverSelectorThreads = 3;
@@ -33,8 +36,8 @@
      * make make install
      *
      *
-     * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
-     * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
+     * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \ --host=x86_64-linux-gnu \
+     * --build=x86_64-pc-linux-gnu \ --without-gd
      */
     private boolean useEpollNativeSelector = false;
 
@@ -46,6 +49,20 @@ public void setListenPort(int listenPort) {
         this.listenPort = listenPort;
     }
 
+    /**
+     * @return the configured HTTP listen port
+     */
+    public int getHttpListenPort() {
+        return httpListenPort;
+    }
+
+    /**
+     * @param httpListenPort the HTTP listen port to configure
+     */
+    public void setHttpListenPort(int httpListenPort) {
+        this.httpListenPort = httpListenPort;
+    }
+
     public int getServerWorkerThreads() {
         return serverWorkerThreads;
     }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsHelper.java
index efbdd52c0..7cc846048 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsHelper.java
@@ -83,7 +83,6 @@ public InputStream decryptPrivateKey(final String privateKeyEncryptPath,
         }
     };
 
-
     public static void registerDecryptionStrategy(final DecryptionStrategy decryptionStrategy) {
         TlsHelper.decryptionStrategy = decryptionStrategy;
     }
@@ -112,7 +111,6 @@ public static SslContext buildSslContext(boolean forClient) throws IOException,
             } else {
                 SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().sslProvider(SslProvider.JDK);
 
-
                 if (!tlsClientAuthServer) {
                     sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
                 } else {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsSystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsSystemConfig.java
index 403bd6c9a..3217094f1 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsSystemConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/TlsSystemConfig.java
@@ -39,7 +39,6 @@
     public static final String TLS_CLIENT_AUTHSERVER = "tls.client.authServer";
     public static final String TLS_CLIENT_TRUSTCERTPATH = "tls.client.trustCertPath";
 
-
     /**
      * To determine whether use SSL in client-side, include SDK client and BrokerOuterAPI
      */
@@ -51,9 +50,9 @@
     public static boolean tlsTestModeEnable = Boolean.parseBoolean(System.getProperty(TLS_TEST_MODE_ENABLE, "true"));
 
     /**
-     * Indicates the state of the {@link javax.net.ssl.SSLEngine} with respect to client authentication.
-     * This configuration item really only applies when building the server-side {@link SslContext},
-     * and can be set to none, require or optional.
+     * Indicates the state of the {@link javax.net.ssl.SSLEngine} with respect to client authentication. This
+     * configuration item really only applies when building the server-side {@link SslContext}, and can be set to none,
+     * require or optional.
      */
     public static String tlsServerNeedClientAuth = System.getProperty(TLS_SERVER_NEED_CLIENT_AUTH, "none");
     /**
@@ -107,19 +106,21 @@
     public static String tlsClientTrustCertPath = System.getProperty(TLS_CLIENT_TRUSTCERTPATH, null);
 
     /**
-     * For server, three SSL modes are supported: disabled, permissive and enforcing.
-     * For client, use {@link TlsSystemConfig#tlsEnable} to determine whether use SSL.
+     * For server, three SSL modes are supported: disabled, permissive and enforcing. For client, use {@link
+     * TlsSystemConfig#tlsEnable} to determine whether use SSL.
      * <ol>
-     *     <li><strong>disabled:</strong> SSL is not supported; any incoming SSL handshake will be rejected, causing connection closed.</li>
-     *     <li><strong>permissive:</strong> SSL is optional, aka, server in this mode can serve client connections with or without SSL;</li>
-     *     <li><strong>enforcing:</strong> SSL is required, aka, non SSL connection will be rejected.</li>
+     * <li><strong>disabled:</strong> SSL is not supported; any incoming SSL handshake will be rejected, causing
+     * connection closed.</li>
+     * <li><strong>permissive:</strong> SSL is optional, aka, server in this mode can serve client connections with or
+     * without SSL;</li>
+     * <li><strong>enforcing:</strong> SSL is required, aka, non SSL connection will be rejected.</li>
      * </ol>
      */
     public static TlsMode tlsMode = TlsMode.parse(System.getProperty(TLS_SERVER_MODE, "permissive"));
 
     /**
-     * A config file to store the above TLS related configurations,
-     * except {@link TlsSystemConfig#tlsMode} and {@link TlsSystemConfig#tlsEnable}
+     * A config file to store the above TLS related configurations, except {@link TlsSystemConfig#tlsMode} and {@link
+     * TlsSystemConfig#tlsEnable}
      */
     public static String tlsConfigFile = System.getProperty(TLS_CONFIG_FILE, "/etc/rocketmq/tls.properties");
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/HttpClientHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/HttpClientHandler.java
new file mode 100644
index 000000000..203e04f02
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/HttpClientHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty.http;
+
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import com.alibaba.fastjson.JSON;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultLastHttpContent;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.util.ReferenceCountUtil;
+
+/**
+ * Client-side inbound handler that parses the JSON body of an HTTP response into a {@link RemotingCommand} and
+ * passes it to the wrapped handler.
+ */
+public class HttpClientHandler extends ChannelInboundHandlerAdapter {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    /** Handler that receives every decoded command. */
+    private final SimpleChannelInboundHandler<RemotingCommand> channelInboundHandler;
+
+    /**
+     * @param channelInboundHandler handler that receives every command decoded from a response body
+     */
+    public HttpClientHandler(SimpleChannelInboundHandler<RemotingCommand> channelInboundHandler) {
+        this.channelInboundHandler = channelInboundHandler;
+    }
+
+    /**
+     * Decodes the body of a {@link FullHttpResponse} or {@link DefaultLastHttpContent}, dispatches the resulting
+     * command and then closes the channel. Any other message is dropped. The message is always released here
+     * because {@link ChannelInboundHandlerAdapter} does not release it and nothing is forwarded down the pipeline.
+     *
+     * @param ctx context of the channel the message arrived on
+     * @param msg inbound message produced by the HTTP codec
+     */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        try {
+            ByteBuf buf = null;
+            if (msg instanceof FullHttpResponse) {
+                buf = ((FullHttpResponse) msg).content();
+            } else if (msg instanceof DefaultLastHttpContent) {
+                buf = ((DefaultLastHttpContent) msg).content();
+            }
+            if (buf != null) {
+                // Copy only the readable region instead of assuming the reader index is 0.
+                byte[] by = new byte[buf.readableBytes()];
+                buf.getBytes(buf.readerIndex(), by);
+                RemotingCommand remotingCommand = JSON.parseObject(by, RemotingCommand.class);
+                if (remotingCommand != null) {
+                    this.channelInboundHandler.channelRead(ctx, remotingCommand);
+                }
+                // close() shuts the connection; closeFuture() only returned a future and left it open.
+                ctx.channel().close();
+            }
+        } catch (Exception e) {
+            ctx.channel().close();
+            log.error("client decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
+        } finally {
+            ReferenceCountUtil.release(msg);
+        }
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/HttpServerHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/HttpServerHandler.java
new file mode 100644
index 000000000..3dd045cd0
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/HttpServerHandler.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty.http;
+
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import com.alibaba.fastjson.JSON;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.base64.Base64;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.util.CharsetUtil;
+
+/**
+ * Server-side handler that converts an aggregated HTTP request into a {@link RemotingCommand} and passes it to the
+ * wrapped handler. The command code comes from the request URI, the remaining fields from the JSON request body.
+ */
+public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    /** Handler that receives every decoded command. */
+    private final SimpleChannelInboundHandler<RemotingCommand> channelInboundHandler;
+
+    /**
+     * @param channelInboundHandler handler that receives every command decoded from a request
+     */
+    public HttpServerHandler(SimpleChannelInboundHandler<RemotingCommand> channelInboundHandler) {
+        this.channelInboundHandler = channelInboundHandler;
+    }
+
+    /**
+     * Refuses undecodable and GET requests, otherwise builds the command and dispatches it. When the parsed command
+     * has no body, the body is taken from the {@code body} extension field (plain text) or, failing that, from
+     * {@code beBody} (Base64); the field that is read is removed from the extension fields. Refused or failed
+     * requests are logged and their connection is closed, because no error response is written for them.
+     *
+     * @param channelHandlerContext context of the channel the request arrived on
+     * @param fullHttpRequest aggregated request whose content is the JSON form of a command
+     */
+    @Override
+    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
+        try {
+            if (!fullHttpRequest.getDecoderResult().isSuccess()) {
+                // TODO: answer with 400 BAD_REQUEST once an error response path exists.
+                reject(channelHandlerContext, "http request could not be decoded");
+                return;
+            }
+
+            if (HttpMethod.GET.equals(fullHttpRequest.getMethod())) {
+                // TODO: answer with 405 METHOD_NOT_ALLOWED once an error response path exists.
+                reject(channelHandlerContext, "http GET is not supported");
+                return;
+            }
+
+            ByteBuf buf = fullHttpRequest.content();
+            // Copy only the readable region instead of assuming the reader index is 0.
+            byte[] by = new byte[buf.readableBytes()];
+            buf.getBytes(buf.readerIndex(), by);
+            RemotingCommand remotingCommand = JSON.parseObject(by, RemotingCommand.class);
+            if (remotingCommand == null) {
+                reject(channelHandlerContext, "http request carries no command");
+                return;
+            }
+            remotingCommand.setCode(URIUtils.getCode(fullHttpRequest.getUri()));
+            if (remotingCommand.getBody() == null && remotingCommand.getExtFields() != null) {
+                String body = remotingCommand.getExtFields().remove("body");
+                if (body != null) {
+                    remotingCommand.setBody(body.getBytes(CharsetUtil.UTF_8));
+                } else {
+                    body = remotingCommand.getExtFields().remove("beBody");
+                    if (body != null) {
+                        remotingCommand.setBody(decodeBase64(body));
+                    }
+                }
+            }
+            this.channelInboundHandler.channelRead(channelHandlerContext, remotingCommand);
+        } catch (Exception e) {
+            log.error("server decode exception, "
+                + RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()), e);
+            channelHandlerContext.close();
+        }
+    }
+
+    /**
+     * Logs why a request is refused and closes the connection so the peer is not left waiting for a reply.
+     */
+    private void reject(ChannelHandlerContext ctx, String reason) {
+        log.warn("{}, closing channel {}", reason, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+        ctx.close();
+    }
+
+    /**
+     * Decodes Base64 text into raw bytes and releases the intermediate buffer.
+     */
+    private static byte[] decodeBase64(String text) {
+        ByteBuf decoded = Base64.decode(Unpooled.wrappedBuffer(text.getBytes(CharsetUtil.UTF_8)));
+        try {
+            byte[] raw = new byte[decoded.readableBytes()];
+            decoded.getBytes(decoded.readerIndex(), raw);
+            return raw;
+        } finally {
+            decoded.release();
+        }
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/NettyHTTPRequestEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/NettyHTTPRequestEncoder.java
new file mode 100644
index 000000000..1b1486039
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/NettyHTTPRequestEncoder.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty.http;
+
+import java.util.List;
+
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+import com.alibaba.fastjson.JSON;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.base64.Base64;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpRequestEncoder;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.CharsetUtil;
+
+/**
+ * HTTP request encoder that additionally accepts a {@link RemotingCommand} and sends it as a JSON POST request.
+ */
+public class NettyHTTPRequestEncoder extends HttpRequestEncoder {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    /**
+     * Accepts {@link RemotingCommand} in addition to whatever {@link HttpRequestEncoder} accepts.
+     */
+    @Override
+    public boolean acceptOutboundMessage(Object msg) throws Exception {
+        return msg instanceof RemotingCommand || super.acceptOutboundMessage(msg);
+    }
+
+    /**
+     * Wraps a {@link RemotingCommand} in a POST request whose URI is derived from the command code and whose content
+     * is the command serialized as JSON, then lets the parent encoder write it. A non-null command body is added
+     * Base64-encoded as the {@code beBody} extension field before serialization. Any other message goes to the
+     * parent encoder unchanged. On failure the channel is closed and the error is logged.
+     *
+     * @param ctx context of the channel being written to
+     * @param msg outbound message, either a {@link RemotingCommand} or something the parent encoder understands
+     * @param out receives the encoded output
+     */
+    @Override
+    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
+        try {
+            if (msg instanceof RemotingCommand) {
+                RemotingCommand remotingCommand = (RemotingCommand) msg;
+                remotingCommand.makeCustomHeaderToNet();
+
+                byte[] byteArray = remotingCommand.getBody();
+                if (byteArray != null) {
+                    remotingCommand.addExtField("beBody", encodeBase64(byteArray));
+                }
+                DefaultFullHttpRequest defaultFullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+                    HttpMethod.POST, URIUtils.getURI(remotingCommand.getCode()), Unpooled.wrappedBuffer(JSON.toJSONBytes(msg)));
+
+                defaultFullHttpRequest.headers().set("content-type", "application/json");
+                defaultFullHttpRequest.headers().set("Content-Length", defaultFullHttpRequest.content().readableBytes());
+                super.encode(ctx, defaultFullHttpRequest, out);
+            } else {
+                super.encode(ctx, msg, out);
+            }
+        } catch (Exception e) {
+            RemotingUtil.closeChannel(ctx.channel());
+            log.error("NettyHTTPRequestEncoder Exception ", e);
+        }
+    }
+
+    /**
+     * Base64-encodes raw bytes into text and releases the intermediate buffer.
+     */
+    private static String encodeBase64(byte[] raw) {
+        ByteBuf encoded = Base64.encode(Unpooled.wrappedBuffer(raw));
+        try {
+            return encoded.toString(CharsetUtil.US_ASCII);
+        } finally {
+            encoded.release();
+        }
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/NettyHTTPResponseEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/NettyHTTPResponseEncoder.java
new file mode 100644
index 000000000..5a2a81a08
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/NettyHTTPResponseEncoder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty.http;
+
+import java.util.List;
+
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
+
+import com.alibaba.fastjson.JSON;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.base64.Base64;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.util.CharsetUtil;
+
+/**
+ * HTTP response encoder that also accepts {@link RemotingCommand} and sends it as the JSON body of an
+ * HTTP 200 response.
+ */
+public class NettyHTTPResponseEncoder extends HttpResponseEncoder {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    /**
+     * Accepts {@link RemotingCommand} in addition to whatever the parent encoder accepts.
+     */
+    @Override
+    public boolean acceptOutboundMessage(Object msg) throws Exception {
+        if (msg instanceof RemotingCommand) {
+            return true;
+        }
+        return super.acceptOutboundMessage(msg);
+    }
+
+    /**
+     * Serializes a {@link RemotingCommand} to JSON and emits it as a {@code 200 OK} response. A non-null command
+     * body is Base64-encoded into the {@code "beBody"} ext field first. Other message types produce no output.
+     *
+     * <p>If encoding a command fails, a {@code SYSTEM_ERROR} response carrying the same opaque is written to the
+     * channel; if no command had been captured yet, the channel is closed instead. The error is always logged.
+     */
+    @Override
+    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
+        RemotingCommand remotingCommand = null;
+        try {
+            if (msg instanceof RemotingCommand) {
+                remotingCommand = (RemotingCommand) msg;
+                remotingCommand.makeCustomHeaderToNet();
+                byte[] bodyByte = remotingCommand.getBody();
+                if (bodyByte != null) {
+                    ByteBuf bodyBuf = Base64.encode(Unpooled.wrappedBuffer(bodyByte));
+                    try {
+                        // Decode only the readable bytes. array() exposes the whole backing array, which can be
+                        // longer than the encoded text, and it is unsupported for direct buffers.
+                        remotingCommand.addExtField("beBody", bodyBuf.toString(CharsetUtil.US_ASCII));
+                    } finally {
+                        bodyBuf.release();
+                    }
+                }
+                byte[] contentByte = JSON.toJSONBytes(msg);
+                DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+                    HttpResponseStatus.OK, Unpooled.wrappedBuffer(contentByte));
+                httpResponse.headers().set("content-type", "application/json");
+                httpResponse.headers().set("Content-Length", contentByte.length);
+                super.encode(ctx, httpResponse, out);
+            } else if (msg instanceof FileRegion) {
+                // NOTE(review): nothing is emitted for FileRegion messages here - confirm whether they should
+                // be delegated to the parent encoder instead.
+            }
+        } catch (Exception e) {
+            if (remotingCommand == null) {
+                RemotingUtil.closeChannel(ctx.channel());
+            } else {
+                RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "client decode exception");
+                response.setOpaque(remotingCommand.getOpaque());
+                ctx.channel().write(response);
+            }
+            log.error("broker encoder exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
+        }
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/URIUtils.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/URIUtils.java
new file mode 100644
index 000000000..a405b8232
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/http/URIUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty.http;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Bidirectional lookup between integer request codes and HTTP URIs.
+ *
+ * <p>The tables are built once, at class-initialization time, from the static {@code int} fields declared by
+ * {@code org.apache.rocketmq.common.protocol.RequestCode}. A field named {@code SEND_MESSAGE}, for example,
+ * yields the URI {@code /sendMessage}.
+ */
+public class URIUtils {
+
+    private static final Map<Integer, String> CODE_URI_MAP = new HashMap<Integer, String>();
+
+    private static final Map<String, Integer> URI_CODE_MAP = new HashMap<String, Integer>();
+
+    static {
+        try {
+            Class<?> clazz = Class.forName("org.apache.rocketmq.common.protocol.RequestCode");
+            for (Field field : clazz.getDeclaredFields()) {
+                // Only static fields can be read with a null receiver; skip anything else.
+                if (field.getType() == int.class && Modifier.isStatic(field.getModifiers())) {
+                    int code = field.getInt(null);
+                    String uri = toUri(field.getName());
+                    CODE_URI_MAP.put(code, uri);
+                    URI_CODE_MAP.put(uri, code);
+                }
+            }
+        } catch (ClassNotFoundException ignored) {
+            // Best effort: without the class the tables stay empty and lookups find nothing.
+        } catch (IllegalArgumentException ignored) {
+            // Best effort: keep whatever was registered before the failure.
+        } catch (IllegalAccessException ignored) {
+            // Best effort: keep whatever was registered before the failure.
+        }
+    }
+
+    /**
+     * Converts an {@code UPPER_SNAKE_CASE} name into a lowerCamelCase URI path with a leading slash.
+     */
+    private static String toUri(String constantName) {
+        String[] words = constantName.split("_");
+        // Locale.ROOT keeps the mapping identical on every host; default-locale lowercasing may differ.
+        StringBuilder uri = new StringBuilder().append('/').append(words[0].toLowerCase(Locale.ROOT));
+        for (int i = 1; i < words.length; i++) {
+            if (words[i].isEmpty()) {
+                continue; // consecutive underscores produce empty segments
+            }
+            uri.append(words[i].charAt(0)).append(words[i].substring(1).toLowerCase(Locale.ROOT));
+        }
+        return uri.toString();
+    }
+
+    /**
+     * Returns the request code registered for the given URI.
+     *
+     * @param uri URI as produced by {@link #getURI(Integer)}
+     * @return the matching request code
+     * @throws IllegalArgumentException if no code is registered for {@code uri}
+     */
+    public static int getCode(String uri) {
+        Integer code = URI_CODE_MAP.get(uri);
+        if (code == null) {
+            // Unboxing null would otherwise fail with a bare NullPointerException.
+            throw new IllegalArgumentException("no request code is mapped to URI: " + uri);
+        }
+        return code;
+    }
+
+    /**
+     * Returns the URI registered for the given request code.
+     *
+     * @param code request code to look up
+     * @return the matching URI, or {@code null} if the code is unknown
+     */
+    public static String getURI(Integer code) {
+        return CODE_URI_MAP.get(code);
+    }
+}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
index 13bb17282..9ec178e9e 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
@@ -106,8 +106,7 @@ public void setUp() throws InterruptedException {
             tlsClientKeyPath = getCertsPath("badClient.key");
             tlsClientCertPath = getCertsPath("badClient.pem");
             tlsServerAuthClient = false;
-        }
-        else if ("noClientAuthFailure".equals(name.getMethodName())) {
+        } else if ("noClientAuthFailure".equals(name.getMethodName())) {
             //Clear the client cert config to ensure produce the handshake error
             tlsClientKeyPath = "";
             tlsClientCertPath = "";
@@ -152,8 +151,7 @@ public void tearDown() {
     }
 
     /**
-     * Tests that a client and a server configured using two-way SSL auth can successfully
-     * communicate with each other.
+     * Tests that a client and a server configured using two-way SSL auth can successfully communicate with each other.
      */
     @Test
     public void basicClientServerIntegrationTest() throws Exception {
@@ -205,8 +203,8 @@ public void serverRejectsSSLClient() throws Exception {
     }
 
     /**
-     * Tests that a server configured to require client authentication refuses to accept connections
-     * from a client that has an untrusted certificate.
+     * Tests that a server configured to require client authentication refuses to accept connections from a client that
+     * has an untrusted certificate.
      */
     @Test
     public void serverRejectsUntrustedClientCert() throws Exception {
@@ -223,8 +221,7 @@ public void serverAcceptsUntrustedClientCert() throws Exception {
     }
 
     /**
-     * Tests that a server configured to require client authentication actually does require client
-     * authentication.
+     * Tests that a server configured to require client authentication actually does require client authentication.
      */
     @Test
     public void noClientAuthFailure() throws Exception {
@@ -236,8 +233,8 @@ public void noClientAuthFailure() throws Exception {
     }
 
     /**
-     * Tests that a client configured using GrpcSslContexts refuses to talk to a server that has an
-     * an untrusted certificate.
+     * Tests that a client configured using GrpcSslContexts refuses to talk to a server that has an untrusted
+     * certificate.
      */
     @Test
     public void clientRejectsUntrustedServerCert() throws Exception {
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
index 6c7327f25..62c1ccf1d 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/FileRegionEncoderTest.java
@@ -33,8 +33,8 @@
 public class FileRegionEncoderTest {
 
     /**
-     * This unit test case ensures that {@link FileRegionEncoder} indeed wraps {@link FileRegion} to
-     * {@link ByteBuf}.
+     * This unit test case ensures that {@link FileRegionEncoder} indeed wraps {@link FileRegion} to {@link ByteBuf}.
+     *
      * @throws IOException if there is an error.
      */
     @Test
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/HttpServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/HttpServerTest.java
new file mode 100644
index 000000000..a4f8969eb
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/HttpServerTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.junit.Test;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * Verifies that a {@link RemotingCommand} serialized to JSON can be decoded back unchanged.
+ */
+public class HttpServerTest {
+
+    /**
+     * Round-trips a response command through JSON and compares every field that was set on it.
+     */
+    @Test
+    public void remotingCommandSerialize() {
+        RemotingCommand cmd = RemotingCommand.createResponseCommand(1, "hello laohu");
+        cmd.setLanguage(LanguageCode.valueOf((byte) 0));
+        // int version(~32767)
+        cmd.setVersion(3);
+        // int opaque
+        cmd.setOpaque(123112);
+
+        HashMap<String, String> map = new HashMap<String, String>();
+        map.put("lao", "laohu");
+        map.put("niao", "cai");
+        cmd.setExtFields(map);
+
+        String str = JSON.toJSONString(cmd);
+        // Use an explicit charset so the test does not depend on the platform default.
+        RemotingCommand rc = RemotingSerializable.decode(str.getBytes(Charset.forName("UTF-8")), RemotingCommand.class);
+
+        assertEquals(1, rc.getCode());
+        assertEquals("hello laohu", rc.getRemark());
+        assertEquals(cmd.getLanguage(), rc.getLanguage());
+        assertEquals(3, rc.getVersion());
+        assertEquals(123112, rc.getOpaque());
+        assertEquals(map, rc.getExtFields());
+    }
+}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
index c3da3e9af..42ca1a64b 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
@@ -36,7 +36,7 @@
     @Test
     public void testProcessResponseCommand() throws InterruptedException {
         final Semaphore semaphore = new Semaphore(0);
-        ResponseFuture responseFuture = new ResponseFuture(null,1, 3000, new InvokeCallback() {
+        ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, new InvokeCallback() {
             @Override
             public void operationComplete(final ResponseFuture responseFuture) {
                 assertThat(semaphore.availablePermits()).isEqualTo(0);
@@ -57,7 +57,7 @@ public void operationComplete(final ResponseFuture responseFuture) {
     @Test
     public void testProcessResponseCommand_NullCallBack() throws InterruptedException {
         final Semaphore semaphore = new Semaphore(0);
-        ResponseFuture responseFuture = new ResponseFuture(null,1, 3000, null,
+        ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, null,
             new SemaphoreReleaseOnlyOnce(semaphore));
 
         remotingAbstract.responseTable.putIfAbsent(1, responseFuture);
@@ -72,7 +72,7 @@ public void testProcessResponseCommand_NullCallBack() throws InterruptedExceptio
     @Test
     public void testProcessResponseCommand_RunCallBackInCurrentThread() throws InterruptedException {
         final Semaphore semaphore = new Semaphore(0);
-        ResponseFuture responseFuture = new ResponseFuture(null,1, 3000, new InvokeCallback() {
+        ResponseFuture responseFuture = new ResponseFuture(null, 1, 3000, new InvokeCallback() {
             @Override
             public void operationComplete(final ResponseFuture responseFuture) {
                 assertThat(semaphore.availablePermits()).isEqualTo(0);
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
index 6b5633df1..5be2d7b14 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
@@ -30,7 +30,7 @@
     private NettyRemotingClient remotingClient = new NettyRemotingClient(new NettyClientConfig());
 
     @Test
-    public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException {        
+    public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException {
         ExecutorService customized = Executors.newCachedThreadPool();
         remotingClient.setCallbackExecutor(customized);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services