You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:32 UTC
[15/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java
new file mode 100644
index 0000000..18450c6
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -0,0 +1,815 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv.routeinfo;
+
+import com.alibaba.rocketmq.common.DataVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.constant.PermName;
+import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
+import com.alibaba.rocketmq.common.protocol.body.ClusterInfo;
+import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import com.alibaba.rocketmq.common.protocol.body.TopicList;
+import com.alibaba.rocketmq.common.protocol.route.BrokerData;
+import com.alibaba.rocketmq.common.protocol.route.QueueData;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RouteInfoManager {
+ // Logger looked up under the LoggerName.NAMESRV_LOGGER_NAME name.
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+ // Two minutes, in milliseconds. scanNotActiveBroker() treats a broker whose last update
+ // is older than this as expired.
+ private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
+ // Read/write lock taken by most methods of this class before touching the tables below.
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ // topic -> queue data entries of that topic; each entry carries the name of its broker.
+ private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
+ // brokerName -> BrokerData, which in turn maps brokerId -> broker address.
+ private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
+ // clusterName -> names of the brokers registered in that cluster.
+ private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
+ // brokerAddr -> last update time, data version, channel and HA server address of the broker.
+ private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
+ // brokerAddr -> filter servers reported by that broker at registration.
+ private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
+
+
+    /**
+     * Creates a manager whose route tables are all empty; each table is given an explicit
+     * initial capacity.
+     */
+    public RouteInfoManager() {
+        filterServerTable = new HashMap<String, List<String>>(256);
+        brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
+        clusterAddrTable = new HashMap<String, Set<String>>(32);
+        brokerAddrTable = new HashMap<String, BrokerData>(128);
+        topicQueueTable = new HashMap<String, List<QueueData>>(1024);
+    }
+
+    /**
+     * Encodes the broker address table and the cluster table as a {@link ClusterInfo} body.
+     *
+     * <p>Both tables are plain {@code HashMap}s that other methods of this class modify under
+     * the write lock, so they are handed to the wrapper and encoded while the read lock is held.
+     *
+     * @return the bytes returned by {@code ClusterInfo.encode()}
+     */
+    public byte[] getAllClusterInfo() {
+        ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();
+        this.lock.readLock().lock();
+        try {
+            clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);
+            clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);
+            return clusterInfoSerializeWrapper.encode();
+        } finally {
+            this.lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Removes a topic and all of its queue data from the topic table, under the write lock.
+     *
+     * <p>Failures are logged, not thrown. If the thread is interrupted while waiting for the
+     * lock, nothing is removed and the interrupt status is restored.
+     *
+     * @param topic name of the topic to remove
+     */
+    public void deleteTopic(final String topic) {
+        try {
+            // Acquire outside the try/finally: unlock() must only run once the lock is held,
+            // otherwise it throws IllegalMonitorStateException and hides the interrupt.
+            this.lock.writeLock().lockInterruptibly();
+            try {
+                this.topicQueueTable.remove(topic);
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("deleteTopic Exception", e);
+        } catch (Exception e) {
+            log.error("deleteTopic Exception", e);
+        }
+    }
+
+    /**
+     * Encodes the names of all topics currently in the topic table.
+     *
+     * <p>The names are copied under the read lock. If the lock cannot be taken or the copy
+     * fails, the problem is logged and whatever was collected so far is encoded.
+     *
+     * @return the bytes returned by {@code TopicList.encode()}
+     */
+    public byte[] getAllTopicList() {
+        TopicList topicList = new TopicList();
+        try {
+            // Acquire outside the try/finally so unlock() never runs without the lock held.
+            this.lock.readLock().lockInterruptibly();
+            try {
+                topicList.getTopicList().addAll(this.topicQueueTable.keySet());
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("getAllTopicList Exception", e);
+        } catch (Exception e) {
+            log.error("getAllTopicList Exception", e);
+        }
+
+        return topicList.encode();
+    }
+
+ public RegisterBrokerResult registerBroker(
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final String haServerAddr,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final List<String> filterServerList,
+ final Channel channel) {
+ RegisterBrokerResult result = new RegisterBrokerResult();
+ try {
+ try {
+ this.lock.writeLock().lockInterruptibly();
+
+
+ Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
+ if (null == brokerNames) {
+ brokerNames = new HashSet<String>();
+ this.clusterAddrTable.put(clusterName, brokerNames);
+ }
+ brokerNames.add(brokerName);
+
+ boolean registerFirst = false;
+
+
+ BrokerData brokerData = this.brokerAddrTable.get(brokerName);
+ if (null == brokerData) {
+ registerFirst = true;
+ brokerData = new BrokerData();
+ brokerData.setBrokerName(brokerName);
+ HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
+ brokerData.setBrokerAddrs(brokerAddrs);
+
+ this.brokerAddrTable.put(brokerName, brokerData);
+ }
+ String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
+ registerFirst = registerFirst || (null == oldAddr);
+
+
+ if (null != topicConfigWrapper //
+ && MixAll.MASTER_ID == brokerId) {
+ if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
+ || registerFirst) {
+ ConcurrentHashMap<String, TopicConfig> tcTable =
+ topicConfigWrapper.getTopicConfigTable();
+ if (tcTable != null) {
+ for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
+ this.createAndUpdateQueueData(brokerName, entry.getValue());
+ }
+ }
+ }
+ }
+
+
+ BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
+ new BrokerLiveInfo(
+ System.currentTimeMillis(),
+ topicConfigWrapper.getDataVersion(),
+ channel,
+ haServerAddr));
+ if (null == prevBrokerLiveInfo) {
+ log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
+ }
+
+
+ if (filterServerList != null) {
+ if (filterServerList.isEmpty()) {
+ this.filterServerTable.remove(brokerAddr);
+ } else {
+ this.filterServerTable.put(brokerAddr, filterServerList);
+ }
+ }
+
+
+ if (MixAll.MASTER_ID != brokerId) {
+ String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+ if (masterAddr != null) {
+ BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
+ if (brokerLiveInfo != null) {
+ result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
+ result.setMasterAddr(masterAddr);
+ }
+ }
+ }
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ } catch (Exception e) {
+ log.error("registerBroker Exception", e);
+ }
+
+ return result;
+ }
+
+ private boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
+ BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
+ if (null == prev || !prev.getDataVersion().equals(dataVersion)) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
+ QueueData queueData = new QueueData();
+ queueData.setBrokerName(brokerName);
+ queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
+ queueData.setReadQueueNums(topicConfig.getReadQueueNums());
+ queueData.setPerm(topicConfig.getPerm());
+ queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
+
+ List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
+ if (null == queueDataList) {
+ queueDataList = new LinkedList<QueueData>();
+ queueDataList.add(queueData);
+ this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
+ log.info("new topic registerd, {} {}", topicConfig.getTopicName(), queueData);
+ } else {
+ boolean addNewOne = true;
+
+ Iterator<QueueData> it = queueDataList.iterator();
+ while (it.hasNext()) {
+ QueueData qd = it.next();
+ if (qd.getBrokerName().equals(brokerName)) {
+ if (qd.equals(queueData)) {
+ addNewOne = false;
+ } else {
+ log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
+ queueData);
+ it.remove();
+ }
+ }
+ }
+
+ if (addNewOne) {
+ queueDataList.add(queueData);
+ }
+ }
+ }
+
+    /**
+     * Clears the write permission of every queue data entry of a broker, under the write lock.
+     *
+     * <p>Failures are logged, not thrown. An interrupt while waiting for the lock changes
+     * nothing and restores the interrupt status.
+     *
+     * @param brokerName broker whose entries lose the write permission
+     * @return number of entries touched, or 0 when an exception was caught
+     */
+    public int wipeWritePermOfBrokerByLock(final String brokerName) {
+        try {
+            // Acquire outside the try/finally so unlock() never runs without the lock held.
+            this.lock.writeLock().lockInterruptibly();
+            try {
+                return wipeWritePermOfBroker(brokerName);
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("wipeWritePermOfBrokerByLock Exception", e);
+        } catch (Exception e) {
+            log.error("wipeWritePermOfBrokerByLock Exception", e);
+        }
+
+        return 0;
+    }
+
+    /**
+     * Clears the {@code PermName.PERM_WRITE} bit on every queue data entry that belongs to the
+     * given broker, across all topics. This method takes no lock itself.
+     *
+     * @param brokerName broker whose entries are modified
+     * @return number of queue data entries that were visited for this broker
+     */
+    private int wipeWritePermOfBroker(final String brokerName) {
+        int touched = 0;
+        for (List<QueueData> perTopic : this.topicQueueTable.values()) {
+            for (QueueData candidate : perTopic) {
+                if (!candidate.getBrokerName().equals(brokerName)) {
+                    continue;
+                }
+                candidate.setPerm(candidate.getPerm() & ~PermName.PERM_WRITE);
+                touched++;
+            }
+        }
+        return touched;
+    }
+
+ public void unregisterBroker(
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId) {
+ try {
+ try {
+ this.lock.writeLock().lockInterruptibly();
+ BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.remove(brokerAddr);
+ if (brokerLiveInfo != null) {
+ log.info("unregisterBroker, remove from brokerLiveTable {}, {}",
+ brokerLiveInfo != null ? "OK" : "Failed",
+ brokerAddr
+ );
+ }
+
+ this.filterServerTable.remove(brokerAddr);
+
+ boolean removeBrokerName = false;
+ BrokerData brokerData = this.brokerAddrTable.get(brokerName);
+ if (null != brokerData) {
+ String addr = brokerData.getBrokerAddrs().remove(brokerId);
+ log.info("unregisterBroker, remove addr from brokerAddrTable {}, {}",
+ addr != null ? "OK" : "Failed",
+ brokerAddr
+ );
+
+ if (brokerData.getBrokerAddrs().isEmpty()) {
+ this.brokerAddrTable.remove(brokerName);
+ log.info("unregisterBroker, remove name from brokerAddrTable OK, {}",
+ brokerName
+ );
+
+ removeBrokerName = true;
+ }
+ }
+
+ if (removeBrokerName) {
+ Set<String> nameSet = this.clusterAddrTable.get(clusterName);
+ if (nameSet != null) {
+ boolean removed = nameSet.remove(brokerName);
+ log.info("unregisterBroker, remove name from clusterAddrTable {}, {}",
+ removed ? "OK" : "Failed",
+ brokerName);
+
+ if (nameSet.isEmpty()) {
+ this.clusterAddrTable.remove(clusterName);
+ log.info("unregisterBroker, remove cluster from clusterAddrTable {}",
+ clusterName
+ );
+ }
+ }
+ this.removeTopicByBrokerName(brokerName);
+ }
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ } catch (Exception e) {
+ log.error("unregisterBroker Exception", e);
+ }
+ }
+
+ private void removeTopicByBrokerName(final String brokerName) {
+ Iterator<Entry<String, List<QueueData>>> itMap = this.topicQueueTable.entrySet().iterator();
+ while (itMap.hasNext()) {
+ Entry<String, List<QueueData>> entry = itMap.next();
+
+ String topic = entry.getKey();
+ List<QueueData> queueDataList = entry.getValue();
+ Iterator<QueueData> it = queueDataList.iterator();
+ while (it.hasNext()) {
+ QueueData qd = it.next();
+ if (qd.getBrokerName().equals(brokerName)) {
+ log.info("removeTopicByBrokerName, remove one broker's topic {} {}", topic, qd);
+ it.remove();
+ }
+ }
+
+ if (queueDataList.isEmpty()) {
+ log.info("removeTopicByBrokerName, remove the topic all queue {}", topic);
+ itMap.remove();
+ }
+ }
+ }
+
+    /**
+     * Builds the route data of a topic: its queue data, the brokers hosting it and the filter
+     * servers of those brokers' addresses.
+     *
+     * <p>Everything is collected under the read lock. The queue data list and each broker's
+     * address map are copied, so the returned object does not share those containers with the
+     * internal tables once the lock is released (the {@code QueueData} elements themselves
+     * are still shared).
+     *
+     * <p>Failures are logged, not thrown. An interrupt while waiting for the lock restores
+     * the interrupt status.
+     *
+     * @param topic topic to look up
+     * @return the route data, or {@code null} unless both queue data and at least one broker
+     *         were found
+     */
+    public TopicRouteData pickupTopicRouteData(final String topic) {
+        TopicRouteData topicRouteData = new TopicRouteData();
+        boolean foundQueueData = false;
+        boolean foundBrokerData = false;
+        Set<String> brokerNameSet = new HashSet<String>();
+        List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
+        topicRouteData.setFilterServerTable(filterServerMap);
+
+        try {
+            // Acquire outside the try/finally so unlock() never runs without the lock held.
+            this.lock.readLock().lockInterruptibly();
+            try {
+                List<QueueData> queueDataList = this.topicQueueTable.get(topic);
+                if (queueDataList != null) {
+                    // Hand out a snapshot: the internal list may be modified by writers as
+                    // soon as the read lock is released.
+                    topicRouteData.setQueueDatas(new ArrayList<QueueData>(queueDataList));
+                    foundQueueData = true;
+
+                    for (QueueData qd : queueDataList) {
+                        brokerNameSet.add(qd.getBrokerName());
+                    }
+
+                    for (String brokerName : brokerNameSet) {
+                        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
+                        if (null != brokerData) {
+                            BrokerData brokerDataClone = new BrokerData();
+                            brokerDataClone.setBrokerName(brokerData.getBrokerName());
+                            brokerDataClone.setBrokerAddrs(
+                                    new HashMap<Long, String>(brokerData.getBrokerAddrs()));
+                            brokerDataList.add(brokerDataClone);
+                            foundBrokerData = true;
+                            for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
+                                // May store null when the address has no filter servers.
+                                List<String> filterServerList = this.filterServerTable.get(brokerAddr);
+                                filterServerMap.put(brokerAddr, filterServerList);
+                            }
+                        }
+                    }
+                }
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("pickupTopicRouteData Exception", e);
+        } catch (Exception e) {
+            log.error("pickupTopicRouteData Exception", e);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
+        }
+
+        if (foundBrokerData && foundQueueData) {
+            return topicRouteData;
+        }
+
+        return null;
+    }
+
+ public void scanNotActiveBroker() {
+ Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, BrokerLiveInfo> next = it.next();
+ long last = next.getValue().getLastUpdateTimestamp();
+ if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
+ RemotingUtil.closeChannel(next.getValue().getChannel());
+ it.remove();
+ log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
+ this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
+ }
+ }
+ }
+
+    /**
+     * Cleans the route tables after a channel went away.
+     *
+     * <p>First, under the read lock, the broker address whose live info holds {@code channel}
+     * is looked up; if none is found, {@code remoteAddr} is used as the broker address. Then,
+     * under the write lock, that address is removed from the live, filter server and broker
+     * address tables. When a broker name loses its last address, the name is removed from
+     * its cluster (dropping the cluster once empty) and its queue data is removed from every
+     * topic (dropping topics left empty).
+     *
+     * <p>Failures are logged, not thrown. An interrupt seen in either phase is re-asserted on
+     * the thread only at the end, so an interrupt during the lookup does not prevent the
+     * cleanup from trying to take the write lock.
+     *
+     * @param remoteAddr address used when the channel is not found in the live table
+     * @param channel    destroyed channel; may be {@code null}, which skips the lookup
+     */
+    public void onChannelDestroy(String remoteAddr, Channel channel) {
+        boolean interrupted = false;
+        String brokerAddrFound = null;
+        if (channel != null) {
+            try {
+                // Acquire outside the try/finally so unlock() never runs without the lock held.
+                this.lock.readLock().lockInterruptibly();
+                try {
+                    for (Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) {
+                        if (entry.getValue().getChannel() == channel) {
+                            brokerAddrFound = entry.getKey();
+                            break;
+                        }
+                    }
+                } finally {
+                    this.lock.readLock().unlock();
+                }
+            } catch (InterruptedException e) {
+                interrupted = true;
+                log.error("onChannelDestroy Exception", e);
+            } catch (Exception e) {
+                log.error("onChannelDestroy Exception", e);
+            }
+        }
+
+        if (null == brokerAddrFound) {
+            brokerAddrFound = remoteAddr;
+        } else {
+            log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
+        }
+
+        if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
+            try {
+                this.lock.writeLock().lockInterruptibly();
+                try {
+                    this.brokerLiveTable.remove(brokerAddrFound);
+                    this.filterServerTable.remove(brokerAddrFound);
+                    String brokerNameFound = null;
+                    boolean removeBrokerName = false;
+                    Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
+                            this.brokerAddrTable.entrySet().iterator();
+                    // Stops at the first broker name that owned the address.
+                    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
+                        BrokerData brokerData = itBrokerAddrTable.next().getValue();
+
+                        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
+                        while (it.hasNext()) {
+                            Entry<Long, String> entry = it.next();
+                            Long brokerId = entry.getKey();
+                            String brokerAddr = entry.getValue();
+                            if (brokerAddr.equals(brokerAddrFound)) {
+                                brokerNameFound = brokerData.getBrokerName();
+                                it.remove();
+                                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
+                                        brokerId, brokerAddr);
+                                break;
+                            }
+                        }
+
+                        if (brokerData.getBrokerAddrs().isEmpty()) {
+                            removeBrokerName = true;
+                            itBrokerAddrTable.remove();
+                            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
+                                    brokerData.getBrokerName());
+                        }
+                    }
+
+                    if (brokerNameFound != null && removeBrokerName) {
+                        Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
+                        while (it.hasNext()) {
+                            Entry<String, Set<String>> entry = it.next();
+                            String clusterName = entry.getKey();
+                            Set<String> brokerNames = entry.getValue();
+                            boolean removed = brokerNames.remove(brokerNameFound);
+                            if (removed) {
+                                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
+                                        brokerNameFound, clusterName);
+
+                                if (brokerNames.isEmpty()) {
+                                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
+                                            clusterName);
+                                    it.remove();
+                                }
+
+                                break;
+                            }
+                        }
+                    }
+
+                    if (removeBrokerName) {
+                        Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
+                                this.topicQueueTable.entrySet().iterator();
+                        while (itTopicQueueTable.hasNext()) {
+                            Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
+                            String topic = entry.getKey();
+                            List<QueueData> queueDataList = entry.getValue();
+
+                            Iterator<QueueData> itQueueData = queueDataList.iterator();
+                            while (itQueueData.hasNext()) {
+                                QueueData queueData = itQueueData.next();
+                                if (queueData.getBrokerName().equals(brokerNameFound)) {
+                                    itQueueData.remove();
+                                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
+                                            topic, queueData);
+                                }
+                            }
+
+                            if (queueDataList.isEmpty()) {
+                                itTopicQueueTable.remove();
+                                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
+                                        topic);
+                            }
+                        }
+                    }
+                } finally {
+                    this.lock.writeLock().unlock();
+                }
+            } catch (InterruptedException e) {
+                interrupted = true;
+                log.error("onChannelDestroy Exception", e);
+            } catch (Exception e) {
+                log.error("onChannelDestroy Exception", e);
+            }
+        }
+
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Logs the size and every entry of the topic, broker address, broker live and cluster
+     * tables at INFO level, under the read lock.
+     *
+     * <p>Failures are logged, not thrown. An interrupt while waiting for the lock skips the
+     * dump and restores the interrupt status.
+     */
+    public void printAllPeriodically() {
+        try {
+            // Acquire outside the try/finally so unlock() never runs without the lock held.
+            this.lock.readLock().lockInterruptibly();
+            try {
+                log.info("--------------------------------------------------------");
+
+                log.info("topicQueueTable SIZE: {}", this.topicQueueTable.size());
+                for (Entry<String, List<QueueData>> next : this.topicQueueTable.entrySet()) {
+                    log.info("topicQueueTable Topic: {} {}", next.getKey(), next.getValue());
+                }
+
+                log.info("brokerAddrTable SIZE: {}", this.brokerAddrTable.size());
+                for (Entry<String, BrokerData> next : this.brokerAddrTable.entrySet()) {
+                    log.info("brokerAddrTable brokerName: {} {}", next.getKey(), next.getValue());
+                }
+
+                log.info("brokerLiveTable SIZE: {}", this.brokerLiveTable.size());
+                for (Entry<String, BrokerLiveInfo> next : this.brokerLiveTable.entrySet()) {
+                    log.info("brokerLiveTable brokerAddr: {} {}", next.getKey(), next.getValue());
+                }
+
+                log.info("clusterAddrTable SIZE: {}", this.clusterAddrTable.size());
+                for (Entry<String, Set<String>> next : this.clusterAddrTable.entrySet()) {
+                    log.info("clusterAddrTable clusterName: {} {}", next.getKey(), next.getValue());
+                }
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("printAllPeriodically Exception", e);
+        } catch (Exception e) {
+            log.error("printAllPeriodically Exception", e);
+        }
+    }
+
+
+ public byte[] getSystemTopicList() {
+ TopicList topicList = new TopicList();
+ try {
+ try {
+ this.lock.readLock().lockInterruptibly();
+ for (Map.Entry<String, Set<String>> entry : clusterAddrTable.entrySet()) {
+ topicList.getTopicList().add(entry.getKey());
+ topicList.getTopicList().addAll(entry.getValue());
+ }
+
+ if (brokerAddrTable != null && !brokerAddrTable.isEmpty()) {
+ Iterator<String> it = brokerAddrTable.keySet().iterator();
+ while (it.hasNext()) {
+ BrokerData bd = brokerAddrTable.get(it.next());
+ HashMap<Long, String> brokerAddrs = bd.getBrokerAddrs();
+ if (bd.getBrokerAddrs() != null && !bd.getBrokerAddrs().isEmpty()) {
+ Iterator<Long> it2 = brokerAddrs.keySet().iterator();
+ topicList.setBrokerAddr(brokerAddrs.get(it2.next()));
+ break;
+ }
+ }
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ } catch (Exception e) {
+ log.error("getAllTopicList Exception", e);
+ }
+
+ return topicList.encode();
+ }
+
+ public byte[] getTopicsByCluster(String cluster) {
+ TopicList topicList = new TopicList();
+ try {
+ try {
+ this.lock.readLock().lockInterruptibly();
+ Set<String> brokerNameSet = this.clusterAddrTable.get(cluster);
+ for (String brokerName : brokerNameSet) {
+ Iterator<Entry<String, List<QueueData>>> topicTableIt =
+ this.topicQueueTable.entrySet().iterator();
+ while (topicTableIt.hasNext()) {
+ Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
+ String topic = topicEntry.getKey();
+ List<QueueData> queueDatas = topicEntry.getValue();
+ for (QueueData queueData : queueDatas) {
+ if (brokerName.equals(queueData.getBrokerName())) {
+ topicList.getTopicList().add(topic);
+ break;
+ }
+ }
+ }
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ } catch (Exception e) {
+ log.error("getAllTopicList Exception", e);
+ }
+
+ return topicList.encode();
+ }
+
+ public byte[] getUnitTopics() {
+ TopicList topicList = new TopicList();
+ try {
+ try {
+ this.lock.readLock().lockInterruptibly();
+ Iterator<Entry<String, List<QueueData>>> topicTableIt =
+ this.topicQueueTable.entrySet().iterator();
+ while (topicTableIt.hasNext()) {
+ Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
+ String topic = topicEntry.getKey();
+ List<QueueData> queueDatas = topicEntry.getValue();
+ if (queueDatas != null && queueDatas.size() > 0
+ && TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())) {
+ topicList.getTopicList().add(topic);
+ }
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ } catch (Exception e) {
+ log.error("getAllTopicList Exception", e);
+ }
+
+ return topicList.encode();
+ }
+
+ public byte[] getHasUnitSubTopicList() {
+ TopicList topicList = new TopicList();
+ try {
+ try {
+ this.lock.readLock().lockInterruptibly();
+ Iterator<Entry<String, List<QueueData>>> topicTableIt =
+ this.topicQueueTable.entrySet().iterator();
+ while (topicTableIt.hasNext()) {
+ Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
+ String topic = topicEntry.getKey();
+ List<QueueData> queueDatas = topicEntry.getValue();
+ if (queueDatas != null && queueDatas.size() > 0
+ && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {
+ topicList.getTopicList().add(topic);
+ }
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ } catch (Exception e) {
+ log.error("getAllTopicList Exception", e);
+ }
+
+ return topicList.encode();
+ }
+
+ public byte[] getHasUnitSubUnUnitTopicList() {
+ TopicList topicList = new TopicList();
+ try {
+ try {
+ this.lock.readLock().lockInterruptibly();
+ Iterator<Entry<String, List<QueueData>>> topicTableIt =
+ this.topicQueueTable.entrySet().iterator();
+ while (topicTableIt.hasNext()) {
+ Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
+ String topic = topicEntry.getKey();
+ List<QueueData> queueDatas = topicEntry.getValue();
+ if (queueDatas != null && queueDatas.size() > 0
+ && !TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())
+ && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {
+ topicList.getTopicList().add(topic);
+ }
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ } catch (Exception e) {
+ log.error("getAllTopicList Exception", e);
+ }
+
+ return topicList.encode();
+ }
+}
+
+
+/**
+ * Mutable holder for what is recorded about a registered broker address: the time of its
+ * last update, its data version, its channel and its HA server address.
+ */
+class BrokerLiveInfo {
+    private long lastUpdateTimestamp;
+    private DataVersion dataVersion;
+    private Channel channel;
+    private String haServerAddr;
+
+    /**
+     * @param lastUpdateTimestamp time of the last update, as supplied by the caller
+     * @param dataVersion         data version reported by the broker
+     * @param channel             channel associated with the broker
+     * @param haServerAddr        HA server address of the broker
+     */
+    public BrokerLiveInfo(long lastUpdateTimestamp, DataVersion dataVersion, Channel channel,
+                          String haServerAddr) {
+        this.haServerAddr = haServerAddr;
+        this.channel = channel;
+        this.dataVersion = dataVersion;
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    /** @return the stored last-update time */
+    public long getLastUpdateTimestamp() {
+        return this.lastUpdateTimestamp;
+    }
+
+    /** @param lastUpdateTimestamp new last-update time */
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+    /** @return the stored data version */
+    public DataVersion getDataVersion() {
+        return this.dataVersion;
+    }
+
+    /** @param dataVersion new data version */
+    public void setDataVersion(DataVersion dataVersion) {
+        this.dataVersion = dataVersion;
+    }
+
+    /** @return the stored channel */
+    public Channel getChannel() {
+        return this.channel;
+    }
+
+    /** @param channel new channel */
+    public void setChannel(Channel channel) {
+        this.channel = channel;
+    }
+
+    /** @return the stored HA server address */
+    public String getHaServerAddr() {
+        return this.haServerAddr;
+    }
+
+    /** @param haServerAddr new HA server address */
+    public void setHaServerAddr(String haServerAddr) {
+        this.haServerAddr = haServerAddr;
+    }
+
+    /** Renders all four fields in a single bracketed line. */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("BrokerLiveInfo [lastUpdateTimestamp=");
+        text.append(this.lastUpdateTimestamp);
+        text.append(", dataVersion=").append(this.dataVersion);
+        text.append(", channel=").append(this.channel);
+        text.append(", haServerAddr=").append(this.haServerAddr);
+        text.append("]");
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/pom.xml b/rocketmq-remoting/pom.xml
new file mode 100644
index 0000000..b229597
--- /dev/null
+++ b/rocketmq-remoting/pom.xml
@@ -0,0 +1,51 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Group id and version are inherited from the rocketmq-all parent. -->
+ <parent>
+ <groupId>com.alibaba.rocketmq</groupId>
+ <artifactId>rocketmq-all</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>rocketmq-remoting</artifactId>
+ <name>rocketmq-remoting ${project.version}</name>
+
+
+ <!-- No dependency declares a version here; presumably they are managed by the parent POM (TODO confirm). -->
+ <dependencies>
+ <!-- Test scope only. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/ChannelEventListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/ChannelEventListener.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/ChannelEventListener.java
new file mode 100644
index 0000000..eff9551
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/ChannelEventListener.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting;
+
+import io.netty.channel.Channel;
+
+
+/**
+ * Listener for channel events. Each callback receives the remote address and the channel
+ * the event refers to.
+ *
+ * @author shijia.wxr
+ */
+public interface ChannelEventListener {
+    /**
+     * Callback for a channel connect event.
+     *
+     * @param remoteAddr remote address associated with the channel
+     * @param channel    the channel concerned
+     */
+    void onChannelConnect(String remoteAddr, Channel channel);
+
+    /**
+     * Callback for a channel close event.
+     *
+     * @param remoteAddr remote address associated with the channel
+     * @param channel    the channel concerned
+     */
+    void onChannelClose(String remoteAddr, Channel channel);
+
+    /**
+     * Callback for a channel exception event.
+     *
+     * @param remoteAddr remote address associated with the channel
+     * @param channel    the channel concerned
+     */
+    void onChannelException(String remoteAddr, Channel channel);
+
+    /**
+     * Callback for a channel idle event.
+     *
+     * @param remoteAddr remote address associated with the channel
+     * @param channel    the channel concerned
+     */
+    void onChannelIdle(String remoteAddr, Channel channel);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/CommandCustomHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/CommandCustomHeader.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/CommandCustomHeader.java
new file mode 100644
index 0000000..7e0bd8c
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/CommandCustomHeader.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * Contract for custom command headers: the only operation is a field check.
+ *
+ * @author shijia.wxr
+ */
+public interface CommandCustomHeader {
+ /**
+ * Checks the fields of this header (presumably validating them; confirm against the
+ * implementations).
+ *
+ * @throws RemotingCommandException declared so implementations can report a failed check
+ */
+ void checkFields() throws RemotingCommandException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/InvokeCallback.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/InvokeCallback.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/InvokeCallback.java
new file mode 100644
index 0000000..6ba27e1
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/InvokeCallback.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
+
+
+/**
+ * Callback that receives the {@link ResponseFuture} of an invocation once the
+ * operation has completed.
+ *
+ * @author shijia.wxr
+ */
+public interface InvokeCallback {
+    /**
+     * Called with the future describing the completed operation.
+     *
+     * @param responseFuture the future associated with the invocation
+     */
+    void operationComplete(ResponseFuture responseFuture);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java
new file mode 100644
index 0000000..cc2d594
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RPCHook.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+
+
+/**
+ * Hook with two callbacks around a remoting call: one taking the request only,
+ * one taking both the request and its response.
+ * NOTE(review): the exact point at which the remoting layer invokes each callback
+ * is not visible from this interface - confirm against the callers.
+ */
+public interface RPCHook {
+ /**
+  * Callback for the request phase.
+  *
+  * @param remoteAddr address of the remote peer
+  * @param request the request command
+  */
+ void doBeforeRequest(final String remoteAddr, final RemotingCommand request);
+
+
+ /**
+  * Callback for the response phase.
+  *
+  * @param remoteAddr address of the remote peer
+  * @param request the request command
+  * @param response the response command belonging to {@code request}
+  */
+ void doAfterResponse(final String remoteAddr, final RemotingCommand request,
+ final RemotingCommand response);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingClient.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingClient.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingClient.java
new file mode 100644
index 0000000..ad8c0be
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingClient.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+
+/**
+ * Client-side remoting contract: maintains a list of name server addresses,
+ * invokes commands against a remote address (sync, async, one-way) and lets
+ * callers register request processors.
+ *
+ * @author shijia.wxr
+ */
+public interface RemotingClient extends RemotingService {
+
+    /**
+     * Replaces the name server address list used by this client.
+     *
+     * @param addrs the new name server addresses
+     */
+    void updateNameServerAddressList(final List<String> addrs);
+
+    /**
+     * @return the name server address list currently held by this client
+     */
+    List<String> getNameServerAddressList();
+
+    /**
+     * Sends {@code request} to {@code addr} and returns the response command.
+     *
+     * @param addr target address
+     * @param request command to send
+     * @param timeoutMillis timeout in milliseconds
+     * @return the response command
+     */
+    RemotingCommand invokeSync(final String addr, final RemotingCommand request,
+        final long timeoutMillis)
+        throws InterruptedException, RemotingConnectException,
+        RemotingSendRequestException, RemotingTimeoutException;
+
+    /**
+     * Sends {@code request} to {@code addr}; the outcome is handed to {@code invokeCallback}.
+     *
+     * @param addr target address
+     * @param request command to send
+     * @param timeoutMillis timeout in milliseconds
+     * @param invokeCallback callback receiving the result
+     */
+    void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
+        final InvokeCallback invokeCallback)
+        throws InterruptedException, RemotingConnectException,
+        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
+
+    /**
+     * Sends {@code request} to {@code addr} without returning a response to the caller.
+     *
+     * @param addr target address
+     * @param request command to send
+     * @param timeoutMillis timeout in milliseconds
+     */
+    void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
+        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
+        RemotingTimeoutException, RemotingSendRequestException;
+
+    /**
+     * Registers {@code processor}, together with the {@code executor} supplied for it,
+     * for the given request code.
+     *
+     * @param requestCode request code the processor is registered for
+     * @param processor the request processor
+     * @param executor executor service associated with the processor
+     */
+    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
+        final ExecutorService executor);
+
+    /**
+     * @param addr target address
+     * @return whether the channel to {@code addr} is writable
+     */
+    boolean isChannelWriteable(final String addr);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingServer.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingServer.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingServer.java
new file mode 100644
index 0000000..ae84c1b
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingServer.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.common.Pair;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.Channel;
+
+import java.util.concurrent.ExecutorService;
+
+
+/**
+ * Server-side remoting contract: registers request processors, exposes the local
+ * listen port and invokes commands over an existing Netty {@link Channel}
+ * (sync, async, one-way).
+ *
+ * @author shijia.wxr
+ *
+ */
+public interface RemotingServer extends RemotingService {
+
+ /**
+  * Registers {@code processor}, together with the {@code executor} supplied for it,
+  * for the given request code.
+  */
+ void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
+ final ExecutorService executor);
+
+
+ /**
+  * Registers the default processor and its executor.
+  * NOTE(review): presumably used for request codes without a dedicated processor - confirm in the implementation.
+  */
+ void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
+
+
+ /**
+  * @return the local port this server listens on
+  */
+ int localListenPort();
+
+
+ /**
+  * @return the processor/executor pair associated with {@code requestCode}
+  */
+ Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
+
+
+ /**
+  * Sends {@code request} over {@code channel} and returns the response command.
+  *
+  * @param timeoutMillis timeout in milliseconds
+  */
+ RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
+ final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
+ RemotingTimeoutException;
+
+
+ /**
+  * Sends {@code request} over {@code channel}; the outcome is handed to {@code invokeCallback}.
+  *
+  * @param timeoutMillis timeout in milliseconds
+  */
+ void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
+ final InvokeCallback invokeCallback) throws InterruptedException,
+ RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
+
+
+ /**
+  * Sends {@code request} over {@code channel} without returning a response to the caller.
+  *
+  * @param timeoutMillis timeout in milliseconds
+  */
+ void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
+ throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
+ RemotingSendRequestException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingService.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingService.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingService.java
new file mode 100644
index 0000000..cddac3e
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/RemotingService.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting;
+
+/**
+ * Base contract shared by the remoting client and server interfaces:
+ * lifecycle control plus registration of an {@link RPCHook}.
+ */
+public interface RemotingService {
+ /** Starts the service. */
+ void start();
+
+
+ /** Shuts the service down. */
+ void shutdown();
+
+
+ /**
+  * Registers the hook to be used by this service.
+  *
+  * @param rpcHook the hook to register
+  */
+ void registerRPCHook(RPCHook rpcHook);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNotNull.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNotNull.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNotNull.java
new file mode 100644
index 0000000..4ca077d
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNotNull.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation (no members), retained at runtime, that may be placed on
+ * fields, methods, parameters and local variables.
+ * NOTE(review): presumably marks command header fields that must not be null - confirm where it is read.
+ *
+ * @author shijia.wxr
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE})
+public @interface CFNotNull {
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNullable.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNullable.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNullable.java
new file mode 100644
index 0000000..1318854
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/annotation/CFNullable.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation (no members), retained at runtime, that may be placed on
+ * fields, methods, parameters and local variables.
+ * NOTE(review): presumably marks command header fields that are allowed to be null - confirm where it is read.
+ *
+ * @author shijia.wxr
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE})
+public @interface CFNullable {
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/Pair.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/Pair.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/Pair.java
new file mode 100644
index 0000000..091224e
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/Pair.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.common;
+
+/**
+ * Mutable holder for two independently typed values.
+ *
+ * @param <T1> type of the first value
+ * @param <T2> type of the second value
+ * @author shijia.wxr
+ */
+public class Pair<T1, T2> {
+    /** First value of the pair. */
+    private T1 object1;
+    /** Second value of the pair. */
+    private T2 object2;
+
+    /**
+     * Creates a pair holding the two given values.
+     *
+     * @param object1 first value
+     * @param object2 second value
+     */
+    public Pair(T1 object1, T2 object2) {
+        this.object1 = object1;
+        this.object2 = object2;
+    }
+
+    /** @return the first value */
+    public T1 getObject1() {
+        return this.object1;
+    }
+
+    /** @param object1 new first value */
+    public void setObject1(T1 object1) {
+        this.object1 = object1;
+    }
+
+    /** @return the second value */
+    public T2 getObject2() {
+        return this.object2;
+    }
+
+    /** @param object2 new second value */
+    public void setObject2(T2 object2) {
+        this.object2 = object2;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingHelper.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingHelper.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingHelper.java
new file mode 100644
index 0000000..9dcdd83
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingHelper.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.common;
+
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.Channel;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RemotingHelper {
+ public static final String ROCKETMQ_REMOTING = "RocketmqRemoting";
+ public static final String DEFAULT_CHARSET = "UTF-8";
+
+ /**
+  * Builds a one-line description of a throwable: its {@code toString()} followed,
+  * when a stack trace is available, by {@code ", "} and the top stack frame.
+  *
+  * @param e the throwable to describe; may be {@code null}
+  * @return the description, or an empty string when {@code e} is {@code null}
+  */
+ public static String exceptionSimpleDesc(final Throwable e) {
+     if (e == null) {
+         return "";
+     }
+
+     final StringBuilder desc = new StringBuilder().append(e.toString());
+     final StackTraceElement[] frames = e.getStackTrace();
+     if (frames != null && frames.length > 0) {
+         // Only the innermost frame is reported to keep the description short.
+         desc.append(", ").append(frames[0].toString());
+     }
+     return desc.toString();
+ }
+
+ /**
+  * Parses a {@code host:port} string into a socket address.
+  * <p>
+  * The string is split on its LAST colon so that bracketed IPv6 literals such as
+  * {@code [::1]:9876} keep their host part intact; splitting on every colon would
+  * tear the literal apart.
+  *
+  * @param addr address in {@code host:port} form
+  * @return the corresponding {@link InetSocketAddress}
+  * @throws IllegalArgumentException if {@code addr} contains no colon, or (as
+  *         {@link NumberFormatException}) if the port part is not a number
+  */
+ public static SocketAddress string2SocketAddress(final String addr) {
+     final int split = addr.lastIndexOf(':');
+     if (split < 0) {
+         throw new IllegalArgumentException("Address must be in host:port form: " + addr);
+     }
+
+     final String host = addr.substring(0, split);
+     final int port = Integer.parseInt(addr.substring(split + 1));
+     return new InetSocketAddress(host, port);
+ }
+
+ /**
+  * Sends one request over a freshly opened NIO socket channel and reads one
+  * length-prefixed response, closing the channel afterwards.
+  * <p>
+  * Steps: connect via {@code RemotingUtil.connect}, write the encoded request,
+  * read a 4-byte size, read that many bytes, decode them into a command.
+  *
+  * @param addr target address, passed to {@code RemotingUtil.string2SocketAddress}
+  * @param request command to send
+  * @param timeoutMillis budget in milliseconds, measured from the start of this call
+  * @return the decoded response command
+  * @throws RemotingConnectException if the connection could not be established
+  * @throws RemotingSendRequestException if writing the request fails, stalls or times out
+  * @throws RemotingTimeoutException if reading the response fails, stalls or times out
+  * @throws InterruptedException if interrupted during one of the 1 ms sleeps
+  */
+ public static RemotingCommand invokeSync(final String addr, final RemotingCommand request,
+ final long timeoutMillis) throws InterruptedException, RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException {
+ long beginTime = System.currentTimeMillis();
+ SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
+ SocketChannel socketChannel = RemotingUtil.connect(socketAddress);
+ if (socketChannel != null) {
+ // Tracks whether the request was fully written, so an IOException can be
+ // mapped to the matching exception type in the catch block below.
+ boolean sendRequestOK = false;
+
+ try {
+
+ socketChannel.configureBlocking(true);
+
+ //bugfix http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802
+ // Narrowing cast: timeoutMillis values above Integer.MAX_VALUE would overflow here.
+ socketChannel.socket().setSoTimeout((int) timeoutMillis);
+
+
+ // Phase 1: write the encoded request until the buffer is drained.
+ ByteBuffer byteBufferRequest = request.encode();
+ while (byteBufferRequest.hasRemaining()) {
+ int length = socketChannel.write(byteBufferRequest);
+ if (length > 0) {
+ if (byteBufferRequest.hasRemaining()) {
+ if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
+
+ throw new RemotingSendRequestException(addr);
+ }
+ }
+ } else {
+ // A write that makes no progress is treated as a send failure.
+ throw new RemotingSendRequestException(addr);
+ }
+
+
+ // Runs after every write, including the one that completes the buffer.
+ Thread.sleep(1);
+ }
+
+ sendRequestOK = true;
+
+ // Phase 2: read the 4-byte size prefix of the response.
+ ByteBuffer byteBufferSize = ByteBuffer.allocate(4);
+ while (byteBufferSize.hasRemaining()) {
+ int length = socketChannel.read(byteBufferSize);
+ if (length > 0) {
+ if (byteBufferSize.hasRemaining()) {
+ if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
+
+ throw new RemotingTimeoutException(addr, timeoutMillis);
+ }
+ }
+ } else {
+ // A read returning 0 or -1 (end of stream) is reported as a timeout.
+ throw new RemotingTimeoutException(addr, timeoutMillis);
+ }
+
+
+ Thread.sleep(1);
+ }
+
+ // Phase 3: read exactly `size` bytes of response payload.
+ // NOTE(review): `size` comes straight from the peer and is not range-checked before allocation.
+ int size = byteBufferSize.getInt(0);
+ ByteBuffer byteBufferBody = ByteBuffer.allocate(size);
+ while (byteBufferBody.hasRemaining()) {
+ int length = socketChannel.read(byteBufferBody);
+ if (length > 0) {
+ if (byteBufferBody.hasRemaining()) {
+ if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
+
+ throw new RemotingTimeoutException(addr, timeoutMillis);
+ }
+ }
+ } else {
+ throw new RemotingTimeoutException(addr, timeoutMillis);
+ }
+
+
+ Thread.sleep(1);
+ }
+
+
+ // Switch the buffer from filling to reading before decoding.
+ byteBufferBody.flip();
+ return RemotingCommand.decode(byteBufferBody);
+ } catch (IOException e) {
+ e.printStackTrace();
+
+ // An I/O error after the request went out counts as a timeout, before that as a send failure.
+ if (sendRequestOK) {
+ throw new RemotingTimeoutException(addr, timeoutMillis);
+ } else {
+ throw new RemotingSendRequestException(addr);
+ }
+ } finally {
+ // The channel is single-use: always close it, whatever the outcome.
+ try {
+ socketChannel.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ } else {
+ // RemotingUtil.connect returns null when the connection attempt failed.
+ throw new RemotingConnectException(addr);
+ }
+ }
+
+
+ /**
+  * Extracts the remote address text of a channel: the part of
+  * {@code channel.remoteAddress().toString()} after its last {@code '/'}, or the
+  * whole text when it contains no slash.
+  *
+  * @param channel the channel to inspect; may be {@code null}
+  * @return the address text, or an empty string when the channel or its remote
+  *         address is {@code null} or the text is empty
+  */
+ public static String parseChannelRemoteAddr(final Channel channel) {
+     if (null == channel) {
+         return "";
+     }
+
+     final SocketAddress remote = channel.remoteAddress();
+     if (remote == null) {
+         return "";
+     }
+
+     final String text = remote.toString();
+     if (text.length() == 0) {
+         return "";
+     }
+
+     final int slash = text.lastIndexOf("/");
+     return slash >= 0 ? text.substring(slash + 1) : text;
+ }
+
+
+ /**
+  * Returns the host name of a channel's remote peer.
+  * <p>
+  * Remote addresses that are not {@link InetSocketAddress} instances yield an empty
+  * string instead of a {@code ClassCastException}. Unresolved addresses, whose
+  * {@code getAddress()} is {@code null}, yield the host name they were created with
+  * instead of a {@code NullPointerException}.
+  *
+  * @param channel the channel to inspect; may be {@code null}
+  * @return the remote host name, or an empty string when it cannot be determined
+  */
+ public static String parseChannelRemoteName(final Channel channel) {
+     if (null == channel) {
+         return "";
+     }
+
+     final SocketAddress remote = channel.remoteAddress();
+     if (!(remote instanceof InetSocketAddress)) {
+         // Covers both a null remote address and non-IP transports.
+         return "";
+     }
+
+     final InetSocketAddress inetRemote = (InetSocketAddress) remote;
+     if (inetRemote.getAddress() != null) {
+         return inetRemote.getAddress().getHostName();
+     }
+
+     // Unresolved address: fall back to the host name it was created with.
+     return inetRemote.getHostName();
+ }
+
+
+ /**
+  * Extracts the {@code ip:port} text from a socket address.
+  * <p>
+  * {@code InetSocketAddress.toString()} renders as {@code hostName/ip:port}, where the
+  * host name part may be empty. The text after the LAST {@code '/'} is returned, so an
+  * address carrying a host name is handled the same way as a bare {@code /ip:port};
+  * dropping only the first character would leave part of the host name in the result.
+  *
+  * @param socketAddress the address to render; may be {@code null}
+  * @return the text after the last slash (the whole text when there is none), or an
+  *         empty string when {@code socketAddress} is {@code null}
+  */
+ public static String parseSocketAddressAddr(SocketAddress socketAddress) {
+     if (socketAddress == null) {
+         return "";
+     }
+
+     final String addr = socketAddress.toString();
+     final int index = addr.lastIndexOf("/");
+     return index >= 0 ? addr.substring(index + 1) : addr;
+ }
+
+
+ /**
+  * Returns the host name of a socket address.
+  * <p>
+  * Addresses that are not {@link InetSocketAddress} instances yield an empty string
+  * instead of a {@code ClassCastException}. Unresolved addresses, whose
+  * {@code getAddress()} is {@code null}, yield the host name they were created with
+  * instead of a {@code NullPointerException}.
+  *
+  * @param socketAddress the address to inspect; may be {@code null}
+  * @return the host name, or an empty string when it cannot be determined
+  */
+ public static String parseSocketAddressName(SocketAddress socketAddress) {
+     if (!(socketAddress instanceof InetSocketAddress)) {
+         // Covers both null and non-IP socket address types.
+         return "";
+     }
+
+     final InetSocketAddress inetAddress = (InetSocketAddress) socketAddress;
+     if (inetAddress.getAddress() != null) {
+         return inetAddress.getAddress().getHostName();
+     }
+
+     // Unresolved address: fall back to the host name it was created with.
+     return inetAddress.getHostName();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingUtil.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingUtil.java
new file mode 100644
index 0000000..af2348f
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/RemotingUtil.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.common;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Enumeration;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RemotingUtil {
+ public static final String OS_NAME = System.getProperty("os.name");
+
+ private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ private static boolean isLinuxPlatform = false;
+ private static boolean isWindowsPlatform = false;
+
+ static {
+ if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("linux") >= 0) {
+ isLinuxPlatform = true;
+ }
+
+ if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("windows") >= 0) {
+ isWindowsPlatform = true;
+ }
+ }
+
+ /**
+  * @return {@code true} when the {@code os.name} system property contains
+  *         "windows" (case-insensitive), as determined once in the static initializer
+  */
+ public static boolean isWindowsPlatform() {
+ return isWindowsPlatform;
+ }
+
+ public static Selector openSelector() throws IOException {
+ Selector result = null;
+
+ if (isLinuxPlatform()) {
+ try {
+ final Class<?> providerClazz = Class.forName("sun.nio.ch.EPollSelectorProvider");
+ if (providerClazz != null) {
+ try {
+ final Method method = providerClazz.getMethod("provider");
+ if (method != null) {
+ final SelectorProvider selectorProvider = (SelectorProvider) method.invoke(null);
+ if (selectorProvider != null) {
+ result = selectorProvider.openSelector();
+ }
+ }
+ } catch (final Exception e) {
+ log.warn("Open ePoll Selector for linux platform exception", e);
+ }
+ }
+ } catch (final Exception e) {
+ // ignore
+ }
+ }
+
+ if (result == null) {
+ result = Selector.open();
+ }
+
+ return result;
+ }
+
+ /**
+  * @return {@code true} when the {@code os.name} system property contains
+  *         "linux" (case-insensitive), as determined once in the static initializer
+  */
+ public static boolean isLinuxPlatform() {
+ return isLinuxPlatform;
+ }
+
+ public static String getLocalAddress() {
+ try {
+ // Traversal Network interface to get the first non-loopback and non-private address
+ Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
+ ArrayList<String> ipv4Result = new ArrayList<String>();
+ ArrayList<String> ipv6Result = new ArrayList<String>();
+ while (enumeration.hasMoreElements()) {
+ final NetworkInterface networkInterface = enumeration.nextElement();
+ final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
+ while (en.hasMoreElements()) {
+ final InetAddress address = en.nextElement();
+ if (!address.isLoopbackAddress()) {
+ if (address instanceof Inet6Address) {
+ ipv6Result.add(normalizeHostAddress(address));
+ } else {
+ ipv4Result.add(normalizeHostAddress(address));
+ }
+ }
+ }
+ }
+
+ // prefer ipv4
+ if (!ipv4Result.isEmpty()) {
+ for (String ip : ipv4Result) {
+ if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
+ continue;
+ }
+
+ return ip;
+ }
+
+ return ipv4Result.get(ipv4Result.size() - 1);
+ } else if (!ipv6Result.isEmpty()) {
+ return ipv6Result.get(0);
+ }
+ //If failed to find,fall back to localhost
+ final InetAddress localHost = InetAddress.getLocalHost();
+ return normalizeHostAddress(localHost);
+ } catch (SocketException e) {
+ e.printStackTrace();
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+
+ return null;
+ }
+
+
+ /**
+  * Renders an address as text, wrapping IPv6 addresses in square brackets.
+  *
+  * @param localHost the address to render
+  * @return {@code [hostAddress]} for an {@link Inet6Address}, the plain host address otherwise
+  */
+ public static String normalizeHostAddress(final InetAddress localHost) {
+     final boolean isIpv6 = localHost instanceof Inet6Address;
+     return isIpv6 ? "[" + localHost.getHostAddress() + "]" : localHost.getHostAddress();
+ }
+
+ /**
+  * Parses a {@code host:port} string into a socket address.
+  * <p>
+  * The string is split on its LAST colon so that bracketed IPv6 literals, the form
+  * {@code normalizeHostAddress} produces, keep their host part intact; splitting on
+  * every colon would tear the literal apart.
+  *
+  * @param addr address in {@code host:port} form
+  * @return the corresponding {@link InetSocketAddress}
+  * @throws IllegalArgumentException if {@code addr} contains no colon, or (as
+  *         {@link NumberFormatException}) if the port part is not a number
+  */
+ public static SocketAddress string2SocketAddress(final String addr) {
+     final int split = addr.lastIndexOf(':');
+     if (split < 0) {
+         throw new IllegalArgumentException("Address must be in host:port form: " + addr);
+     }
+
+     final String host = addr.substring(0, split);
+     final int port = Integer.parseInt(addr.substring(split + 1));
+     return new InetSocketAddress(host, port);
+ }
+
+
+ /**
+  * Renders a socket address as {@code hostAddress:port}.
+  *
+  * @param addr the address to render; it is cast to {@link InetSocketAddress}
+  * @return the host address and port joined by a colon
+  */
+ public static String socketAddress2String(final SocketAddress addr) {
+     final InetSocketAddress inet = (InetSocketAddress) addr;
+     final String host = inet.getAddress().getHostAddress();
+     return host + ":" + inet.getPort();
+ }
+
+
+ /**
+  * Connects to {@code remote} with a 5000 ms connect timeout by delegating to
+  * {@link #connect(SocketAddress, int)}.
+  *
+  * @param remote the address to connect to
+  * @return the connected channel, or {@code null} when the connection attempt failed
+  */
+ public static SocketChannel connect(SocketAddress remote) {
+ return connect(remote, 1000 * 5);
+ }
+
+
+ /**
+  * Opens a socket channel and connects it to {@code remote}.
+  * <p>
+  * The connect is performed in blocking mode so that {@code timeoutMillis} applies;
+  * the channel is switched to non-blocking mode before it is returned. Socket options
+  * set: SO_LINGER off, TCP_NODELAY on, 64 KiB receive and send buffers.
+  * <p>
+  * Failures are logged instead of being swallowed silently; the {@code null} return
+  * value for a failed attempt is kept.
+  *
+  * @param remote the address to connect to
+  * @param timeoutMillis connect timeout in milliseconds
+  * @return the connected, non-blocking channel, or {@code null} when connecting failed
+  */
+ public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) {
+     SocketChannel sc = null;
+     try {
+         sc = SocketChannel.open();
+         sc.configureBlocking(true);
+         sc.socket().setSoLinger(false, -1);
+         sc.socket().setTcpNoDelay(true);
+         sc.socket().setReceiveBufferSize(1024 * 64);
+         sc.socket().setSendBufferSize(1024 * 64);
+         sc.socket().connect(remote, timeoutMillis);
+         sc.configureBlocking(false);
+         return sc;
+     } catch (Exception e) {
+         log.warn("Failed to connect to remote address[" + remote + "]", e);
+         if (sc != null) {
+             // Release the half-initialised channel so the descriptor does not leak.
+             try {
+                 sc.close();
+             } catch (IOException e1) {
+                 log.warn("Failed to close channel to remote address[" + remote + "]", e1);
+             }
+         }
+     }
+
+     return null;
+ }
+
+
+ /**
+  * Closes the channel and, once the close has completed, logs the remote address
+  * together with whether the close succeeded.
+  *
+  * @param channel the channel to close
+  */
+ public static void closeChannel(Channel channel) {
+     // Captured before closing so the log line carries the address the channel had.
+     final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(channel);
+
+     final ChannelFutureListener logCloseResult = new ChannelFutureListener() {
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception {
+             log.info("closeChannel: close the connection to remote address[{}] result: {}", remoteAddress,
+                 future.isSuccess());
+         }
+     };
+
+     channel.close().addListener(logCloseResult);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
new file mode 100644
index 0000000..c24e8b3
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/SemaphoreReleaseOnlyOnce.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.common;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Wraps a {@link Semaphore} so that, through this wrapper, at most one permit is
+ * ever released no matter how often {@link #release()} is called.
+ *
+ * @author shijia.wxr
+ */
+public class SemaphoreReleaseOnlyOnce {
+    /** Flipped to {@code true} by the first call that actually releases the permit. */
+    private final AtomicBoolean released = new AtomicBoolean(false);
+    /** The wrapped semaphore; may be {@code null}. */
+    private final Semaphore semaphore;
+
+    /**
+     * @param semaphore the semaphore to wrap; may be {@code null}, in which case
+     *        {@link #release()} does nothing
+     */
+    public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
+        this.semaphore = semaphore;
+    }
+
+    /**
+     * Releases one permit on the wrapped semaphore the first time it is called;
+     * later calls have no effect.
+     */
+    public void release() {
+        if (this.semaphore == null) {
+            return;
+        }
+
+        // compareAndSet lets exactly one caller win the false -> true transition.
+        final boolean firstRelease = this.released.compareAndSet(false, true);
+        if (firstRelease) {
+            this.semaphore.release();
+        }
+    }
+
+    /**
+     * @return the wrapped semaphore, possibly {@code null}
+     */
+    public Semaphore getSemaphore() {
+        return this.semaphore;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/ServiceThread.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/ServiceThread.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/ServiceThread.java
new file mode 100644
index 0000000..365c670
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/common/ServiceThread.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for background service threads.
+ *
+ * <p>Each instance owns one {@link Thread} whose target is this object, so the
+ * subclass supplies the work by implementing {@link #run()}. The class adds a
+ * {@code stopped} flag and a wait/notify based wake-up: {@link #wakeup()},
+ * {@link #stop()} and {@link #shutdown()} set {@code hasNotified} and notify
+ * the monitor of this object, and {@link #waitForRunning(long)} consumes that
+ * notification.
+ *
+ * @author shijia.wxr
+ *
+ */
+public abstract class ServiceThread implements Runnable {
+    private static final Logger STLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    /** Default upper bound, in milliseconds, that {@link #shutdown(boolean)} waits in {@code join}. */
+    private static final long JOIN_TIME = 90 * 1000;
+    protected final Thread thread;
+    /** Set when a wake-up has been requested and not yet consumed by {@link #waitForRunning(long)}. */
+    protected volatile boolean hasNotified = false;
+    protected volatile boolean stopped = false;
+
+
+    /**
+     * Creates (but does not start) the backing thread, named after
+     * {@link #getServiceName()}.
+     *
+     * <p>Note that {@code getServiceName()} is therefore invoked during
+     * construction, before any subclass field initializer or constructor body
+     * has run; implementations must not rely on instance state.
+     */
+    public ServiceThread() {
+        this.thread = new Thread(this, this.getServiceName());
+    }
+
+
+    /**
+     * @return the name used for the backing thread and in log messages
+     */
+    public abstract String getServiceName();
+
+
+    /** Starts the backing thread. */
+    public void start() {
+        this.thread.start();
+    }
+
+
+    /** Equivalent to {@code shutdown(false)}. */
+    public void shutdown() {
+        this.shutdown(false);
+    }
+
+    /**
+     * Marks the service as stopped, wakes it up if it is waiting, and then
+     * blocks until the backing thread terminates or {@link #getJointime()}
+     * milliseconds have passed.
+     *
+     * <p>If the calling thread is interrupted while joining, the event is
+     * logged and the caller's interrupt status is restored so that code
+     * further up the stack can still observe it.
+     *
+     * @param interrupt whether to also interrupt the backing thread
+     */
+    public void shutdown(final boolean interrupt) {
+        this.stopped = true;
+        STLOG.info("shutdown thread {} interrupt {}", this.getServiceName(), interrupt);
+        this.notifyWaiter();
+
+        try {
+            if (interrupt) {
+                this.thread.interrupt();
+            }
+
+            long beginTime = System.currentTimeMillis();
+            this.thread.join(this.getJointime());
+            long elapsedTime = System.currentTimeMillis() - beginTime;
+            STLOG.info("join thread {} eclipse time(ms) {} {}",
+                this.getServiceName(), elapsedTime, this.getJointime());
+        } catch (InterruptedException e) {
+            STLOG.warn("interrupted while joining thread " + this.getServiceName(), e);
+            // join() cleared the caller's interrupt status; put it back.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * @return the maximum time, in milliseconds, {@link #shutdown(boolean)}
+     *         waits for the backing thread to finish
+     */
+    public long getJointime() {
+        return JOIN_TIME;
+    }
+
+    /** Equivalent to {@code stop(false)}. */
+    public void stop() {
+        this.stop(false);
+    }
+
+    /**
+     * Marks the service as stopped and wakes it up if it is waiting, without
+     * waiting for the backing thread to terminate.
+     *
+     * @param interrupt whether to also interrupt the backing thread
+     */
+    public void stop(final boolean interrupt) {
+        this.stopped = true;
+        STLOG.info("stop thread {} interrupt {}", this.getServiceName(), interrupt);
+        this.notifyWaiter();
+
+        if (interrupt) {
+            this.thread.interrupt();
+        }
+    }
+
+    /**
+     * Sets the stopped flag only; unlike {@link #stop()} it neither notifies
+     * nor interrupts the backing thread.
+     */
+    public void makeStop() {
+        this.stopped = true;
+        STLOG.info("makestop thread {}", this.getServiceName());
+    }
+
+    /**
+     * Requests a wake-up: a thread blocked in {@link #waitForRunning(long)}
+     * is notified, and if none is blocked the next call returns immediately.
+     */
+    public void wakeup() {
+        this.notifyWaiter();
+    }
+
+    /**
+     * Waits for up to {@code interval} milliseconds, returning early when a
+     * wake-up is requested. If a wake-up was already pending, returns without
+     * waiting. In every case the pending flag is cleared and
+     * {@link #onWaitEnd()} is called while the monitor is still held.
+     *
+     * @param interval timeout in milliseconds, passed to {@link Object#wait(long)}
+     */
+    protected void waitForRunning(long interval) {
+        synchronized (this) {
+            if (this.hasNotified) {
+                this.hasNotified = false;
+                this.onWaitEnd();
+                return;
+            }
+
+            try {
+                this.wait(interval);
+            } catch (InterruptedException e) {
+                // The interrupt status is intentionally NOT restored here: with the
+                // flag set, every later wait(interval) on this thread would throw at
+                // once, so a run loop that calls this method repeatedly (presumably
+                // the usual subclass pattern) would degrade into a busy spin.
+                STLOG.warn("thread " + this.getServiceName() + " interrupted while waiting", e);
+            } finally {
+                this.hasNotified = false;
+                this.onWaitEnd();
+            }
+        }
+    }
+
+    /**
+     * Hook invoked at the end of every {@link #waitForRunning(long)} call,
+     * with the monitor of this object held. The default does nothing.
+     */
+    protected void onWaitEnd() {
+    }
+
+    /**
+     * @return {@code true} once {@link #shutdown()}, {@link #stop()} or
+     *         {@link #makeStop()} has been called
+     */
+    public boolean isStopped() {
+        return stopped;
+    }
+
+    /** Records a pending wake-up and notifies a waiter, unless one is already pending. */
+    private void notifyWaiter() {
+        synchronized (this) {
+            if (!this.hasNotified) {
+                this.hasNotified = true;
+                this.notify();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingCommandException.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingCommandException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingCommandException.java
new file mode 100644
index 0000000..fe5cab9
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingCommandException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.exception;
+
+/**
+ * A {@link RemotingException} subtype for command-related failures (going by
+ * its name; the places that raise it are outside this file).
+ *
+ * @author shijia.wxr
+ */
+public class RemotingCommandException extends RemotingException {
+    private static final long serialVersionUID = -6061365915274953096L;
+
+
+    /**
+     * Creates an exception with a message and no cause.
+     *
+     * <p>Delegates to the message-only parent constructor rather than passing
+     * an explicit {@code null} cause: a cause supplied through a constructor,
+     * even {@code null}, can never be replaced, which would make a later
+     * {@link Throwable#initCause(Throwable)} fail with
+     * {@link IllegalStateException}.
+     *
+     * @param message the detail message
+     */
+    public RemotingCommandException(String message) {
+        super(message);
+    }
+
+
+    /**
+     * @param message the detail message
+     * @param cause the underlying cause; may be {@code null}
+     */
+    public RemotingCommandException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingConnectException.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingConnectException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingConnectException.java
new file mode 100644
index 0000000..5c546bd
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingConnectException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.exception;
+
+/**
+ * A {@link RemotingException} whose message has the fixed form
+ * {@code connect to <addr> failed}.
+ *
+ * @author shijia.wxr
+ */
+public class RemotingConnectException extends RemotingException {
+    private static final long serialVersionUID = -5565366231695911316L;
+
+
+    /**
+     * Creates an exception for the given address with no cause.
+     *
+     * <p>Uses the message-only parent constructor instead of forwarding an
+     * explicit {@code null} cause, so that {@link Throwable#initCause(Throwable)}
+     * remains usable on the new instance.
+     *
+     * @param addr the address that could not be connected to; embedded in the message
+     */
+    public RemotingConnectException(String addr) {
+        super(buildMessage(addr));
+    }
+
+
+    /**
+     * @param addr the address that could not be connected to; embedded in the message
+     * @param cause the underlying cause; may be {@code null}
+     */
+    public RemotingConnectException(String addr, Throwable cause) {
+        super(buildMessage(addr), cause);
+    }
+
+
+    /** Formats the detail message shared by both constructors. */
+    private static String buildMessage(String addr) {
+        return "connect to <" + addr + "> failed";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingException.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingException.java
new file mode 100644
index 0000000..2c4b672
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.exception;
+
+/**
+ * Checked exception that the other remoting exceptions in this package extend.
+ * It adds nothing to {@link Exception} beyond its own type.
+ *
+ * @author shijia.wxr
+ */
+public class RemotingException extends Exception {
+    private static final long serialVersionUID = -5690687334570505110L;
+
+
+    /**
+     * Creates an exception with a detail message and an underlying cause.
+     *
+     * @param message the detail message
+     * @param cause the underlying cause; may be {@code null}
+     */
+    public RemotingException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+
+    /**
+     * Creates an exception with a detail message and no cause.
+     *
+     * @param message the detail message
+     */
+    public RemotingException(String message) {
+        super(message);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingSendRequestException.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingSendRequestException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingSendRequestException.java
new file mode 100644
index 0000000..e29e1a2
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingSendRequestException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.exception;
+
+/**
+ * A {@link RemotingException} whose message has the fixed form
+ * {@code send request to <addr> failed}.
+ *
+ * @author shijia.wxr
+ */
+public class RemotingSendRequestException extends RemotingException {
+    private static final long serialVersionUID = 5391285827332471674L;
+
+
+    /**
+     * Creates an exception for the given address with no cause.
+     *
+     * <p>Uses the message-only parent constructor instead of forwarding an
+     * explicit {@code null} cause, so that {@link Throwable#initCause(Throwable)}
+     * remains usable on the new instance.
+     *
+     * @param addr the address the request was being sent to; embedded in the message
+     */
+    public RemotingSendRequestException(String addr) {
+        super(buildMessage(addr));
+    }
+
+
+    /**
+     * @param addr the address the request was being sent to; embedded in the message
+     * @param cause the underlying cause; may be {@code null}
+     */
+    public RemotingSendRequestException(String addr, Throwable cause) {
+        super(buildMessage(addr), cause);
+    }
+
+
+    /** Formats the detail message shared by both constructors. */
+    private static String buildMessage(String addr) {
+        return "send request to <" + addr + "> failed";
+    }
+}