You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:32 UTC

[15/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java
new file mode 100644
index 0000000..18450c6
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -0,0 +1,815 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv.routeinfo;
+
+import com.alibaba.rocketmq.common.DataVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.constant.PermName;
+import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
+import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
+import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import com.alibaba.rocketmq.common.protocol.body.TopicList;
+import com.alibaba.rocketmq.common.protocol.route.BrokerData;
+import com.alibaba.rocketmq.common.protocol.route.QueueData;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RouteInfoManager {
+    // Logger obtained under LoggerName.NAMESRV_LOGGER_NAME.
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+    // Two minutes, in milliseconds: a broker whose last update is older than this is
+    // treated as expired by scanNotActiveBroker().
+    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
+    // Read/write lock used by the methods of this class around access to the tables below.
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    // topic -> one QueueData per broker name serving that topic.
+    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
+    // brokerName -> BrokerData (which holds the brokerId -> address map).
+    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
+    // clusterName -> names of the brokers registered under that cluster.
+    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
+    // brokerAddr -> last-update timestamp, data version, channel and HA server address.
+    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
+    // brokerAddr -> filter servers reported by that broker at registration.
+    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
+
+
+    /**
+     * Creates a manager whose routing tables all start out empty.
+     * The numeric arguments are only initial capacities for the backing hash maps.
+     */
+    public RouteInfoManager() {
+        // Topic and cluster level routing.
+        topicQueueTable = new HashMap<String, List<QueueData>>(1024);
+        clusterAddrTable = new HashMap<String, Set<String>>(32);
+
+        // Broker level routing, keyed by broker name.
+        brokerAddrTable = new HashMap<String, BrokerData>(128);
+
+        // Per-address state, keyed by broker address.
+        brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
+        filterServerTable = new HashMap<String, List<String>>(256);
+    }
+
+    /**
+     * Serializes the broker-address table and the cluster table.
+     *
+     * The wrapper only holds references to the live tables, so the encoding itself must
+     * run while the read lock is held; otherwise a concurrent registration could mutate
+     * the maps in the middle of serialization.
+     *
+     * @return the encoded {@link ClusterInfo}
+     */
+    public byte[] getAllClusterInfo() {
+        ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();
+        // Non-interruptible acquisition keeps the original contract: this method never
+        // failed because of locking before, and it still does not.
+        this.lock.readLock().lock();
+        try {
+            clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);
+            clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);
+            return clusterInfoSerializeWrapper.encode();
+        } finally {
+            this.lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Removes a topic and all of its queue data from the topic table.
+     * Any exception, including an interrupt while waiting for the lock, is logged and
+     * not propagated.
+     *
+     * @param topic name of the topic to remove
+     */
+    public void deleteTopic(final String topic) {
+        try {
+            // Acquire before entering the guarded region: if acquisition is interrupted,
+            // unlock() must not run on a lock this thread does not hold.
+            this.lock.writeLock().lockInterruptibly();
+            try {
+                this.topicQueueTable.remove(topic);
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("deleteTopic Exception", e);
+        }
+    }
+
+    /**
+     * Collects the names of all registered topics.
+     * If the lock cannot be acquired or reading fails, the error is logged and whatever
+     * has been collected so far (possibly nothing) is encoded.
+     *
+     * @return the encoded {@link TopicList}
+     */
+    public byte[] getAllTopicList() {
+        TopicList topicList = new TopicList();
+        try {
+            // Acquire before entering the guarded region so that unlock() only runs
+            // when the lock is actually held.
+            this.lock.readLock().lockInterruptibly();
+            try {
+                topicList.getTopicList().addAll(this.topicQueueTable.keySet());
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("getAllTopicList Exception", e);
+        }
+
+        return topicList.encode();
+    }
+
+    public RegisterBrokerResult registerBroker(
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId,
+            final String haServerAddr,
+            final TopicConfigSerializeWrapper topicConfigWrapper,
+            final List<String> filterServerList,
+            final Channel channel) {
+        RegisterBrokerResult result = new RegisterBrokerResult();
+        try {
+            try {
+                this.lock.writeLock().lockInterruptibly();
+
+
+                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
+                if (null == brokerNames) {
+                    brokerNames = new HashSet<String>();
+                    this.clusterAddrTable.put(clusterName, brokerNames);
+                }
+                brokerNames.add(brokerName);
+
+                boolean registerFirst = false;
+
+
+                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
+                if (null == brokerData) {
+                    registerFirst = true;
+                    brokerData = new BrokerData();
+                    brokerData.setBrokerName(brokerName);
+                    HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
+                    brokerData.setBrokerAddrs(brokerAddrs);
+
+                    this.brokerAddrTable.put(brokerName, brokerData);
+                }
+                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
+                registerFirst = registerFirst || (null == oldAddr);
+
+
+                if (null != topicConfigWrapper //
+                        && MixAll.MASTER_ID == brokerId) {
+                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
+                            || registerFirst) {
+                        ConcurrentHashMap<String, TopicConfig> tcTable =
+                                topicConfigWrapper.getTopicConfigTable();
+                        if (tcTable != null) {
+                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
+                                this.createAndUpdateQueueData(brokerName, entry.getValue());
+                            }
+                        }
+                    }
+                }
+
+
+                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
+                        new BrokerLiveInfo(
+                                System.currentTimeMillis(),
+                                topicConfigWrapper.getDataVersion(),
+                                channel,
+                                haServerAddr));
+                if (null == prevBrokerLiveInfo) {
+                    log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
+                }
+
+
+                if (filterServerList != null) {
+                    if (filterServerList.isEmpty()) {
+                        this.filterServerTable.remove(brokerAddr);
+                    } else {
+                        this.filterServerTable.put(brokerAddr, filterServerList);
+                    }
+                }
+
+
+                if (MixAll.MASTER_ID != brokerId) {
+                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+                    if (masterAddr != null) {
+                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
+                        if (brokerLiveInfo != null) {
+                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
+                            result.setMasterAddr(masterAddr);
+                        }
+                    }
+                }
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("registerBroker Exception", e);
+        }
+
+        return result;
+    }
+
+    private boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
+        BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
+        if (null == prev || !prev.getDataVersion().equals(dataVersion)) {
+            return true;
+        }
+
+        return false;
+    }
+
+    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName(brokerName);
+        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
+        queueData.setReadQueueNums(topicConfig.getReadQueueNums());
+        queueData.setPerm(topicConfig.getPerm());
+        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
+
+        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
+        if (null == queueDataList) {
+            queueDataList = new LinkedList<QueueData>();
+            queueDataList.add(queueData);
+            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
+            log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
+        } else {
+            boolean addNewOne = true;
+
+            Iterator<QueueData> it = queueDataList.iterator();
+            while (it.hasNext()) {
+                QueueData qd = it.next();
+                if (qd.getBrokerName().equals(brokerName)) {
+                    if (qd.equals(queueData)) {
+                        addNewOne = false;
+                    } else {
+                        log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
+                                queueData);
+                        it.remove();
+                    }
+                }
+            }
+
+            if (addNewOne) {
+                queueDataList.add(queueData);
+            }
+        }
+    }
+
+    /**
+     * Clears the write permission of every queue data entry of the given broker while
+     * holding the write lock.
+     *
+     * @param brokerName broker whose write permission is cleared
+     * @return number of queue data entries touched, or {@code 0} when the lock could not
+     *         be acquired or an error occurred (the error is logged)
+     */
+    public int wipeWritePermOfBrokerByLock(final String brokerName) {
+        try {
+            // Acquire before entering the guarded region so that unlock() only runs
+            // when the lock is actually held.
+            this.lock.writeLock().lockInterruptibly();
+            try {
+                return wipeWritePermOfBroker(brokerName);
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("wipeWritePermOfBrokerByLock Exception", e);
+        }
+
+        return 0;
+    }
+
+    /**
+     * Clears the {@code PermName.PERM_WRITE} bit on every queue data entry that belongs
+     * to the given broker. This method takes no lock itself.
+     *
+     * @param brokerName broker whose entries are modified
+     * @return number of queue data entries modified
+     */
+    private int wipeWritePermOfBroker(final String brokerName) {
+        int wiped = 0;
+        for (List<QueueData> queueDatas : this.topicQueueTable.values()) {
+            for (QueueData queueData : queueDatas) {
+                if (queueData.getBrokerName().equals(brokerName)) {
+                    queueData.setPerm(queueData.getPerm() & ~PermName.PERM_WRITE);
+                    wiped++;
+                }
+            }
+        }
+
+        return wiped;
+    }
+
+    /**
+     * Removes a broker from the live, filter-server and broker-address tables. When the
+     * broker name has no address left, the name is also removed from its cluster and
+     * from every topic. Errors are logged and not propagated.
+     *
+     * @param clusterName cluster the broker was registered under
+     * @param brokerAddr  address of the broker
+     * @param brokerName  logical broker name
+     * @param brokerId    id of the broker within its broker name
+     */
+    public void unregisterBroker(
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId) {
+        try {
+            // Acquire before entering the guarded region so that unlock() only runs
+            // when the lock is actually held.
+            this.lock.writeLock().lockInterruptibly();
+            try {
+                BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
+                // Log both outcomes, like the brokerAddrTable removal below; previously
+                // the "Failed" case was unreachable because of an enclosing null check.
+                log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
+                        brokerLiveInfo != null ? "OK" : "Failed",
+                        brokerAddr
+                );
+
+                this.filterServerTable.remove(brokerAddr);
+
+                boolean removeBrokerName = false;
+                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
+                if (null != brokerData) {
+                    String addr = brokerData.getBrokerAddrs().remove(brokerId);
+                    log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
+                            addr != null ? "OK" : "Failed",
+                            brokerAddr
+                    );
+
+                    if (brokerData.getBrokerAddrs().isEmpty()) {
+                        this.brokerAddrTable.remove(brokerName);
+                        log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
+                                brokerName
+                        );
+
+                        removeBrokerName = true;
+                    }
+                }
+
+                // The last address of this broker name is gone: clean up cluster and topics.
+                if (removeBrokerName) {
+                    Set<String> nameSet = this.clusterAddrTable.get(clusterName);
+                    if (nameSet != null) {
+                        boolean removed = nameSet.remove(brokerName);
+                        log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
+                                removed ? "OK" : "Failed",
+                                brokerName);
+
+                        if (nameSet.isEmpty()) {
+                            this.clusterAddrTable.remove(clusterName);
+                            log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
+                                    clusterName
+                            );
+                        }
+                    }
+                    this.removeTopicByBrokerName(brokerName);
+                }
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("unregisterBroker Exception", e);
+        }
+    }
+
+    private void removeTopicByBrokerName(final String brokerName) {
+        Iterator<Entry<String, List<QueueData>>> itMap = this.topicQueueTable.entrySet().iterator();
+        while (itMap.hasNext()) {
+            Entry<String, List<QueueData>> entry = itMap.next();
+
+            String topic = entry.getKey();
+            List<QueueData> queueDataList = entry.getValue();
+            Iterator<QueueData> it = queueDataList.iterator();
+            while (it.hasNext()) {
+                QueueData qd = it.next();
+                if (qd.getBrokerName().equals(brokerName)) {
+                    log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, qd);
+                    it.remove();
+                }
+            }
+
+            if (queueDataList.isEmpty()) {
+                log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);
+                itMap.remove();
+            }
+        }
+    }
+
+    /**
+     * Builds the route data of a topic: its queue data, the brokers serving it and the
+     * filter servers of those brokers.
+     *
+     * The returned object holds copies of the queue list and of each broker's address
+     * map, so the caller can use it after the read lock has been released.
+     * NOTE(review): the QueueData elements and filter-server lists themselves are still
+     * the shared instances.
+     *
+     * @param topic topic to look up
+     * @return the route data, or {@code null} when the topic is unknown, none of its
+     *         brokers is registered, or an error occurred (the error is logged)
+     */
+    public TopicRouteData pickupTopicRouteData(final String topic) {
+        TopicRouteData topicRouteData = new TopicRouteData();
+        boolean foundQueueData = false;
+        boolean foundBrokerData = false;
+        Set<String> brokerNameSet = new HashSet<String>();
+        List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
+        topicRouteData.setFilterServerTable(filterServerMap);
+
+        try {
+            // Acquire before entering the guarded region so that unlock() only runs
+            // when the lock is actually held.
+            this.lock.readLock().lockInterruptibly();
+            try {
+                List<QueueData> queueDataList = this.topicQueueTable.get(topic);
+                if (queueDataList != null) {
+                    // Hand out a snapshot: the internal list is mutated under the write
+                    // lock and must not be read by the caller after we unlock.
+                    topicRouteData.setQueueDatas(new ArrayList<QueueData>(queueDataList));
+                    foundQueueData = true;
+
+                    for (QueueData qd : queueDataList) {
+                        brokerNameSet.add(qd.getBrokerName());
+                    }
+
+                    for (String brokerName : brokerNameSet) {
+                        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
+                        if (null != brokerData) {
+                            BrokerData brokerDataClone = new BrokerData();
+                            brokerDataClone.setBrokerName(brokerData.getBrokerName());
+                            // Copy constructor instead of clone(): same shallow copy,
+                            // without the unchecked cast.
+                            brokerDataClone.setBrokerAddrs(
+                                    new HashMap<Long, String>(brokerData.getBrokerAddrs()));
+                            brokerDataList.add(brokerDataClone);
+                            foundBrokerData = true;
+                            for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
+                                List<String> filterServerList = this.filterServerTable.get(brokerAddr);
+                                filterServerMap.put(brokerAddr, filterServerList);
+                            }
+                        }
+                    }
+                }
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("pickupTopicRouteData Exception", e);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
+        }
+
+        if (foundBrokerData && foundQueueData) {
+            return topicRouteData;
+        }
+
+        return null;
+    }
+
+    /**
+     * Closes the channel of every broker whose last update is older than
+     * {@code BROKER_CHANNEL_EXPIRED_TIME} and removes its routing data through
+     * {@link #onChannelDestroy(String, Channel)}.
+     *
+     * The live table is only read here, under the read lock. The removal is left to
+     * onChannelDestroy, which takes the write lock itself; it is therefore called after
+     * the read lock has been released.
+     */
+    public void scanNotActiveBroker() {
+        // Snapshot of the expired brokers: address -> channel.
+        final Map<String, Channel> expiredBrokers = new HashMap<String, Channel>();
+
+        this.lock.readLock().lock();
+        try {
+            final long now = System.currentTimeMillis();
+            for (Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) {
+                long last = entry.getValue().getLastUpdateTimestamp();
+                if ((last + BROKER_CHANNEL_EXPIRED_TIME) < now) {
+                    expiredBrokers.put(entry.getKey(), entry.getValue().getChannel());
+                }
+            }
+        } finally {
+            this.lock.readLock().unlock();
+        }
+
+        for (Entry<String, Channel> expired : expiredBrokers.entrySet()) {
+            RemotingUtil.closeChannel(expired.getValue());
+            log.warn("The broker channel expired, {} {}ms", expired.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
+            this.onChannelDestroy(expired.getKey(), expired.getValue());
+        }
+    }
+
+    /**
+     * Removes all routing data of the broker behind a destroyed channel.
+     *
+     * The broker address is first looked up by channel in the live table (read lock);
+     * when no entry matches, {@code remoteAddr} is used instead. Then, under the write
+     * lock, the address is removed from the live, filter-server and broker-address
+     * tables, and, if a broker name lost its last address, from the cluster and topic
+     * tables as well. Errors are logged and not propagated.
+     *
+     * @param remoteAddr fallback broker address, used when the channel is {@code null}
+     *                   or matches no live entry
+     * @param channel    the destroyed channel, may be {@code null}
+     */
+    public void onChannelDestroy(String remoteAddr, Channel channel) {
+        String brokerAddrFound = null;
+        if (channel != null) {
+            try {
+                // Acquire before entering the guarded region so that unlock() only runs
+                // when the lock is actually held.
+                this.lock.readLock().lockInterruptibly();
+                try {
+                    Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
+                            this.brokerLiveTable.entrySet().iterator();
+                    while (itBrokerLiveTable.hasNext()) {
+                        Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
+                        if (entry.getValue().getChannel() == channel) {
+                            brokerAddrFound = entry.getKey();
+                            break;
+                        }
+                    }
+                } finally {
+                    this.lock.readLock().unlock();
+                }
+            } catch (Exception e) {
+                log.error("onChannelDestroy Exception", e);
+            }
+        }
+
+        if (null == brokerAddrFound) {
+            brokerAddrFound = remoteAddr;
+        } else {
+            log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
+        }
+
+
+        if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
+
+            try {
+                // Same rule for the write phase: lock first, then guard with finally.
+                this.lock.writeLock().lockInterruptibly();
+                try {
+                    this.brokerLiveTable.remove(brokerAddrFound);
+                    this.filterServerTable.remove(brokerAddrFound);
+                    String brokerNameFound = null;
+                    boolean removeBrokerName = false;
+                    Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
+                            this.brokerAddrTable.entrySet().iterator();
+                    // Stop at the first broker name that owns the address.
+                    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
+                        BrokerData brokerData = itBrokerAddrTable.next().getValue();
+
+                        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
+                        while (it.hasNext()) {
+                            Entry<Long, String> entry = it.next();
+                            Long brokerId = entry.getKey();
+                            String brokerAddr = entry.getValue();
+                            if (brokerAddr.equals(brokerAddrFound)) {
+                                brokerNameFound = brokerData.getBrokerName();
+                                it.remove();
+                                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
+                                        brokerId, brokerAddr);
+                                break;
+                            }
+                        }
+
+                        if (brokerData.getBrokerAddrs().isEmpty()) {
+                            removeBrokerName = true;
+                            itBrokerAddrTable.remove();
+                            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
+                                    brokerData.getBrokerName());
+                        }
+                    }
+
+                    // The broker name has no address left: remove it from its cluster.
+                    if (brokerNameFound != null && removeBrokerName) {
+                        Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
+                        while (it.hasNext()) {
+                            Entry<String, Set<String>> entry = it.next();
+                            String clusterName = entry.getKey();
+                            Set<String> brokerNames = entry.getValue();
+                            boolean removed = brokerNames.remove(brokerNameFound);
+                            if (removed) {
+                                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
+                                        brokerNameFound, clusterName);
+
+
+                                if (brokerNames.isEmpty()) {
+                                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
+                                            clusterName);
+                                    it.remove();
+                                }
+
+                                break;
+                            }
+                        }
+                    }
+
+                    // ... and from every topic; topics left without queue data are dropped.
+                    if (removeBrokerName) {
+                        Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
+                                this.topicQueueTable.entrySet().iterator();
+                        while (itTopicQueueTable.hasNext()) {
+                            Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
+                            String topic = entry.getKey();
+                            List<QueueData> queueDataList = entry.getValue();
+
+                            Iterator<QueueData> itQueueData = queueDataList.iterator();
+                            while (itQueueData.hasNext()) {
+                                QueueData queueData = itQueueData.next();
+                                if (queueData.getBrokerName().equals(brokerNameFound)) {
+                                    itQueueData.remove();
+                                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
+                                            topic, queueData);
+                                }
+                            }
+
+                            if (queueDataList.isEmpty()) {
+                                itTopicQueueTable.remove();
+                                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
+                                        topic);
+                            }
+                        }
+                    }
+                } finally {
+                    this.lock.writeLock().unlock();
+                }
+            } catch (Exception e) {
+                log.error("onChannelDestroy Exception", e);
+            }
+        }
+    }
+
+    /**
+     * Logs the size and every entry of the topic, broker-address, broker-live and
+     * cluster tables at info level, under the read lock. Errors are logged and not
+     * propagated.
+     */
+    public void printAllPeriodically() {
+        try {
+            // Acquire before entering the guarded region so that unlock() only runs
+            // when the lock is actually held.
+            this.lock.readLock().lockInterruptibly();
+            try {
+                log.info("--------------------------------------------------------");
+
+                log.info("topicQueueTable SIZE: {}", this.topicQueueTable.size());
+                for (Entry<String, List<QueueData>> next : this.topicQueueTable.entrySet()) {
+                    log.info("topicQueueTable Topic: {} {}", next.getKey(), next.getValue());
+                }
+
+                log.info("brokerAddrTable SIZE: {}", this.brokerAddrTable.size());
+                for (Entry<String, BrokerData> next : this.brokerAddrTable.entrySet()) {
+                    log.info("brokerAddrTable brokerName: {} {}", next.getKey(), next.getValue());
+                }
+
+                log.info("brokerLiveTable SIZE: {}", this.brokerLiveTable.size());
+                for (Entry<String, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {
+                    log.info("brokerLiveTable brokerAddr: {} {}", next.getKey(), next.getValue());
+                }
+
+                log.info("clusterAddrTable SIZE: {}", this.clusterAddrTable.size());
+                for (Entry<String, Set<String>> next : this.clusterAddrTable.entrySet()) {
+                    log.info("clusterAddrTable clusterName: {} {}", next.getKey(), next.getValue());
+                }
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("printAllPeriodically Exception", e);
+        }
+    }
+
+
+    public byte[] getSystemTopicList() {
+        TopicList topicList = new TopicList();
+        try {
+            try {
+                this.lock.readLock().lockInterruptibly();
+                for (Map.Entry<String, Set<String>> entry : clusterAddrTable.entrySet()) {
+                    topicList.getTopicList().add(entry.getKey());
+                    topicList.getTopicList().addAll(entry.getValue());
+                }
+
+                if (brokerAddrTable != null && !brokerAddrTable.isEmpty()) {
+                    Iterator<String> it = brokerAddrTable.keySet().iterator();
+                    while (it.hasNext()) {
+                        BrokerData bd = brokerAddrTable.get(it.next());
+                        HashMap<Long, String> brokerAddrs = bd.getBrokerAddrs();
+                        if (bd.getBrokerAddrs() != null && !bd.getBrokerAddrs().isEmpty()) {
+                            Iterator<Long> it2 = brokerAddrs.keySet().iterator();
+                            topicList.setBrokerAddr(brokerAddrs.get(it2.next()));
+                            break;
+                        }
+                    }
+                }
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("getAllTopicList Exception", e);
+        }
+
+        return topicList.encode();
+    }
+
+    public byte[] getTopicsByCluster(String cluster) {
+        TopicList topicList = new TopicList();
+        try {
+            try {
+                this.lock.readLock().lockInterruptibly();
+                Set<String> brokerNameSet = this.clusterAddrTable.get(cluster);
+                for (String brokerName : brokerNameSet) {
+                    Iterator<Entry<String, List<QueueData>>> topicTableIt =
+                            this.topicQueueTable.entrySet().iterator();
+                    while (topicTableIt.hasNext()) {
+                        Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
+                        String topic = topicEntry.getKey();
+                        List<QueueData> queueDatas = topicEntry.getValue();
+                        for (QueueData queueData : queueDatas) {
+                            if (brokerName.equals(queueData.getBrokerName())) {
+                                topicList.getTopicList().add(topic);
+                                break;
+                            }
+                        }
+                    }
+                }
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("getAllTopicList Exception", e);
+        }
+
+        return topicList.encode();
+    }
+
+    public byte[] getUnitTopics() {
+        TopicList topicList = new TopicList();
+        try {
+            try {
+                this.lock.readLock().lockInterruptibly();
+                Iterator<Entry<String, List<QueueData>>> topicTableIt =
+                        this.topicQueueTable.entrySet().iterator();
+                while (topicTableIt.hasNext()) {
+                    Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
+                    String topic = topicEntry.getKey();
+                    List<QueueData> queueDatas = topicEntry.getValue();
+                    if (queueDatas != null && queueDatas.size() > 0
+                            && TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())) {
+                        topicList.getTopicList().add(topic);
+                    }
+                }
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("getAllTopicList Exception", e);
+        }
+
+        return topicList.encode();
+    }
+
+    public byte[] getHasUnitSubTopicList() {
+        TopicList topicList = new TopicList();
+        try {
+            try {
+                this.lock.readLock().lockInterruptibly();
+                Iterator<Entry<String, List<QueueData>>> topicTableIt =
+                        this.topicQueueTable.entrySet().iterator();
+                while (topicTableIt.hasNext()) {
+                    Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
+                    String topic = topicEntry.getKey();
+                    List<QueueData> queueDatas = topicEntry.getValue();
+                    if (queueDatas != null && queueDatas.size() > 0
+                            && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {
+                        topicList.getTopicList().add(topic);
+                    }
+                }
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("getAllTopicList Exception", e);
+        }
+
+        return topicList.encode();
+    }
+
+    public byte[] getHasUnitSubUnUnitTopicList() {
+        TopicList topicList = new TopicList();
+        try {
+            try {
+                this.lock.readLock().lockInterruptibly();
+                Iterator<Entry<String, List<QueueData>>> topicTableIt =
+                        this.topicQueueTable.entrySet().iterator();
+                while (topicTableIt.hasNext()) {
+                    Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
+                    String topic = topicEntry.getKey();
+                    List<QueueData> queueDatas = topicEntry.getValue();
+                    if (queueDatas != null && queueDatas.size() > 0
+                            && !TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())
+                            && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {
+                        topicList.getTopicList().add(topic);
+                    }
+                }
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (Exception e) {
+            log.error("getAllTopicList Exception", e);
+        }
+
+        return topicList.encode();
+    }
+}
+
+
+class BrokerLiveInfo {
+    private long lastUpdateTimestamp;
+    private DataVersion dataVersion;
+    private Channel channel;
+    private String haServerAddr;
+
+
+    public BrokerLiveInfo(long lastUpdateTimestamp, DataVersion dataVersion, Channel channel,
+                          String haServerAddr) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+        this.dataVersion = dataVersion;
+        this.channel = channel;
+        this.haServerAddr = haServerAddr;
+    }
+
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+
+    public void setDataVersion(DataVersion dataVersion) {
+        this.dataVersion = dataVersion;
+    }
+
+
+    public Channel getChannel() {
+        return channel;
+    }
+
+
+    public void setChannel(Channel channel) {
+        this.channel = channel;
+    }
+
+
+    public String getHaServerAddr() {
+        return haServerAddr;
+    }
+
+
+    public void setHaServerAddr(String haServerAddr) {
+        this.haServerAddr = haServerAddr;
+    }
+
+
+    @Override
+    public String toString() {
+        return "BrokerLiveInfo [lastUpdateTimestamp=" + lastUpdateTimestamp + ", dataVersion=" + dataVersion
+                + ", channel=" + channel + ", haServerAddr=" + haServerAddr + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/pom.xml b/rocketmq-remoting/pom.xml
new file mode 100644
index 0000000..b229597
--- /dev/null
+++ b/rocketmq-remoting/pom.xml
@@ -0,0 +1,51 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.alibaba.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-remoting</artifactId>
+    <name>rocketmq-remoting ${project.version}</name>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/ChannelEventListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/ChannelEventListener.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/ChannelEventListener.java
new file mode 100644
index 0000000..eff9551
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/ChannelEventListener.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting;
+
+import io.netty.channel.Channel;
+
+
+/**
+ * Callback interface for events on a Netty {@link Channel}. Every callback
+ * receives the remote address as a string together with the channel itself.
+ * The event kinds (connect, close, exception, idle) are taken from the method
+ * names; the code that fires them is not part of this file.
+ *
+ * @author shijia.wxr
+ */
+public interface ChannelEventListener {
+    /** Callback for a channel connect event. */
+    void onChannelConnect(String remoteAddr, Channel channel);
+
+    /** Callback for a channel close event. */
+    void onChannelClose(String remoteAddr, Channel channel);
+
+    /** Callback for a channel exception event. */
+    void onChannelException(String remoteAddr, Channel channel);
+
+    /** Callback for a channel idle event. */
+    void onChannelIdle(String remoteAddr, Channel channel);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/CommandCustomHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/CommandCustomHeader.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/CommandCustomHeader.java
new file mode 100644
index 0000000..7e0bd8c
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/CommandCustomHeader.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * Contract for a custom command header that exposes a field check.
+ *
+ * @author shijia.wxr
+ */
+public interface CommandCustomHeader {
+    /**
+     * Checks the fields of this header.
+     *
+     * @throws RemotingCommandException presumably when a field fails the check;
+     *         the exact conditions are up to each implementation
+     */
+    void checkFields() throws RemotingCommandException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/InvokeCallback.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/InvokeCallback.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/InvokeCallback.java
new file mode 100644
index 0000000..6ba27e1
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/InvokeCallback.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
+
+
+/**
+ * Callback that is handed a {@link ResponseFuture} once an operation completes.
+ *
+ * @author shijia.wxr
+ */
+public interface InvokeCallback {
+    /**
+     * Called with the future belonging to the completed operation.
+     *
+     * @param responseFuture the future associated with the operation
+     */
+    void operationComplete(ResponseFuture responseFuture);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java
new file mode 100644
index 0000000..cc2d594
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+
+
+/**
+ * Hook with one callback taking a request and one taking a request/response
+ * pair. Judging by the method names they run before a request is sent and
+ * after its response is available; the call sites are not part of this file.
+ */
+public interface RPCHook {
+    /**
+     * @param remoteAddr address of the remote peer
+     * @param request    the request command
+     */
+    void doBeforeRequest(String remoteAddr, RemotingCommand request);
+
+    /**
+     * @param remoteAddr address of the remote peer
+     * @param request    the request command
+     * @param response   the response command
+     */
+    void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingClient.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingClient.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingClient.java
new file mode 100644
index 0000000..ad8c0be
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingClient.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+
+/**
+ * Client-side remoting contract, extending {@link RemotingService}. Peers are
+ * identified by an address string; requests and responses are
+ * {@link RemotingCommand} instances.
+ *
+ * @author shijia.wxr
+ */
+public interface RemotingClient extends RemotingService {
+
+    /** Supplies the list of name server addresses. */
+    void updateNameServerAddressList(List<String> addrs);
+
+    /** Returns the list of name server addresses. */
+    List<String> getNameServerAddressList();
+
+    /**
+     * Synchronous invocation: takes a target address, a request and a timeout
+     * in milliseconds, and returns a {@link RemotingCommand} (presumably the
+     * response).
+     */
+    RemotingCommand invokeSync(String addr, RemotingCommand request, long timeoutMillis)
+            throws InterruptedException, RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException;
+
+    /**
+     * Asynchronous invocation: same inputs as {@code invokeSync} plus an
+     * {@link InvokeCallback}; nothing is returned directly.
+     */
+    void invokeAsync(String addr, RemotingCommand request, long timeoutMillis,
+                     InvokeCallback invokeCallback)
+            throws InterruptedException, RemotingConnectException,
+            RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
+
+    /** One-way invocation: takes address, request and timeout; returns nothing. */
+    void invokeOneway(String addr, RemotingCommand request, long timeoutMillis)
+            throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
+            RemotingTimeoutException, RemotingSendRequestException;
+
+    /** Registers a processor and an executor for the given request code. */
+    void registerProcessor(int requestCode, NettyRequestProcessor processor,
+                           ExecutorService executor);
+
+    /** Reports whether the channel for the given address is writeable. */
+    boolean isChannelWriteable(String addr);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingServer.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingServer.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingServer.java
new file mode 100644
index 0000000..ae84c1b
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingServer.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.common.Pair;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.Channel;
+
+import java.util.concurrent.ExecutorService;
+
+
+/**
+ * Server-side remoting contract, extending {@link RemotingService}. Peers are
+ * identified by a Netty {@link Channel}; requests and responses are
+ * {@link RemotingCommand} instances.
+ *
+ * @author shijia.wxr
+ */
+public interface RemotingServer extends RemotingService {
+
+    /** Registers a processor and an executor for the given request code. */
+    void registerProcessor(int requestCode, NettyRequestProcessor processor,
+                           ExecutorService executor);
+
+    /** Registers a default processor and its executor. */
+    void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor);
+
+    /** Returns the local listen port. */
+    int localListenPort();
+
+    /** Returns the processor/executor pair for the given request code. */
+    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(int requestCode);
+
+    /**
+     * Synchronous invocation: takes a channel, a request and a timeout in
+     * milliseconds, and returns a {@link RemotingCommand} (presumably the
+     * response).
+     */
+    RemotingCommand invokeSync(Channel channel, RemotingCommand request, long timeoutMillis)
+            throws InterruptedException, RemotingSendRequestException,
+            RemotingTimeoutException;
+
+    /**
+     * Asynchronous invocation: same inputs as {@code invokeSync} plus an
+     * {@link InvokeCallback}; nothing is returned directly.
+     */
+    void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis,
+                     InvokeCallback invokeCallback)
+            throws InterruptedException,
+            RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
+
+    /** One-way invocation: takes channel, request and timeout; returns nothing. */
+    void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis)
+            throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
+            RemotingSendRequestException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingService.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingService.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingService.java
new file mode 100644
index 0000000..cddac3e
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingService.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting;
+
+/**
+ * Base contract extended by {@code RemotingClient} and {@code RemotingServer}:
+ * lifecycle control plus registration of an {@link RPCHook}.
+ */
+public interface RemotingService {
+    /** Starts the service. */
+    void start();
+
+
+    /** Shuts the service down. */
+    void shutdown();
+
+
+    /**
+     * Registers an RPC hook with this service.
+     *
+     * @param rpcHook the hook to register
+     */
+    void registerRPCHook(RPCHook rpcHook);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNotNull.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNotNull.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNotNull.java
new file mode 100644
index 0000000..4ca077d
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNotNull.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation with no members, retained at runtime and applicable to
+ * fields, methods, parameters and local variables. The name suggests the
+ * annotated element must not be null; whatever enforces that is not part of
+ * this file.
+ *
+ * @author shijia.wxr
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE})
+public @interface CFNotNull {
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNullable.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNullable.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNullable.java
new file mode 100644
index 0000000..1318854
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNullable.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation with no members, retained at runtime and applicable to
+ * fields, methods, parameters and local variables. The name suggests the
+ * annotated element may be null; whatever consumes the annotation is not part
+ * of this file.
+ *
+ * @author shijia.wxr
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE})
+public @interface CFNullable {
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/Pair.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/Pair.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/Pair.java
new file mode 100644
index 0000000..091224e
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/Pair.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.common;
+
+/**
+ * Mutable two-element holder. Both elements may be any reference value,
+ * including {@code null}; no validation is performed.
+ *
+ * @param <T1> type of the first element
+ * @param <T2> type of the second element
+ * @author shijia.wxr
+ */
+public class Pair<T1, T2> {
+    private T1 object1;
+    private T2 object2;
+
+    /** Creates a pair holding the two given values. */
+    public Pair(T1 object1, T2 object2) {
+        this.object2 = object2;
+        this.object1 = object1;
+    }
+
+    /** Returns the first element. */
+    public T1 getObject1() {
+        return this.object1;
+    }
+
+    /** Replaces the first element. */
+    public void setObject1(T1 object1) {
+        this.object1 = object1;
+    }
+
+    /** Returns the second element. */
+    public T2 getObject2() {
+        return this.object2;
+    }
+
+    /** Replaces the second element. */
+    public void setObject2(T2 object2) {
+        this.object2 = object2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingHelper.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingHelper.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingHelper.java
new file mode 100644
index 0000000..9dcdd83
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingHelper.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.common;
+
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.Channel;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RemotingHelper {
+    public static final String ROCKETMQ_REMOTING = "RocketmqRemoting";
+    public static final String DEFAULT_CHARSET = "UTF-8";
+
+    public static String exceptionSimpleDesc(final Throwable e) {
+        StringBuffer sb = new StringBuffer();
+        if (e != null) {
+            sb.append(e.toString());
+
+            StackTraceElement[] stackTrace = e.getStackTrace();
+            if (stackTrace != null && stackTrace.length > 0) {
+                StackTraceElement elment = stackTrace[0];
+                sb.append(", ");
+                sb.append(elment.toString());
+            }
+        }
+
+        return sb.toString();
+    }
+
+    public static SocketAddress string2SocketAddress(final String addr) {
+        String[] s = addr.split(":");
+        InetSocketAddress isa = new InetSocketAddress(s[0], Integer.parseInt(s[1]));
+        return isa;
+    }
+
+    public static RemotingCommand invokeSync(final String addr, final RemotingCommand request,
+                                             final long timeoutMillis) throws InterruptedException, RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException {
+        long beginTime = System.currentTimeMillis();
+        SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
+        SocketChannel socketChannel = RemotingUtil.connect(socketAddress);
+        if (socketChannel != null) {
+            boolean sendRequestOK = false;
+
+            try {
+
+                socketChannel.configureBlocking(true);
+
+                //bugfix  http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802
+                socketChannel.socket().setSoTimeout((int) timeoutMillis);
+
+
+                ByteBuffer byteBufferRequest = request.encode();
+                while (byteBufferRequest.hasRemaining()) {
+                    int length = socketChannel.write(byteBufferRequest);
+                    if (length > 0) {
+                        if (byteBufferRequest.hasRemaining()) {
+                            if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
+
+                                throw new RemotingSendRequestException(addr);
+                            }
+                        }
+                    } else {
+                        throw new RemotingSendRequestException(addr);
+                    }
+
+
+                    Thread.sleep(1);
+                }
+
+                sendRequestOK = true;
+
+                ByteBuffer byteBufferSize = ByteBuffer.allocate(4);
+                while (byteBufferSize.hasRemaining()) {
+                    int length = socketChannel.read(byteBufferSize);
+                    if (length > 0) {
+                        if (byteBufferSize.hasRemaining()) {
+                            if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
+
+                                throw new RemotingTimeoutException(addr, timeoutMillis);
+                            }
+                        }
+                    } else {
+                        throw new RemotingTimeoutException(addr, timeoutMillis);
+                    }
+
+
+                    Thread.sleep(1);
+                }
+
+                int size = byteBufferSize.getInt(0);
+                ByteBuffer byteBufferBody = ByteBuffer.allocate(size);
+                while (byteBufferBody.hasRemaining()) {
+                    int length = socketChannel.read(byteBufferBody);
+                    if (length > 0) {
+                        if (byteBufferBody.hasRemaining()) {
+                            if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
+
+                                throw new RemotingTimeoutException(addr, timeoutMillis);
+                            }
+                        }
+                    } else {
+                        throw new RemotingTimeoutException(addr, timeoutMillis);
+                    }
+
+
+                    Thread.sleep(1);
+                }
+
+
+                byteBufferBody.flip();
+                return RemotingCommand.decode(byteBufferBody);
+            } catch (IOException e) {
+                e.printStackTrace();
+
+                if (sendRequestOK) {
+                    throw new RemotingTimeoutException(addr, timeoutMillis);
+                } else {
+                    throw new RemotingSendRequestException(addr);
+                }
+            } finally {
+                try {
+                    socketChannel.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        } else {
+            throw new RemotingConnectException(addr);
+        }
+    }
+
+
+    public static String parseChannelRemoteAddr(final Channel channel) {
+        if (null == channel) {
+            return "";
+        }
+        SocketAddress remote = channel.remoteAddress();
+        final String addr = remote != null ? remote.toString() : "";
+
+        if (addr.length() > 0) {
+            int index = addr.lastIndexOf("/");
+            if (index >= 0) {
+                return addr.substring(index + 1);
+            }
+
+            return addr;
+        }
+
+        return "";
+    }
+
+
+    public static String parseChannelRemoteName(final Channel channel) {
+        if (null == channel) {
+            return "";
+        }
+        final InetSocketAddress remote = (InetSocketAddress) channel.remoteAddress();
+        if (remote != null) {
+            return remote.getAddress().getHostName();
+        }
+        return "";
+    }
+
+
+    public static String parseSocketAddressAddr(SocketAddress socketAddress) {
+        if (socketAddress != null) {
+            final String addr = socketAddress.toString();
+
+            if (addr.length() > 0) {
+                return addr.substring(1);
+            }
+        }
+        return "";
+    }
+
+
+    public static String parseSocketAddressName(SocketAddress socketAddress) {
+
+        final InetSocketAddress addrs = (InetSocketAddress) socketAddress;
+        if (addrs != null) {
+            return addrs.getAddress().getHostName();
+        }
+        return "";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingUtil.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingUtil.java
new file mode 100644
index 0000000..af2348f
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingUtil.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.common;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Enumeration;
+
+
+/**
+ * Static networking helpers: platform detection, selector creation, local
+ * address discovery, address/string conversion and blocking socket connects.
+ *
+ * @author shijia.wxr
+ */
+public class RemotingUtil {
+    public static final String OS_NAME = System.getProperty("os.name");
+
+    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    private static boolean isLinuxPlatform = false;
+    private static boolean isWindowsPlatform = false;
+
+    static {
+        if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("linux") >= 0) {
+            isLinuxPlatform = true;
+        }
+
+        if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("windows") >= 0) {
+            isWindowsPlatform = true;
+        }
+    }
+
+    /**
+     * @return {@code true} when the {@code os.name} system property contains "windows" (case-insensitive)
+     */
+    public static boolean isWindowsPlatform() {
+        return isWindowsPlatform;
+    }
+
+    /**
+     * Opens a selector. On Linux it first tries the JDK-internal
+     * {@code sun.nio.ch.EPollSelectorProvider} via reflection and falls back to
+     * {@link Selector#open()} when that is unavailable or fails.
+     *
+     * @return a newly opened selector
+     * @throws IOException if the fallback {@link Selector#open()} fails
+     */
+    public static Selector openSelector() throws IOException {
+        Selector result = null;
+
+        if (isLinuxPlatform()) {
+            try {
+                final Class<?> providerClazz = Class.forName("sun.nio.ch.EPollSelectorProvider");
+                try {
+                    final Method method = providerClazz.getMethod("provider");
+                    final SelectorProvider selectorProvider = (SelectorProvider) method.invoke(null);
+                    if (selectorProvider != null) {
+                        result = selectorProvider.openSelector();
+                    }
+                } catch (final Exception e) {
+                    log.warn("Open ePoll Selector for linux platform exception", e);
+                }
+            } catch (final Exception ignored) {
+                // The provider class could not be loaded; use the default selector below.
+            }
+        }
+
+        if (result == null) {
+            result = Selector.open();
+        }
+
+        return result;
+    }
+
+    /**
+     * @return {@code true} when the {@code os.name} system property contains "linux" (case-insensitive)
+     */
+    public static boolean isLinuxPlatform() {
+        return isLinuxPlatform;
+    }
+
+    /**
+     * Picks a textual address for this machine.
+     *
+     * <p>All non-loopback addresses of all interfaces are collected. IPv4 is
+     * preferred: the first IPv4 address not starting with {@code 127.0} or
+     * {@code 192.168} wins; if every IPv4 address is filtered out, the last one
+     * collected is used. Without any IPv4 address the first IPv6 address is
+     * returned, and without any address at all {@link InetAddress#getLocalHost()}.
+     *
+     * @return the chosen address (IPv6 in brackets), or {@code null} if lookup failed
+     */
+    public static String getLocalAddress() {
+        try {
+            // Traversal Network interface to get the first non-loopback and non-private address
+            Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
+            ArrayList<String> ipv4Result = new ArrayList<String>();
+            ArrayList<String> ipv6Result = new ArrayList<String>();
+            // getNetworkInterfaces() may return null when no interface is found.
+            while (enumeration != null && enumeration.hasMoreElements()) {
+                final NetworkInterface networkInterface = enumeration.nextElement();
+                final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
+                while (en.hasMoreElements()) {
+                    final InetAddress address = en.nextElement();
+                    if (!address.isLoopbackAddress()) {
+                        if (address instanceof Inet6Address) {
+                            ipv6Result.add(normalizeHostAddress(address));
+                        } else {
+                            ipv4Result.add(normalizeHostAddress(address));
+                        }
+                    }
+                }
+            }
+
+            // prefer ipv4
+            if (!ipv4Result.isEmpty()) {
+                for (String ip : ipv4Result) {
+                    if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
+                        continue;
+                    }
+
+                    return ip;
+                }
+
+                return ipv4Result.get(ipv4Result.size() - 1);
+            } else if (!ipv6Result.isEmpty()) {
+                return ipv6Result.get(0);
+            }
+            //If failed to find,fall back to localhost
+            final InetAddress localHost = InetAddress.getLocalHost();
+            return normalizeHostAddress(localHost);
+        } catch (SocketException e) {
+            log.error("Failed to obtain local address", e);
+        } catch (UnknownHostException e) {
+            log.error("Failed to obtain local address", e);
+        }
+
+        return null;
+    }
+
+
+    /**
+     * Formats an address as text, wrapping IPv6 addresses in square brackets.
+     *
+     * @param localHost the address to format
+     * @return {@code "[addr]"} for IPv6, the plain host address otherwise
+     */
+    public static String normalizeHostAddress(final InetAddress localHost) {
+        if (localHost instanceof Inet6Address) {
+            return "[" + localHost.getHostAddress() + "]";
+        } else {
+            return localHost.getHostAddress();
+        }
+    }
+
+    /**
+     * Parses {@code host:port} into a socket address.
+     *
+     * <p>Bracketed IPv6 literals such as {@code [::1]:9876} (the form produced by
+     * {@link #normalizeHostAddress(InetAddress)}) are recognized; everything else
+     * is split on {@code ':'} and the first two parts are used.
+     *
+     * @param addr address text
+     * @return the corresponding socket address
+     * @throws NumberFormatException if the port part is not a valid integer
+     */
+    public static SocketAddress string2SocketAddress(final String addr) {
+        if (addr.startsWith("[")) {
+            final int close = addr.indexOf(']');
+            if (close > 0 && close + 1 < addr.length() && addr.charAt(close + 1) == ':') {
+                // The colons inside the brackets belong to the IPv6 literal itself.
+                final String host = addr.substring(1, close);
+                final int port = Integer.parseInt(addr.substring(close + 2));
+                return new InetSocketAddress(host, port);
+            }
+        }
+
+        String[] s = addr.split(":");
+        return new InetSocketAddress(s[0], Integer.parseInt(s[1]));
+    }
+
+
+    /**
+     * Formats a socket address as {@code hostAddress:port}.
+     *
+     * @param addr an {@link InetSocketAddress}
+     * @return the textual form
+     */
+    public static String socketAddress2String(final SocketAddress addr) {
+        StringBuilder sb = new StringBuilder();
+        InetSocketAddress inetSocketAddress = (InetSocketAddress) addr;
+        sb.append(inetSocketAddress.getAddress().getHostAddress());
+        sb.append(":");
+        sb.append(inetSocketAddress.getPort());
+        return sb.toString();
+    }
+
+
+    /**
+     * Connects to {@code remote} with a 5 second connect timeout.
+     *
+     * @param remote the address to connect to
+     * @return the connected channel, or {@code null} on failure
+     */
+    public static SocketChannel connect(SocketAddress remote) {
+        return connect(remote, 1000 * 5);
+    }
+
+
+    /**
+     * Opens a socket channel, applies socket options, connects in blocking mode
+     * and then switches the channel to non-blocking mode.
+     *
+     * @param remote        the address to connect to
+     * @param timeoutMillis connect timeout in milliseconds
+     * @return the connected channel, or {@code null} if any step failed (the
+     *         failure is logged and a partially opened channel is closed)
+     */
+    public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) {
+        SocketChannel sc = null;
+        try {
+            sc = SocketChannel.open();
+            sc.configureBlocking(true);
+            sc.socket().setSoLinger(false, -1);
+            sc.socket().setTcpNoDelay(true);
+            sc.socket().setReceiveBufferSize(1024 * 64);
+            sc.socket().setSendBufferSize(1024 * 64);
+            sc.socket().connect(remote, timeoutMillis);
+            sc.configureBlocking(false);
+            return sc;
+        } catch (Exception e) {
+            // Callers only see null, so record why the connect failed.
+            log.warn("Failed to connect to " + remote, e);
+            if (sc != null) {
+                try {
+                    sc.close();
+                } catch (IOException e1) {
+                    log.warn("Failed to close socket channel after connect failure", e1);
+                }
+            }
+        }
+
+        return null;
+    }
+
+
+    /**
+     * Closes a channel and logs the outcome once the close completes.
+     *
+     * @param channel the channel to close; {@code null} is ignored
+     */
+    public static void closeChannel(Channel channel) {
+        if (channel == null) {
+            return;
+        }
+
+        final String addrRemote = RemotingHelper.parseChannelRemoteAddr(channel);
+        channel.close().addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                log.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote,
+                        future.isSuccess());
+            }
+        });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
new file mode 100644
index 0000000..c24e8b3
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.common;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Wrapper around a {@link Semaphore} whose {@link #release()} hands back at
+ * most one permit, no matter how many times or from how many threads it is
+ * called.
+ *
+ * @author shijia.wxr
+ */
+public class SemaphoreReleaseOnlyOnce {
+    private final AtomicBoolean released = new AtomicBoolean(false);
+    private final Semaphore semaphore;
+
+
+    /**
+     * @param semaphore the semaphore to guard; may be {@code null}, in which
+     *                  case {@link #release()} does nothing
+     */
+    public SemaphoreReleaseOnlyOnce(final Semaphore semaphore) {
+        this.semaphore = semaphore;
+    }
+
+
+    /**
+     * Releases one permit on the wrapped semaphore the first time it is
+     * called; every later call is a no-op.
+     */
+    public void release() {
+        if (this.semaphore == null) {
+            return;
+        }
+
+        // Only the caller that flips the flag from false to true releases.
+        if (this.released.compareAndSet(false, true)) {
+            this.semaphore.release();
+        }
+    }
+
+
+    /**
+     * @return the wrapped semaphore, possibly {@code null}
+     */
+    public Semaphore getSemaphore() {
+        return this.semaphore;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/ServiceThread.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/ServiceThread.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/ServiceThread.java
new file mode 100644
index 0000000..365c670
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/ServiceThread.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for background thread
+ *
+ * <p>Owns a {@link Thread} that runs this object's {@link #run()} method and
+ * offers a stop flag plus a wait/notify based wake-up mechanism that
+ * subclasses can use via {@link #waitForRunning(long)} and {@link #isStopped()}.
+ *
+ * @author shijia.wxr
+ *
+ */
+public abstract class ServiceThread implements Runnable {
+    private static final Logger STLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    private static final long JOIN_TIME = 90 * 1000;
+    protected final Thread thread;
+    protected volatile boolean hasNotified = false;
+    protected volatile boolean stopped = false;
+
+
+    /**
+     * Creates (but does not start) the backing thread, named after
+     * {@link #getServiceName()}.
+     *
+     * <p>Note that {@link #getServiceName()} is invoked here, before any
+     * subclass constructor body or field initializer has run.
+     */
+    public ServiceThread() {
+        this.thread = new Thread(this, this.getServiceName());
+    }
+
+
+    /**
+     * @return the name used for the backing thread and in log messages
+     */
+    public abstract String getServiceName();
+
+
+    /**
+     * Starts the backing thread.
+     */
+    public void start() {
+        this.thread.start();
+    }
+
+
+    /**
+     * Same as {@link #shutdown(boolean) shutdown(false)}.
+     */
+    public void shutdown() {
+        this.shutdown(false);
+    }
+
+    /**
+     * Sets the stop flag, wakes a waiting service thread, optionally interrupts
+     * it, and then waits up to {@link #getJointime()} milliseconds for it to end.
+     *
+     * <p>If the calling thread is interrupted while waiting, the wait is
+     * abandoned and the caller's interrupt status is restored.
+     *
+     * @param interrupt whether to interrupt the backing thread
+     */
+    public void shutdown(final boolean interrupt) {
+        this.stopped = true;
+        STLOG.info("shutdown thread {} interrupt {}", this.getServiceName(), interrupt);
+        this.signalWaiter();
+
+        try {
+            if (interrupt) {
+                this.thread.interrupt();
+            }
+
+            long beginTime = System.currentTimeMillis();
+            this.thread.join(this.getJointime());
+            long elapsedMillis = System.currentTimeMillis() - beginTime;
+            STLOG.info("join thread {} eclipse time(ms) {} {}", this.getServiceName(), elapsedMillis,
+                    this.getJointime());
+        } catch (InterruptedException e) {
+            STLOG.warn("Interrupted while joining thread " + this.getServiceName(), e);
+            // Hand the interrupt back to the caller instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * @return maximum time in milliseconds that {@link #shutdown(boolean)} waits for the thread to end
+     */
+    public long getJointime() {
+        return JOIN_TIME;
+    }
+
+    /**
+     * Same as {@link #stop(boolean) stop(false)}.
+     */
+    public void stop() {
+        this.stop(false);
+    }
+
+    /**
+     * Sets the stop flag, wakes a waiting service thread and optionally
+     * interrupts it; unlike {@link #shutdown(boolean)} it does not wait for the
+     * thread to end.
+     *
+     * @param interrupt whether to interrupt the backing thread
+     */
+    public void stop(final boolean interrupt) {
+        this.stopped = true;
+        STLOG.info("stop thread {} interrupt {}", this.getServiceName(), interrupt);
+        this.signalWaiter();
+
+        if (interrupt) {
+            this.thread.interrupt();
+        }
+    }
+
+    /**
+     * Sets the stop flag only; a thread blocked in {@link #waitForRunning(long)}
+     * is not woken.
+     */
+    public void makeStop() {
+        this.stopped = true;
+        STLOG.info("makestop thread {}", this.getServiceName());
+    }
+
+    /**
+     * Wakes a thread blocked in {@link #waitForRunning(long)}, or makes the next
+     * call to it return immediately.
+     */
+    public void wakeup() {
+        this.signalWaiter();
+    }
+
+    /**
+     * Records a pending notification and notifies one waiter on this object's
+     * monitor, unless a notification is already pending.
+     */
+    private void signalWaiter() {
+        synchronized (this) {
+            if (!this.hasNotified) {
+                this.hasNotified = true;
+                this.notify();
+            }
+        }
+    }
+
+    /**
+     * Returns immediately if a notification is pending; otherwise waits on this
+     * object's monitor for up to {@code interval} milliseconds. In both cases
+     * the pending flag is cleared and {@link #onWaitEnd()} is invoked before
+     * returning.
+     *
+     * @param interval maximum wait in milliseconds, passed to {@link Object#wait(long)}
+     */
+    protected void waitForRunning(long interval) {
+        synchronized (this) {
+            if (this.hasNotified) {
+                this.hasNotified = false;
+                this.onWaitEnd();
+                return;
+            }
+
+            try {
+                this.wait(interval);
+            } catch (InterruptedException e) {
+                // The interrupt status is not restored here: with the flag still
+                // set, the next wait() would throw again immediately.
+                STLOG.warn("Interrupted while waiting in thread " + this.getServiceName(), e);
+            } finally {
+                this.hasNotified = false;
+                this.onWaitEnd();
+            }
+        }
+    }
+
+    /**
+     * Hook invoked at the end of every {@link #waitForRunning(long)} call while
+     * this object's monitor is held. Does nothing by default.
+     */
+    protected void onWaitEnd() {
+    }
+
+    /**
+     * @return {@code true} once any of the stop/shutdown methods has been called
+     */
+    public boolean isStopped() {
+        return stopped;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingCommandException.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingCommandException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingCommandException.java
new file mode 100644
index 0000000..fe5cab9
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingCommandException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.exception;
+
+/**
+ * A {@link RemotingException} subtype that carries a caller-supplied message
+ * and an optional cause.
+ *
+ * @author shijia.wxr
+ */
+public class RemotingCommandException extends RemotingException {
+    private static final long serialVersionUID = -6061365915274953096L;
+
+
+    /**
+     * Creates the exception with a message and an explicitly {@code null} cause.
+     *
+     * @param message detail message
+     */
+    public RemotingCommandException(final String message) {
+        this(message, null);
+    }
+
+
+    /**
+     * Creates the exception with a message and a cause.
+     *
+     * @param message detail message
+     * @param cause   underlying failure; may be {@code null}
+     */
+    public RemotingCommandException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingConnectException.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingConnectException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingConnectException.java
new file mode 100644
index 0000000..5c546bd
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingConnectException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.exception;
+
+/**
+ * A {@link RemotingException} subtype whose message has the fixed form
+ * {@code "connect to <addr> failed"}.
+ *
+ * @author shijia.wxr
+ */
+public class RemotingConnectException extends RemotingException {
+    private static final long serialVersionUID = -5565366231695911316L;
+
+
+    /**
+     * Creates the exception for the given address with a cause.
+     *
+     * @param addr  address text embedded in the message
+     * @param cause underlying failure; may be {@code null}
+     */
+    public RemotingConnectException(final String addr, final Throwable cause) {
+        super("connect to <" + addr + "> failed", cause);
+    }
+
+
+    /**
+     * Creates the exception for the given address with an explicitly
+     * {@code null} cause.
+     *
+     * @param addr address text embedded in the message
+     */
+    public RemotingConnectException(final String addr) {
+        this(addr, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingException.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingException.java
new file mode 100644
index 0000000..2c4b672
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.exception;
+
+/**
+ * Checked exception that the other exception classes in this package extend.
+ *
+ * @author shijia.wxr
+ */
+public class RemotingException extends Exception {
+    private static final long serialVersionUID = -5690687334570505110L;
+
+
+    /**
+     * Creates the exception with a detail message and no cause.
+     *
+     * @param message detail message
+     */
+    public RemotingException(final String message) {
+        super(message);
+    }
+
+
+    /**
+     * Creates the exception with a detail message and a cause.
+     *
+     * @param message detail message
+     * @param cause   underlying failure; may be {@code null}
+     */
+    public RemotingException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingSendRequestException.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingSendRequestException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingSendRequestException.java
new file mode 100644
index 0000000..e29e1a2
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingSendRequestException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.exception;
+
+/**
+ * A {@link RemotingException} subtype whose message has the fixed form
+ * {@code "send request to <addr> failed"}.
+ *
+ * @author shijia.wxr
+ */
+public class RemotingSendRequestException extends RemotingException {
+    private static final long serialVersionUID = 5391285827332471674L;
+
+
+    /**
+     * Creates the exception for the given address with a cause.
+     *
+     * @param addr  address text embedded in the message
+     * @param cause underlying failure; may be {@code null}
+     */
+    public RemotingSendRequestException(final String addr, final Throwable cause) {
+        super("send request to <" + addr + "> failed", cause);
+    }
+
+
+    /**
+     * Creates the exception for the given address with an explicitly
+     * {@code null} cause.
+     *
+     * @param addr address text embedded in the message
+     */
+    public RemotingSendRequestException(final String addr) {
+        this(addr, null);
+    }
+}