You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:13 UTC
[22/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
new file mode 100644
index 0000000..fcadd9a
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -0,0 +1,1147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.factory;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.admin.MQAdminExtInner;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.*;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.MQProducerInner;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.stat.ConsumerStatsManager;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.heartbeat.*;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.client.impl.consumer.*;
+import org.slf4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.net.DatagramSocket;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MQClientInstance {
+ private final static long LOCK_TIMEOUT_MILLIS = 3000;
+ private final Logger log = ClientLogger.getLog();
+ private final ClientConfig clientConfig;
+ private final int instanceIndex;
+ private final String clientId;
+ private final long bootTimestamp = System.currentTimeMillis();
+ private final ConcurrentHashMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
+ private final ConcurrentHashMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
+ private final ConcurrentHashMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
+ private final NettyClientConfig nettyClientConfig;
+ private final MQClientAPIImpl mQClientAPIImpl;
+ private final MQAdminImpl mQAdminImpl;
+ private final ConcurrentHashMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
+ private final Lock lockNamesrv = new ReentrantLock();
+ private final Lock lockHeartbeat = new ReentrantLock();
+ private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
+ new ConcurrentHashMap<String, HashMap<Long, String>>();
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "MQClientFactoryScheduledThread");
+ }
+ });
+ private final ClientRemotingProcessor clientRemotingProcessor;
+ private final PullMessageService pullMessageService;
+ private final RebalanceService rebalanceService;
+ private final DefaultMQProducer defaultMQProducer;
+ private final ConsumerStatsManager consumerStatsManager;
+ private final AtomicLong storeTimesTotal = new AtomicLong(0);
+ private ServiceState serviceState = ServiceState.CREATE_JUST;
+ private DatagramSocket datagramSocket;
+ private Random random = new Random();
+
+
+ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId) {
+ this(clientConfig, instanceIndex, clientId, null);
+ }
+
+
+ public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
+ this.clientConfig = clientConfig;
+ this.instanceIndex = instanceIndex;
+ this.nettyClientConfig = new NettyClientConfig();
+ this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
+ this.clientRemotingProcessor = new ClientRemotingProcessor(this);
+ this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
+
+ if (this.clientConfig.getNamesrvAddr() != null) {
+ this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
+ log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
+ }
+
+ this.clientId = clientId;
+
+ this.mQAdminImpl = new MQAdminImpl(this);
+
+ this.pullMessageService = new PullMessageService(this);
+
+ this.rebalanceService = new RebalanceService(this);
+
+ this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
+ this.defaultMQProducer.resetClientConfig(clientConfig);
+
+ this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
+
+ log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", //
+ this.instanceIndex, //
+ this.clientId, //
+ this.clientConfig, //
+ MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
+ }
+
+ public void start() throws MQClientException {
+
+ synchronized (this) {
+ switch (this.serviceState) {
+ case CREATE_JUST:
+ this.serviceState = ServiceState.START_FAILED;
+ // If not specified,looking address from name server
+ if (null == this.clientConfig.getNamesrvAddr()) {
+ this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
+ }
+ // Start request-response channel
+ this.mQClientAPIImpl.start();
+ // Start various schedule tasks
+ this.startScheduledTask();
+ // Start pull service
+ this.pullMessageService.start();
+ // Start rebalance service
+ this.rebalanceService.start();
+ // Start push service
+ this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
+ log.info("the client factory [{}] start OK", this.clientId);
+ this.serviceState = ServiceState.RUNNING;
+ break;
+ case RUNNING:
+ break;
+ case SHUTDOWN_ALREADY:
+ break;
+ case START_FAILED:
+ throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
+ default:
+ break;
+ }
+ }
+ }
+
+
+ private void startScheduledTask() {
+ if (null == this.clientConfig.getNamesrvAddr()) {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
+ } catch (Exception e) {
+ log.error("ScheduledTask fetchNameServerAddr exception", e);
+ }
+ }
+ }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
+ }
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ MQClientInstance.this.updateTopicRouteInfoFromNameServer();
+ } catch (Exception e) {
+ log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
+ }
+ }
+ }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ MQClientInstance.this.cleanOfflineBroker();
+ MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
+ } catch (Exception e) {
+ log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
+ }
+ }
+ }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ MQClientInstance.this.persistAllConsumerOffset();
+ } catch (Exception e) {
+ log.error("ScheduledTask persistAllConsumerOffset exception", e);
+ }
+ }
+ }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ MQClientInstance.this.adjustThreadPool();
+ } catch (Exception e) {
+ log.error("ScheduledTask adjustThreadPool exception", e);
+ }
+ }
+ }, 1, 1, TimeUnit.MINUTES);
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void updateTopicRouteInfoFromNameServer() {
+ Set<String> topicList = new HashSet<String>();
+
+ // Consumer
+ {
+ Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, MQConsumerInner> entry = it.next();
+ MQConsumerInner impl = entry.getValue();
+ if (impl != null) {
+ Set<SubscriptionData> subList = impl.subscriptions();
+ if (subList != null) {
+ for (SubscriptionData subData : subList) {
+ topicList.add(subData.getTopic());
+ }
+ }
+ }
+ }
+ }
+
+ // Producer
+ {
+ Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, MQProducerInner> entry = it.next();
+ MQProducerInner impl = entry.getValue();
+ if (impl != null) {
+ Set<String> lst = impl.getPublishTopicList();
+ topicList.addAll(lst);
+ }
+ }
+ }
+
+ for (String topic : topicList) {
+ this.updateTopicRouteInfoFromNameServer(topic);
+ }
+ }
+
+ /**
+ * Remove offline broker
+ */
+ private void cleanOfflineBroker() {
+ try {
+ if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
+ try {
+ ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();
+
+ Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
+ while (itBrokerTable.hasNext()) {
+ Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
+ String brokerName = entry.getKey();
+ HashMap<Long, String> oneTable = entry.getValue();
+
+ HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
+ cloneAddrTable.putAll(oneTable);
+
+ Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Long, String> ee = it.next();
+ String addr = ee.getValue();
+ if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {
+ it.remove();
+ log.info("the broker addr[{} {}] is offline, remove it", brokerName, addr);
+ }
+ }
+
+ if (cloneAddrTable.isEmpty()) {
+ itBrokerTable.remove();
+ log.info("the broker[{}] name's host is offline, remove it", brokerName);
+ } else {
+ updatedTable.put(brokerName, cloneAddrTable);
+ }
+ }
+
+ if (!updatedTable.isEmpty()) {
+ this.brokerAddrTable.putAll(updatedTable);
+ }
+ } finally {
+ this.lockNamesrv.unlock();
+ }
+ } catch (InterruptedException e) {
+ log.warn("cleanOfflineBroker Exception", e);
+ }
+ }
+
+ public void sendHeartbeatToAllBrokerWithLock() {
+ if (this.lockHeartbeat.tryLock()) {
+ try {
+ this.sendHeartbeatToAllBroker();
+ this.uploadFilterClassSource();
+ } catch (final Exception e) {
+ log.error("sendHeartbeatToAllBroker exception", e);
+ } finally {
+ this.lockHeartbeat.unlock();
+ }
+ } else {
+ log.warn("lock heartBeat, but failed.");
+ }
+ }
+
+ private void persistAllConsumerOffset() {
+ Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, MQConsumerInner> entry = it.next();
+ MQConsumerInner impl = entry.getValue();
+ impl.persistConsumerOffset();
+ }
+ }
+
+ public void adjustThreadPool() {
+ Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, MQConsumerInner> entry = it.next();
+ MQConsumerInner impl = entry.getValue();
+ if (impl != null) {
+ try {
+ if (impl instanceof DefaultMQPushConsumerImpl) {
+ DefaultMQPushConsumerImpl dmq = (DefaultMQPushConsumerImpl) impl;
+ dmq.adjustThreadPool();
+ }
+ } catch (Exception e) {
+ }
+ }
+ }
+ }
+
+ public boolean updateTopicRouteInfoFromNameServer(final String topic) {
+ return updateTopicRouteInfoFromNameServer(topic, false, null);
+ }
+
+ private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {
+ Iterator<Entry<String, TopicRouteData>> it = this.topicRouteTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, TopicRouteData> entry = it.next();
+ TopicRouteData topicRouteData = entry.getValue();
+ List<BrokerData> bds = topicRouteData.getBrokerDatas();
+ for (BrokerData bd : bds) {
+ if (bd.getBrokerAddrs() != null) {
+ boolean exist = bd.getBrokerAddrs().containsValue(addr);
+ if (exist)
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ private void sendHeartbeatToAllBroker() {
+ final HeartbeatData heartbeatData = this.prepareHeartbeatData();
+ final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
+ final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
+ if (producerEmpty && consumerEmpty) {
+ log.warn("sending hearbeat, but no consumer and no producer");
+ return;
+ }
+
+ long times = this.storeTimesTotal.getAndIncrement();
+ Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, HashMap<Long, String>> entry = it.next();
+ String brokerName = entry.getKey();
+ HashMap<Long, String> oneTable = entry.getValue();
+ if (oneTable != null) {
+ for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
+ Long id = entry1.getKey();
+ String addr = entry1.getValue();
+ if (addr != null) {
+ if (consumerEmpty) {
+ if (id != MixAll.MASTER_ID)
+ continue;
+ }
+
+ try {
+ this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
+ if (times % 20 == 0) {
+ log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
+ log.info(heartbeatData.toString());
+ }
+ } catch (Exception e) {
+ if (this.isBrokerInNameServer(addr)) {
+ log.error("send heart beat to broker exception", e);
+ } else {
+ log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
+ id, addr);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void uploadFilterClassSource() {
+ Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, MQConsumerInner> next = it.next();
+ MQConsumerInner consumer = next.getValue();
+ if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
+ Set<SubscriptionData> subscriptions = consumer.subscriptions();
+ for (SubscriptionData sub : subscriptions) {
+ if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
+ final String consumerGroup = consumer.groupName();
+ final String className = sub.getSubString();
+ final String topic = sub.getTopic();
+ final String filterClassSource = sub.getFilterClassSource();
+ try {
+ this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
+ } catch (Exception e) {
+ log.error("uploadFilterClassToAllFilterServer Exception", e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
+ try {
+ if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ TopicRouteData topicRouteData;
+ if (isDefault && defaultMQProducer != null) {
+ topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
+ 1000 * 3);
+ if (topicRouteData != null) {
+ for (QueueData data : topicRouteData.getQueueDatas()) {
+ int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
+ data.setReadQueueNums(queueNums);
+ data.setWriteQueueNums(queueNums);
+ }
+ }
+ } else {
+ topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
+ }
+ if (topicRouteData != null) {
+ TopicRouteData old = this.topicRouteTable.get(topic);
+ boolean changed = topicRouteDataIsChange(old, topicRouteData);
+ if (!changed) {
+ changed = this.isNeedUpdateTopicRouteInfo(topic);
+ } else {
+ log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
+ }
+
+ if (changed) {
+ TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
+
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
+ }
+
+ // Update Pub info
+ {
+ TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
+ publishInfo.setHaveTopicRouterInfo(true);
+ Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, MQProducerInner> entry = it.next();
+ MQProducerInner impl = entry.getValue();
+ if (impl != null) {
+ impl.updateTopicPublishInfo(topic, publishInfo);
+ }
+ }
+ }
+
+ // Update sub info
+ {
+ Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
+ Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, MQConsumerInner> entry = it.next();
+ MQConsumerInner impl = entry.getValue();
+ if (impl != null) {
+ impl.updateTopicSubscribeInfo(topic, subscribeInfo);
+ }
+ }
+ }
+ log.info("topicRouteTable.put TopicRouteData[{}]", cloneTopicRouteData);
+ this.topicRouteTable.put(topic, cloneTopicRouteData);
+ return true;
+ }
+ } else {
+ log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
+ }
+ } catch (Exception e) {
+ if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
+ log.warn("updateTopicRouteInfoFromNameServer Exception", e);
+ }
+ } finally {
+ this.lockNamesrv.unlock();
+ }
+ } else {
+ log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
+ }
+ } catch (InterruptedException e) {
+ log.warn("updateTopicRouteInfoFromNameServer Exception", e);
+ }
+
+ return false;
+ }
+
+ private HeartbeatData prepareHeartbeatData() {
+ HeartbeatData heartbeatData = new HeartbeatData();
+
+ // clientID
+ heartbeatData.setClientID(this.clientId);
+
+ // Consumer
+ for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
+ MQConsumerInner impl = entry.getValue();
+ if (impl != null) {
+ ConsumerData consumerData = new ConsumerData();
+ consumerData.setGroupName(impl.groupName());
+ consumerData.setConsumeType(impl.consumeType());
+ consumerData.setMessageModel(impl.messageModel());
+ consumerData.setConsumeFromWhere(impl.consumeFromWhere());
+ consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
+ consumerData.setUnitMode(impl.isUnitMode());
+
+ heartbeatData.getConsumerDataSet().add(consumerData);
+ }
+ }
+
+
+ // Producer
+ for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
+ MQProducerInner impl = entry.getValue();
+ if (impl != null) {
+ ProducerData producerData = new ProducerData();
+ producerData.setGroupName(entry.getKey());
+
+ heartbeatData.getProducerDataSet().add(producerData);
+ }
+ }
+
+ return heartbeatData;
+ }
+
+ private boolean isBrokerInNameServer(final String brokerAddr) {
+ Iterator<Entry<String, TopicRouteData>> it = this.topicRouteTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, TopicRouteData> itNext = it.next();
+ List<BrokerData> brokerDatas = itNext.getValue().getBrokerDatas();
+ for (BrokerData bd : brokerDatas) {
+ boolean contain = bd.getBrokerAddrs().containsValue(brokerAddr);
+ if (contain)
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName, final String topic,
+ final String filterClassSource) throws UnsupportedEncodingException {
+ byte[] classBody = null;
+ int classCRC = 0;
+ try {
+ classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET);
+ classCRC = UtilAll.crc32(classBody);
+ } catch (Exception e1) {
+ log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", //
+ fullClassName, //
+ RemotingHelper.exceptionSimpleDesc(e1));
+ }
+
+ TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
+ if (topicRouteData != null //
+ && topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
+ Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, List<String>> next = it.next();
+ List<String> value = next.getValue();
+ for (final String fsAddr : value) {
+ try {
+ this.mQClientAPIImpl.registerMessageFilterClass(fsAddr, consumerGroup, topic, fullClassName, classCRC, classBody,
+ 5000);
+
+ log.info("register message class filter to {} OK, ConsumerGroup: {} Topic: {} ClassName: {}", fsAddr, consumerGroup,
+ topic, fullClassName);
+
+ } catch (Exception e) {
+ log.error("uploadFilterClassToAllFilterServer Exception", e);
+ }
+ }
+ }
+ } else {
+ log.warn("register message class filter failed, because no filter server, ConsumerGroup: {} Topic: {} ClassName: {}",
+ consumerGroup, topic, fullClassName);
+ }
+ }
+
+ private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
+ if (olddata == null || nowdata == null)
+ return true;
+ TopicRouteData old = olddata.cloneTopicRouteData();
+ TopicRouteData now = nowdata.cloneTopicRouteData();
+ Collections.sort(old.getQueueDatas());
+ Collections.sort(old.getBrokerDatas());
+ Collections.sort(now.getQueueDatas());
+ Collections.sort(now.getBrokerDatas());
+ return !old.equals(now);
+
+ }
+
+ private boolean isNeedUpdateTopicRouteInfo(final String topic) {
+ boolean result = false;
+ {
+ Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
+ while (it.hasNext() && !result) {
+ Entry<String, MQProducerInner> entry = it.next();
+ MQProducerInner impl = entry.getValue();
+ if (impl != null) {
+ result = impl.isPublishTopicNeedUpdate(topic);
+ }
+ }
+ }
+
+ {
+ Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
+ while (it.hasNext() && !result) {
+ Entry<String, MQConsumerInner> entry = it.next();
+ MQConsumerInner impl = entry.getValue();
+ if (impl != null) {
+ result = impl.isSubscribeTopicNeedUpdate(topic);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
+ TopicPublishInfo info = new TopicPublishInfo();
+ info.setTopicRouteData(route);
+ if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
+ String[] brokers = route.getOrderTopicConf().split(";");
+ for (String broker : brokers) {
+ String[] item = broker.split(":");
+ int nums = Integer.parseInt(item[1]);
+ for (int i = 0; i < nums; i++) {
+ MessageQueue mq = new MessageQueue(topic, item[0], i);
+ info.getMessageQueueList().add(mq);
+ }
+ }
+
+ info.setOrderTopic(true);
+ } else {
+ List<QueueData> qds = route.getQueueDatas();
+ Collections.sort(qds);
+ for (QueueData qd : qds) {
+ if (PermName.isWriteable(qd.getPerm())) {
+ BrokerData brokerData = null;
+ for (BrokerData bd : route.getBrokerDatas()) {
+ if (bd.getBrokerName().equals(qd.getBrokerName())) {
+ brokerData = bd;
+ break;
+ }
+ }
+
+ if (null == brokerData) {
+ continue;
+ }
+
+ if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
+ continue;
+ }
+
+ for (int i = 0; i < qd.getWriteQueueNums(); i++) {
+ MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+ info.getMessageQueueList().add(mq);
+ }
+ }
+ }
+
+ info.setOrderTopic(false);
+ }
+
+ return info;
+ }
+
+ public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
+ Set<MessageQueue> mqList = new HashSet<MessageQueue>();
+ List<QueueData> qds = route.getQueueDatas();
+ for (QueueData qd : qds) {
+ if (PermName.isReadable(qd.getPerm())) {
+ for (int i = 0; i < qd.getReadQueueNums(); i++) {
+ MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+ mqList.add(mq);
+ }
+ }
+ }
+
+ return mqList;
+ }
+
+ public void shutdown() {
+ // Consumer
+ if (!this.consumerTable.isEmpty())
+ return;
+
+ // AdminExt
+ if (!this.adminExtTable.isEmpty())
+ return;
+
+ // Producer
+ if (this.producerTable.size() > 1)
+ return;
+
+ synchronized (this) {
+ switch (this.serviceState) {
+ case CREATE_JUST:
+ break;
+ case RUNNING:
+ this.defaultMQProducer.getDefaultMQProducerImpl().shutdown(false);
+
+ this.serviceState = ServiceState.SHUTDOWN_ALREADY;
+ this.pullMessageService.shutdown(true);
+ this.scheduledExecutorService.shutdown();
+ this.mQClientAPIImpl.shutdown();
+ this.rebalanceService.shutdown();
+
+ if (this.datagramSocket != null) {
+ this.datagramSocket.close();
+ this.datagramSocket = null;
+ }
+ MQClientManager.getInstance().removeClientFactory(this.clientId);
+ log.info("the client factory [{}] shutdown OK", this.clientId);
+ break;
+ case SHUTDOWN_ALREADY:
+ break;
+ default:
+ break;
+ }
+ }
+ }
+
+ public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
+ if (null == group || null == consumer) {
+ return false;
+ }
+
+ MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
+ if (prev != null) {
+ log.warn("the consumer group[" + group + "] exist already.");
+ return false;
+ }
+
+ return true;
+ }
+
+ public void unregisterConsumer(final String group) {
+ this.consumerTable.remove(group);
+ this.unregisterClientWithLock(null, group);
+ }
+
+ private void unregisterClientWithLock(final String producerGroup, final String consumerGroup) {
+ try {
+ if (this.lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ this.unregisterClient(producerGroup, consumerGroup);
+ } catch (Exception e) {
+ log.error("unregisterClient exception", e);
+ } finally {
+ this.lockHeartbeat.unlock();
+ }
+ } else {
+ log.warn("lock heartBeat, but failed.");
+ }
+ } catch (InterruptedException e) {
+ log.warn("unregisterClientWithLock exception", e);
+ }
+ }
+
+ private void unregisterClient(final String producerGroup, final String consumerGroup) {
+ Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, HashMap<Long, String>> entry = it.next();
+ String brokerName = entry.getKey();
+ HashMap<Long, String> oneTable = entry.getValue();
+
+ if (oneTable != null) {
+ for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
+ String addr = entry1.getValue();
+ if (addr != null) {
+ try {
+ this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000);
+ log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup,
+ consumerGroup, brokerName, entry1.getKey(), addr);
+ } catch (RemotingException e) {
+ log.error("unregister client exception from broker: " + addr, e);
+ } catch (MQBrokerException e) {
+ log.error("unregister client exception from broker: " + addr, e);
+ } catch (InterruptedException e) {
+ log.error("unregister client exception from broker: " + addr, e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
+ if (null == group || null == producer) {
+ return false;
+ }
+
+ MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
+ if (prev != null) {
+ log.warn("the producer group[{}] exist already.", group);
+ return false;
+ }
+
+ return true;
+ }
+
+ public void unregisterProducer(final String group) {
+ this.producerTable.remove(group);
+ this.unregisterClientWithLock(group, null);
+ }
+
+ public boolean registerAdminExt(final String group, final MQAdminExtInner admin) {
+ if (null == group || null == admin) {
+ return false;
+ }
+
+ MQAdminExtInner prev = this.adminExtTable.putIfAbsent(group, admin);
+ if (prev != null) {
+ log.warn("the admin group[{}] exist already.", group);
+ return false;
+ }
+
+ return true;
+ }
+
+ public void unregisterAdminExt(final String group) {
+ this.adminExtTable.remove(group);
+ }
+
+ public void rebalanceImmediately() {
+ this.rebalanceService.wakeup();
+ }
+
+ public void doRebalance() {
+ for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
+ MQConsumerInner impl = entry.getValue();
+ if (impl != null) {
+ try {
+ impl.doRebalance();
+ } catch (Throwable e) {
+ log.error("doRebalance exception", e);
+ }
+ }
+ }
+ }
+
+ public MQProducerInner selectProducer(final String group) {
+ return this.producerTable.get(group);
+ }
+
+ public MQConsumerInner selectConsumer(final String group) {
+ return this.consumerTable.get(group);
+ }
+
+ public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
+ String brokerAddr = null;
+ boolean slave = false;
+ boolean found = false;
+
+ HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
+ if (map != null && !map.isEmpty()) {
+ FOR_SEG:
+ for (Map.Entry<Long, String> entry : map.entrySet()) {
+ Long id = entry.getKey();
+ brokerAddr = entry.getValue();
+ if (brokerAddr != null) {
+ found = true;
+ if (MixAll.MASTER_ID == id) {
+ slave = false;
+ break FOR_SEG;
+ } else {
+ slave = true;
+ }
+ break;
+
+ }
+ } // end of for
+ }
+
+ if (found) {
+ return new FindBrokerResult(brokerAddr, slave);
+ }
+
+ return null;
+ }
+
+ public String findBrokerAddressInPublish(final String brokerName) {
+ HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
+ if (map != null && !map.isEmpty()) {
+ return map.get(MixAll.MASTER_ID);
+ }
+
+ return null;
+ }
+
+ public FindBrokerResult findBrokerAddressInSubscribe(//
+ final String brokerName, //
+ final long brokerId, //
+ final boolean onlyThisBroker//
+ ) {
+ String brokerAddr = null;
+ boolean slave = false;
+ boolean found = false;
+
+ HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
+ if (map != null && !map.isEmpty()) {
+ brokerAddr = map.get(brokerId);
+ slave = brokerId != MixAll.MASTER_ID;
+ found = brokerAddr != null;
+
+ if (!found && !onlyThisBroker) {
+ Entry<Long, String> entry = map.entrySet().iterator().next();
+ brokerAddr = entry.getValue();
+ slave = entry.getKey() != MixAll.MASTER_ID;
+ found = true;
+ }
+ }
+
+ if (found) {
+ return new FindBrokerResult(brokerAddr, slave);
+ }
+
+ return null;
+ }
+
+ public List<String> findConsumerIdList(final String topic, final String group) {
+ String brokerAddr = this.findBrokerAddrByTopic(topic);
+ if (null == brokerAddr) {
+ this.updateTopicRouteInfoFromNameServer(topic);
+ brokerAddr = this.findBrokerAddrByTopic(topic);
+ }
+
+ if (null != brokerAddr) {
+ try {
+ return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
+ } catch (Exception e) {
+ log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
+ }
+ }
+
+ return null;
+ }
+
+ public String findBrokerAddrByTopic(final String topic) {
+ TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
+ if (topicRouteData != null) {
+ List<BrokerData> brokers = topicRouteData.getBrokerDatas();
+ if (!brokers.isEmpty()) {
+ int index = random.nextInt(brokers.size());
+ BrokerData bd = brokers.get(index % brokers.size());
+ return bd.selectBrokerAddr();
+ }
+ }
+
+ return null;
+ }
+
+ public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
+ DefaultMQPushConsumerImpl consumer = null;
+ try {
+ MQConsumerInner impl = this.consumerTable.get(group);
+ if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
+ consumer = (DefaultMQPushConsumerImpl) impl;
+ } else {
+ log.info("[reset-offset] consumer dose not exist. group={}", group);
+ return;
+ }
+ consumer.suspend();
+
+ ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
+ for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
+ ProcessQueue pq = entry.getValue();
+ pq.setDropped(true);
+ pq.clear();
+ }
+ }
+
+ try {
+ TimeUnit.SECONDS.sleep(10);
+ } catch (InterruptedException e) {
+ //
+ }
+
+ Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
+ while (iterator.hasNext()) {
+ MessageQueue mq = iterator.next();
+ Long offset = offsetTable.get(mq);
+ if (topic.equals(mq.getTopic()) && offset != null) {
+ try {
+ consumer.updateConsumeOffset(mq, offset);
+ consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
+ iterator.remove();
+ } catch (Exception e) {
+ log.warn("reset offset failed. group={}, {}", group, mq, e);
+ }
+ }
+ }
+ } finally {
+ consumer.resume();
+ }
+ }
+
+ public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
+ MQConsumerInner impl = this.consumerTable.get(group);
+ if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
+ DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) impl;
+ return consumer.getOffsetStore().cloneOffsetTable(topic);
+ } else if (impl != null && impl instanceof DefaultMQPullConsumerImpl) {
+ DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl) impl;
+ return consumer.getOffsetStore().cloneOffsetTable(topic);
+ } else {
+ return Collections.EMPTY_MAP;
+ }
+ }
+
+ public TopicRouteData getAnExistTopicRouteData(final String topic) {
+ return this.topicRouteTable.get(topic);
+ }
+
+ public MQClientAPIImpl getMQClientAPIImpl() {
+ return mQClientAPIImpl;
+ }
+
+ public MQAdminImpl getMQAdminImpl() {
+ return mQAdminImpl;
+ }
+
+ public long getBootTimestamp() {
+ return bootTimestamp;
+ }
+
+ public ScheduledExecutorService getScheduledExecutorService() {
+ return scheduledExecutorService;
+ }
+
+ public PullMessageService getPullMessageService() {
+ return pullMessageService;
+ }
+
+ public DefaultMQProducer getDefaultMQProducer() {
+ return defaultMQProducer;
+ }
+
+ public ConcurrentHashMap<String, TopicRouteData> getTopicRouteTable() {
+ return topicRouteTable;
+ }
+
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, //
+ final String consumerGroup, //
+ final String brokerName) {
+ MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
+ if (null != mqConsumerInner) {
+ DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner;
+
+ ConsumeMessageDirectlyResult result = consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);
+ return result;
+ }
+
+ return null;
+ }
+
+
+ public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) {
+ MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
+
+ ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo();
+
+ List<String> nsList = this.mQClientAPIImpl.getRemotingClient().getNameServerAddressList();
+
+ StringBuffer strBuffer = new StringBuffer();
+ if (nsList != null) {
+ for (String addr : nsList) {
+ strBuffer.append(addr + ";");
+ }
+ }
+
+ String nsAddr = strBuffer.toString();
+ consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_NAMESERVER_ADDR, nsAddr);
+ consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CONSUME_TYPE, mqConsumerInner.consumeType().name());
+ consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CLIENT_VERSION,
+ MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
+
+ return consumerRunningInfo;
+ }
+
+
+ public ConsumerStatsManager getConsumerStatsManager() {
+ return consumerStatsManager;
+ }
+
+ public NettyClientConfig getNettyClientConfig() {
+ return nettyClientConfig;
+ }
+}