You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:35 UTC
[44/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
new file mode 100644
index 0000000..1749e91
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.NettySystemConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class BrokerStartup {
+ public static Properties properties = null;
+ public static CommandLine commandLine = null;
+ public static String configFile = null;
+ public static Logger log;
+
+ public static void main(String[] args) {
+ start(createBrokerController(args));
+ }
+
+ public static BrokerController start(BrokerController controller) {
+ try {
+ controller.start();
+ String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+
+ if (null != controller.getBrokerConfig().getNamesrvAddr()) {
+ tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
+ }
+
+ log.info(tip);
+ return controller;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ return null;
+ }
+
+ public static BrokerController createBrokerController(String[] args) {
+ System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+ if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
+ NettySystemConfig.socketSndbufSize = 131072;
+ }
+
+ if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
+ NettySystemConfig.socketRcvbufSize = 131072;
+ }
+
+ try {
+ //PackageConflictDetect.detectFastjson();
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
+ new PosixParser());
+ if (null == commandLine) {
+ System.exit(-1);
+ }
+
+ final BrokerConfig brokerConfig = new BrokerConfig();
+ final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+ final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+ nettyServerConfig.setListenPort(10911);
+ final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+
+ if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
+ int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
+ messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
+ }
+
+ if (commandLine.hasOption('p')) {
+ MixAll.printObjectProperties(null, brokerConfig);
+ MixAll.printObjectProperties(null, nettyServerConfig);
+ MixAll.printObjectProperties(null, nettyClientConfig);
+ MixAll.printObjectProperties(null, messageStoreConfig);
+ System.exit(0);
+ } else if (commandLine.hasOption('m')) {
+ MixAll.printObjectProperties(null, brokerConfig, true);
+ MixAll.printObjectProperties(null, nettyServerConfig, true);
+ MixAll.printObjectProperties(null, nettyClientConfig, true);
+ MixAll.printObjectProperties(null, messageStoreConfig, true);
+ System.exit(0);
+ }
+
+ if (commandLine.hasOption('c')) {
+ String file = commandLine.getOptionValue('c');
+ if (file != null) {
+ configFile = file;
+ InputStream in = new BufferedInputStream(new FileInputStream(file));
+ properties = new Properties();
+ properties.load(in);
+
+ parsePropertie2SystemEnv(properties);
+ MixAll.properties2Object(properties, brokerConfig);
+ MixAll.properties2Object(properties, nettyServerConfig);
+ MixAll.properties2Object(properties, nettyClientConfig);
+ MixAll.properties2Object(properties, messageStoreConfig);
+
+ BrokerPathConfigHelper.setBrokerConfigPath(file);
+ in.close();
+ }
+ }
+
+ MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
+
+ if (null == brokerConfig.getRocketmqHome()) {
+ System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
+ + " variable in your environment to match the location of the RocketMQ installation");
+ System.exit(-2);
+ }
+
+ String namesrvAddr = brokerConfig.getNamesrvAddr();
+ if (null != namesrvAddr) {
+ try {
+ String[] addrArray = namesrvAddr.split(";");
+ if (addrArray != null) {
+ for (String addr : addrArray) {
+ RemotingUtil.string2SocketAddress(addr);
+ }
+ }
+ } catch (Exception e) {
+ System.out.printf(
+ "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
+ namesrvAddr);
+ System.exit(-3);
+ }
+ }
+
+
+ switch (messageStoreConfig.getBrokerRole()) {
+ case ASYNC_MASTER:
+ case SYNC_MASTER:
+ brokerConfig.setBrokerId(MixAll.MASTER_ID);
+ break;
+ case SLAVE:
+ if (brokerConfig.getBrokerId() <= 0) {
+ System.out.printf("Slave's brokerId must be > 0");
+ System.exit(-3);
+ }
+
+ break;
+ default:
+ break;
+ }
+
+ messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ JoranConfigurator configurator = new JoranConfigurator();
+ configurator.setContext(lc);
+ lc.reset();
+ configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
+ log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ MixAll.printObjectProperties(log, brokerConfig);
+ MixAll.printObjectProperties(log, nettyServerConfig);
+ MixAll.printObjectProperties(log, nettyClientConfig);
+ MixAll.printObjectProperties(log, messageStoreConfig);
+
+ final BrokerController controller = new BrokerController(//
+ brokerConfig, //
+ nettyServerConfig, //
+ nettyClientConfig, //
+ messageStoreConfig);
+ // remember all configs to prevent discard
+ controller.getConfiguration().registerConfig(properties);
+
+ boolean initResult = controller.initialize();
+ if (!initResult) {
+ controller.shutdown();
+ System.exit(-3);
+ }
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ private volatile boolean hasShutdown = false;
+ private AtomicInteger shutdownTimes = new AtomicInteger(0);
+
+ @Override
+ public void run() {
+ synchronized (this) {
+ log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
+ if (!this.hasShutdown) {
+ this.hasShutdown = true;
+ long begineTime = System.currentTimeMillis();
+ controller.shutdown();
+ long consumingTimeTotal = System.currentTimeMillis() - begineTime;
+ log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
+ }
+ }
+ }
+ }, "ShutdownHook"));
+
+ return controller;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ return null;
+ }
+
+ private static void parsePropertie2SystemEnv(Properties properties) {
+ if (properties == null) {
+ return;
+ }
+ String rmqAddressServerDomain = properties.getProperty("rmqAddressServerDomain", "jmenv.tbsite.net");
+ String rmqAddressServerSubGroup = properties.getProperty("rmqAddressServerSubGroup", "nsaddr");
+ System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain);
+ System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
+ }
+
+ public static Options buildCommandlineOptions(final Options options) {
+ Option opt = new Option("c", "configFile", true, "Broker config properties file");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("p", "printConfigItem", false, "Print all config item");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("m", "printImportantConfig", false, "Print important config item");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
new file mode 100644
index 0000000..e15a22a
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client;
+
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import io.netty.channel.Channel;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ClientChannelInfo {
+ private final Channel channel;
+ private final String clientId;
+ private final LanguageCode language;
+ private final int version;
+ private volatile long lastUpdateTimestamp = System.currentTimeMillis();
+
+
+ public ClientChannelInfo(Channel channel) {
+ this(channel, null, null, 0);
+ }
+
+
+ public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) {
+ this.channel = channel;
+ this.clientId = clientId;
+ this.language = language;
+ this.version = version;
+ }
+
+
+ public Channel getChannel() {
+ return channel;
+ }
+
+
+ public String getClientId() {
+ return clientId;
+ }
+
+
+ public LanguageCode getLanguage() {
+ return language;
+ }
+
+
+ public int getVersion() {
+ return version;
+ }
+
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((channel == null) ? 0 : channel.hashCode());
+ result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
+ result = prime * result + ((language == null) ? 0 : language.hashCode());
+ result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32));
+ result = prime * result + version;
+ return result;
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ClientChannelInfo other = (ClientChannelInfo) obj;
+ if (channel == null) {
+ if (other.channel != null)
+ return false;
+ } else if (this.channel != other.channel) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public String toString() {
+ return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language
+ + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
new file mode 100644
index 0000000..2d1ad9b
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ClientHousekeepingService implements ChannelEventListener {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final BrokerController brokerController;
+
+ private ScheduledExecutorService scheduledExecutorService = Executors
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
+
+
+ public ClientHousekeepingService(final BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+
+ public void start() {
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ClientHousekeepingService.this.scanExceptionChannel();
+ } catch (Exception e) {
+ log.error("", e);
+ }
+ }
+ }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
+ }
+
+ private void scanExceptionChannel() {
+ this.brokerController.getProducerManager().scanNotActiveChannel();
+ this.brokerController.getConsumerManager().scanNotActiveChannel();
+ this.brokerController.getFilterServerManager().scanNotActiveChannel();
+ }
+
+ public void shutdown() {
+ this.scheduledExecutorService.shutdown();
+ }
+
+ @Override
+ public void onChannelConnect(String remoteAddr, Channel channel) {
+
+ }
+
+
+ @Override
+ public void onChannelClose(String remoteAddr, Channel channel) {
+ this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
+ this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
+ this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
+ }
+
+
+ @Override
+ public void onChannelException(String remoteAddr, Channel channel) {
+ this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
+ this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
+ this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
+ }
+
+
+ @Override
+ public void onChannelIdle(String remoteAddr, Channel channel) {
+ this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
+ this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
+ this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
new file mode 100644
index 0000000..10795f5
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerGroupInfo {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final String groupName;
+ private final ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable =
+ new ConcurrentHashMap<String, SubscriptionData>();
+ private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
+ new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
+ private volatile ConsumeType consumeType;
+ private volatile MessageModel messageModel;
+ private volatile ConsumeFromWhere consumeFromWhere;
+ private volatile long lastUpdateTimestamp = System.currentTimeMillis();
+
+
+ public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel,
+ ConsumeFromWhere consumeFromWhere) {
+ this.groupName = groupName;
+ this.consumeType = consumeType;
+ this.messageModel = messageModel;
+ this.consumeFromWhere = consumeFromWhere;
+ }
+
+
+ public ClientChannelInfo findChannel(final String clientId) {
+ Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Channel, ClientChannelInfo> next = it.next();
+ if (next.getValue().getClientId().equals(clientId)) {
+ return next.getValue();
+ }
+ }
+
+ return null;
+ }
+
+
+ public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
+ return subscriptionTable;
+ }
+
+
+ public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() {
+ return channelInfoTable;
+ }
+
+
+ public List<Channel> getAllChannel() {
+ List<Channel> result = new ArrayList<Channel>();
+
+ result.addAll(this.channelInfoTable.keySet());
+
+ return result;
+ }
+
+
+ public List<String> getAllClientId() {
+ List<String> result = new ArrayList<String>();
+
+ Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Entry<Channel, ClientChannelInfo> entry = it.next();
+ ClientChannelInfo clientChannelInfo = entry.getValue();
+ result.add(clientChannelInfo.getClientId());
+ }
+
+ return result;
+ }
+
+
+ public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
+ ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());
+ if (old != null) {
+ log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());
+ }
+ }
+
+
+ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+ final ClientChannelInfo info = this.channelInfoTable.remove(channel);
+ if (info != null) {
+ log.warn(
+ "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",
+ info.toString(), groupName);
+ return true;
+ }
+
+ return false;
+ }
+
+ public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
+ MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
+ boolean updated = false;
+ this.consumeType = consumeType;
+ this.messageModel = messageModel;
+ this.consumeFromWhere = consumeFromWhere;
+
+ ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
+ if (null == infoOld) {
+ ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
+ if (null == prev) {
+ log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
+ messageModel, infoNew.toString());
+ updated = true;
+ }
+
+ infoOld = infoNew;
+ } else {
+ if (!infoOld.getClientId().equals(infoNew.getClientId())) {
+ log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
+ this.groupName,
+ infoOld.toString(),
+ infoNew.toString());
+ this.channelInfoTable.put(infoNew.getChannel(), infoNew);
+ }
+ }
+
+ this.lastUpdateTimestamp = System.currentTimeMillis();
+ infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
+
+ return updated;
+ }
+
+
+ public boolean updateSubscription(final Set<SubscriptionData> subList) {
+ boolean updated = false;
+
+ for (SubscriptionData sub : subList) {
+ SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
+ if (old == null) {
+ SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
+ if (null == prev) {
+ updated = true;
+ log.info("subscription changed, add new topic, group: {} {}",
+ this.groupName,
+ sub.toString());
+ }
+ } else if (sub.getSubVersion() > old.getSubVersion()) {
+ if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
+ log.info("subscription changed, group: {} OLD: {} NEW: {}",
+ this.groupName,
+ old.toString(),
+ sub.toString()
+ );
+ }
+
+ this.subscriptionTable.put(sub.getTopic(), sub);
+ }
+ }
+
+
+ Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, SubscriptionData> next = it.next();
+ String oldTopic = next.getKey();
+
+ boolean exist = false;
+ for (SubscriptionData sub : subList) {
+ if (sub.getTopic().equals(oldTopic)) {
+ exist = true;
+ break;
+ }
+ }
+
+ if (!exist) {
+ log.warn("subscription changed, group: {} remove topic {} {}",
+ this.groupName,
+ oldTopic,
+ next.getValue().toString()
+ );
+
+ it.remove();
+ updated = true;
+ }
+ }
+
+ this.lastUpdateTimestamp = System.currentTimeMillis();
+
+ return updated;
+ }
+
+
+ public Set<String> getSubscribeTopics() {
+ return subscriptionTable.keySet();
+ }
+
+
+ public SubscriptionData findSubscriptionData(final String topic) {
+ return this.subscriptionTable.get(topic);
+ }
+
+
+ public ConsumeType getConsumeType() {
+ return consumeType;
+ }
+
+
+ public void setConsumeType(ConsumeType consumeType) {
+ this.consumeType = consumeType;
+ }
+
+
+ public MessageModel getMessageModel() {
+ return messageModel;
+ }
+
+
+ public void setMessageModel(MessageModel messageModel) {
+ this.messageModel = messageModel;
+ }
+
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+
+
+ public ConsumeFromWhere getConsumeFromWhere() {
+ return consumeFromWhere;
+ }
+
+
+ public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+ this.consumeFromWhere = consumeFromWhere;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
new file mode 100644
index 0000000..e8d23db
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client;
+
+import io.netty.channel.Channel;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public interface ConsumerIdsChangeListener {
+ public void consumerIdsChanged(final String group, final List<Channel> channels);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
new file mode 100644
index 0000000..561fec6
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerManager {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
+ private final ConcurrentHashMap<String/* Group */, ConsumerGroupInfo> consumerTable =
+ new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
+ private final ConsumerIdsChangeListener consumerIdsChangeListener;
+
+ public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
+ this.consumerIdsChangeListener = consumerIdsChangeListener;
+ }
+
+ public ClientChannelInfo findChannel(final String group, final String clientId) {
+ ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+ if (consumerGroupInfo != null) {
+ return consumerGroupInfo.findChannel(clientId);
+ }
+ return null;
+ }
+
+ public SubscriptionData findSubscriptionData(final String group, final String topic) {
+ ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
+ if (consumerGroupInfo != null) {
+ return consumerGroupInfo.findSubscriptionData(topic);
+ }
+
+ return null;
+ }
+
+ public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
+ return this.consumerTable.get(group);
+ }
+
+ public int findSubscriptionDataCount(final String group) {
+ ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
+ if (consumerGroupInfo != null) {
+ return consumerGroupInfo.getSubscriptionTable().size();
+ }
+
+ return 0;
+ }
+
+ public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+ Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, ConsumerGroupInfo> next = it.next();
+ ConsumerGroupInfo info = next.getValue();
+ boolean removed = info.doChannelCloseEvent(remoteAddr, channel);
+ if (removed) {
+ if (info.getChannelInfoTable().isEmpty()) {
+ ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
+ if (remove != null) {
+ log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
+ next.getKey());
+ }
+ }
+
+ this.consumerIdsChangeListener.consumerIdsChanged(next.getKey(), info.getAllChannel());
+ }
+ }
+ }
+
+ public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
+ ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+ final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
+
+ ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+ if (null == consumerGroupInfo) {
+ ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
+ ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
+ consumerGroupInfo = prev != null ? prev : tmp;
+ }
+
+ boolean r1 =
+ consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
+ consumeFromWhere);
+ boolean r2 = consumerGroupInfo.updateSubscription(subList);
+
+ if (r1 || r2) {
+ if (isNotifyConsumerIdsChangedEnable) {
+ this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
+ }
+ }
+
+ return r1 || r2;
+ }
+
+ public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) {
+ ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+ if (null != consumerGroupInfo) {
+ consumerGroupInfo.unregisterChannel(clientChannelInfo);
+ if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
+ ConsumerGroupInfo remove = this.consumerTable.remove(group);
+ if (remove != null) {
+ log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
+ }
+ }
+ if (isNotifyConsumerIdsChangedEnable) {
+ this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
+ }
+ }
+ }
+
+ public void scanNotActiveChannel() {
+ Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, ConsumerGroupInfo> next = it.next();
+ String group = next.getKey();
+ ConsumerGroupInfo consumerGroupInfo = next.getValue();
+ ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
+ consumerGroupInfo.getChannelInfoTable();
+
+ Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
+ while (itChannel.hasNext()) {
+ Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
+ ClientChannelInfo clientChannelInfo = nextChannel.getValue();
+ long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
+ if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+ log.warn(
+ "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
+ RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
+ RemotingUtil.closeChannel(clientChannelInfo.getChannel());
+ itChannel.remove();
+ }
+ }
+
+ if (channelInfoTable.isEmpty()) {
+ log.warn(
+ "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
+ group);
+ it.remove();
+ }
+ }
+ }
+
+ public HashSet<String> queryTopicConsumeByWho(final String topic) {
+ HashSet<String> groups = new HashSet<String>();
+ Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, ConsumerGroupInfo> entry = it.next();
+ ConcurrentHashMap<String, SubscriptionData> subscriptionTable =
+ entry.getValue().getSubscriptionTable();
+ if (subscriptionTable.containsKey(topic)) {
+ groups.add(entry.getKey());
+ }
+ }
+ return groups;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
new file mode 100644
index 0000000..501d665
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client;
+
+import org.apache.rocketmq.broker.BrokerController;
+import io.netty.channel.Channel;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
+ private final BrokerController brokerController;
+
+
+ public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+
+ @Override
+ public void consumerIdsChanged(String group, List<Channel> channels) {
+ if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
+ for (Channel chl : channels) {
+ this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
new file mode 100644
index 0000000..6656ab0
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ProducerManager {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final long LOCK_TIMEOUT_MILLIS = 3000;
+ private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
+ private final Lock groupChannelLock = new ReentrantLock();
+ private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
+ new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+
+
+ public ProducerManager() {
+ }
+
+
+ public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
+ HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
+ new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+ try {
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ newGroupChannelTable.putAll(groupChannelTable);
+ } finally {
+ groupChannelLock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ return newGroupChannelTable;
+ }
+
+
+ public void scanNotActiveChannel() {
+ try {
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
+ .entrySet()) {
+ final String group = entry.getKey();
+ final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
+
+ Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Channel, ClientChannelInfo> item = it.next();
+ // final Integer id = item.getKey();
+ final ClientChannelInfo info = item.getValue();
+
+ long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
+ if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+ it.remove();
+ log.warn(
+ "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
+ RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+ RemotingUtil.closeChannel(info.getChannel());
+ }
+ }
+ }
+ } finally {
+ this.groupChannelLock.unlock();
+ }
+ } else {
+ log.warn("ProducerManager scanNotActiveChannel lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ }
+
+
+ public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+ if (channel != null) {
+ try {
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
+ .entrySet()) {
+ final String group = entry.getKey();
+ final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
+ entry.getValue();
+ final ClientChannelInfo clientChannelInfo =
+ clientChannelInfoTable.remove(channel);
+ if (clientChannelInfo != null) {
+ log.info(
+ "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
+ clientChannelInfo.toString(), remoteAddr, group);
+ }
+
+ }
+ } finally {
+ this.groupChannelLock.unlock();
+ }
+ } else {
+ log.warn("ProducerManager doChannelCloseEvent lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ }
+ }
+
+
+ public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
+ try {
+ ClientChannelInfo clientChannelInfoFound = null;
+
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+ if (null == channelTable) {
+ channelTable = new HashMap<Channel, ClientChannelInfo>();
+ this.groupChannelTable.put(group, channelTable);
+ }
+
+ clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
+ if (null == clientChannelInfoFound) {
+ channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
+ log.info("new producer connected, group: {} channel: {}", group,
+ clientChannelInfo.toString());
+ }
+ } finally {
+ this.groupChannelLock.unlock();
+ }
+
+ if (clientChannelInfoFound != null) {
+ clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
+ }
+ } else {
+ log.warn("ProducerManager registerProducer lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ }
+
+
+ public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
+ try {
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+ if (null != channelTable && !channelTable.isEmpty()) {
+ ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
+ if (old != null) {
+ log.info("unregister a producer[{}] from groupChannelTable {}", group,
+ clientChannelInfo.toString());
+ }
+
+ if (channelTable.isEmpty()) {
+ this.groupChannelTable.remove(group);
+ log.info("unregister a producer group[{}] from groupChannelTable", group);
+ }
+ }
+ } finally {
+ this.groupChannelLock.unlock();
+ }
+ } else {
+ log.warn("ProducerManager unregisterProducer lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
new file mode 100644
index 0000000..7d7064a
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client.net;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueForC;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody;
+import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.common.protocol.body.ResetOffsetBodyForC;
+import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
+import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.FileRegion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class Broker2Client {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final BrokerController brokerController;
+
+ public Broker2Client(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ public void checkProducerTransactionState(
+ final Channel channel,
+ final CheckTransactionStateRequestHeader requestHeader,
+ final SelectMappedBufferResult selectMappedBufferResult) {
+ RemotingCommand request =
+ RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
+ request.markOnewayRPC();
+
+ try {
+ FileRegion fileRegion =
+ new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
+ selectMappedBufferResult);
+ channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ selectMappedBufferResult.release();
+ if (!future.isSuccess()) {
+ log.error("invokeProducer failed,", future.cause());
+ }
+ }
+ });
+ } catch (Throwable e) {
+ log.error("invokeProducer exception", e);
+ selectMappedBufferResult.release();
+ }
+ }
+
+ public RemotingCommand callClient(final Channel channel,
+ final RemotingCommand request
+ ) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+ return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
+ }
+
+ public void notifyConsumerIdsChanged(
+ final Channel channel,
+ final String consumerGroup) {
+ if (null == consumerGroup) {
+ log.error("notifyConsumerIdsChanged consumerGroup is null");
+ return;
+ }
+
+ NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
+ requestHeader.setConsumerGroup(consumerGroup);
+ RemotingCommand request =
+ RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
+
+ try {
+ this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
+ } catch (Exception e) {
+ log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
+ }
+ }
+
+ public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) {
+ return resetOffset(topic, group, timeStamp, isForce, false);
+ }
+
+ public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
+ boolean isC) {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+ TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+ if (null == topicConfig) {
+ log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
+ return response;
+ }
+
+ Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
+
+ for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
+ MessageQueue mq = new MessageQueue();
+ mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+ mq.setTopic(topic);
+ mq.setQueueId(i);
+
+ long consumerOffset =
+ this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
+ if (-1 == consumerOffset) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("THe consumer group <%s> not exist", group));
+ return response;
+ }
+
+ long timeStampOffset;
+ if (timeStamp == -1) {
+
+ timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
+ } else {
+ timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
+ }
+
+ if (timeStampOffset < 0) {
+ log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
+ timeStampOffset = 0;
+ }
+
+ if (isForce || timeStampOffset < consumerOffset) {
+ offsetTable.put(mq, timeStampOffset);
+ } else {
+ offsetTable.put(mq, consumerOffset);
+ }
+ }
+
+ ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setGroup(group);
+ requestHeader.setTimestamp(timeStamp);
+ RemotingCommand request =
+ RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
+ if (isC) {
+ // c++ language
+ ResetOffsetBodyForC body = new ResetOffsetBodyForC();
+ List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
+ body.setOffsetTable(offsetList);
+ request.setBody(body.encode());
+ } else {
+ // other language
+ ResetOffsetBody body = new ResetOffsetBody();
+ body.setOffsetTable(offsetTable);
+ request.setBody(body.encode());
+ }
+
+ ConsumerGroupInfo consumerGroupInfo =
+ this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
+
+ if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
+ ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
+ consumerGroupInfo.getChannelInfoTable();
+ for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
+ int version = entry.getValue().getVersion();
+ if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
+ try {
+ this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
+ log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
+ new Object[]{topic, group, entry.getValue().getClientId()});
+ } catch (Exception e) {
+ log.error("[reset-offset] reset offset exception. topic={}, group={}",
+ new Object[]{topic, group}, e);
+ }
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("the client does not support this feature. version="
+ + MQVersion.getVersionDesc(version));
+ log.warn("[reset-offset] the client does not support this feature. version={}",
+ RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
+ return response;
+ }
+ }
+ } else {
+ String errorInfo =
+ String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
+ requestHeader.getGroup(),
+ requestHeader.getTopic(),
+ requestHeader.getTimestamp());
+ log.error(errorInfo);
+ response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
+ response.setRemark(errorInfo);
+ return response;
+ }
+ response.setCode(ResponseCode.SUCCESS);
+ ResetOffsetBody resBody = new ResetOffsetBody();
+ resBody.setOffsetTable(offsetTable);
+ response.setBody(resBody.encode());
+ return response;
+ }
+
+ private List<MessageQueueForC> convertOffsetTable2OffsetList(Map<MessageQueue, Long> table) {
+ List<MessageQueueForC> list = new ArrayList<MessageQueueForC>();
+ for (Entry<MessageQueue, Long> entry : table.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ MessageQueueForC tmp =
+ new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue());
+ list.add(tmp);
+ }
+ return list;
+ }
+
+ public RemotingCommand getConsumeStatus(String topic, String group, String originClientId) {
+ final RemotingCommand result = RemotingCommand.createResponseCommand(null);
+
+ GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setGroup(group);
+ RemotingCommand request =
+ RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT,
+ requestHeader);
+
+ Map<String, Map<MessageQueue, Long>> consumerStatusTable =
+ new HashMap<String, Map<MessageQueue, Long>>();
+ ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
+ this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
+ if (null == channelInfoTable || channelInfoTable.isEmpty()) {
+ result.setCode(ResponseCode.SYSTEM_ERROR);
+ result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group));
+ return result;
+ }
+
+ for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
+ int version = entry.getValue().getVersion();
+ String clientId = entry.getValue().getClientId();
+ if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
+ result.setCode(ResponseCode.SYSTEM_ERROR);
+ result.setRemark("the client does not support this feature. version="
+ + MQVersion.getVersionDesc(version));
+ log.warn("[get-consumer-status] the client does not support this feature. version={}",
+ RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
+ return result;
+ } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
+ try {
+ RemotingCommand response =
+ this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ if (response.getBody() != null) {
+ GetConsumerStatusBody body =
+ GetConsumerStatusBody.decode(response.getBody(),
+ GetConsumerStatusBody.class);
+
+ consumerStatusTable.put(clientId, body.getMessageQueueTable());
+ log.info(
+ "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
+ new Object[]{topic, group, clientId});
+ }
+ }
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ log.error(
+ "[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}",
+ new Object[]{topic, group}, e);
+ }
+
+ if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) {
+ break;
+ }
+ }
+ }
+
+ result.setCode(ResponseCode.SUCCESS);
+ GetConsumerStatusBody resBody = new GetConsumerStatusBody();
+ resBody.setConsumerTable(consumerStatusTable);
+ result.setBody(resBody.encode());
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
new file mode 100644
index 0000000..adb1819
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client.rebalance;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RebalanceLockManager {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
+ private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
+ "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
+ private final Lock lock = new ReentrantLock();
+ private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
+ new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
+
+ public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {
+
+ if (!this.isLocked(group, mq, clientId)) {
+ try {
+ this.lock.lockInterruptibly();
+ try {
+ ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
+ if (null == groupValue) {
+ groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32);
+ this.mqLockTable.put(group, groupValue);
+ }
+
+ LockEntry lockEntry = groupValue.get(mq);
+ if (null == lockEntry) {
+ lockEntry = new LockEntry();
+ lockEntry.setClientId(clientId);
+ groupValue.put(mq, lockEntry);
+ log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
+ group, //
+ clientId, //
+ mq);
+ }
+
+ if (lockEntry.isLocked(clientId)) {
+ lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
+ return true;
+ }
+
+ String oldClientId = lockEntry.getClientId();
+
+
+ if (lockEntry.isExpired()) {
+ lockEntry.setClientId(clientId);
+ lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
+ log.warn(
+ "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
+ group, //
+ oldClientId, //
+ clientId, //
+ mq);
+ return true;
+ }
+
+
+ log.warn(
+ "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
+ group, //
+ oldClientId, //
+ clientId, //
+ mq);
+ return false;
+ } finally {
+ this.lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("putMessage exception", e);
+ }
+ } else {
+
+ }
+
+ return true;
+ }
+
+ private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
+ ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
+ if (groupValue != null) {
+ LockEntry lockEntry = groupValue.get(mq);
+ if (lockEntry != null) {
+ boolean locked = lockEntry.isLocked(clientId);
+ if (locked) {
+ lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
+ }
+
+ return locked;
+ }
+ }
+
+ return false;
+ }
+
+ public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
+ final String clientId) {
+ Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
+ Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
+
+
+ for (MessageQueue mq : mqs) {
+ if (this.isLocked(group, mq, clientId)) {
+ lockedMqs.add(mq);
+ } else {
+ notLockedMqs.add(mq);
+ }
+ }
+
+ if (!notLockedMqs.isEmpty()) {
+ try {
+ this.lock.lockInterruptibly();
+ try {
+ ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
+ if (null == groupValue) {
+ groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32);
+ this.mqLockTable.put(group, groupValue);
+ }
+
+
+ for (MessageQueue mq : notLockedMqs) {
+ LockEntry lockEntry = groupValue.get(mq);
+ if (null == lockEntry) {
+ lockEntry = new LockEntry();
+ lockEntry.setClientId(clientId);
+ groupValue.put(mq, lockEntry);
+ log.info(
+ "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
+ group, //
+ clientId, //
+ mq);
+ }
+
+
+ if (lockEntry.isLocked(clientId)) {
+ lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
+ lockedMqs.add(mq);
+ continue;
+ }
+
+ String oldClientId = lockEntry.getClientId();
+
+
+ if (lockEntry.isExpired()) {
+ lockEntry.setClientId(clientId);
+ lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
+ log.warn(
+ "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
+ group, //
+ oldClientId, //
+ clientId, //
+ mq);
+ lockedMqs.add(mq);
+ continue;
+ }
+
+
+ log.warn(
+ "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
+ group, //
+ oldClientId, //
+ clientId, //
+ mq);
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("putMessage exception", e);
+ }
+ }
+
+ return lockedMqs;
+ }
+
+ public void unlockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) {
+ try {
+ this.lock.lockInterruptibly();
+ try {
+ ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
+ if (null != groupValue) {
+ for (MessageQueue mq : mqs) {
+ LockEntry lockEntry = groupValue.get(mq);
+ if (null != lockEntry) {
+ if (lockEntry.getClientId().equals(clientId)) {
+ groupValue.remove(mq);
+ log.info("unlockBatch, Group: {} {} {}",
+ group,
+ mq,
+ clientId);
+ } else {
+ log.warn("unlockBatch, but mq locked by other client: {}, Group: {} {} {}",
+ lockEntry.getClientId(),
+ group,
+ mq,
+ clientId);
+ }
+ } else {
+ log.warn("unlockBatch, but mq not locked, Group: {} {} {}",
+ group,
+ mq,
+ clientId);
+ }
+ }
+ } else {
+ log.warn("unlockBatch, group not exist, Group: {} {}",
+ group,
+ clientId);
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("putMessage exception", e);
+ }
+ }
+
+ static class LockEntry {
+ private String clientId;
+ private volatile long lastUpdateTimestamp = System.currentTimeMillis();
+
+
+ public String getClientId() {
+ return clientId;
+ }
+
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+
+ public boolean isLocked(final String clientId) {
+ boolean eq = this.clientId.equals(clientId);
+ return eq && !this.isExpired();
+ }
+
+ public boolean isExpired() {
+ boolean expired =
+ (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
+
+ return expired;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
new file mode 100644
index 0000000..5b86d99
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filtersrv;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerStartup;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class FilterServerManager {
+
+ public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable =
+ new ConcurrentHashMap<Channel, FilterServerInfo>(16);
+ private final BrokerController brokerController;
+
+ private ScheduledExecutorService scheduledExecutorService = Executors
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
+
+ public FilterServerManager(final BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ public void start() {
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ FilterServerManager.this.createFilterServer();
+ } catch (Exception e) {
+ log.error("", e);
+ }
+ }
+ }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
+ }
+
+ public void createFilterServer() {
+ int more =
+ this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
+ String cmd = this.buildStartCommand();
+ for (int i = 0; i < more; i++) {
+ FilterServerUtil.callShell(cmd, log);
+ }
+ }
+
+ private String buildStartCommand() {
+ String config = "";
+ if (BrokerStartup.configFile != null) {
+ config = String.format("-c %s", BrokerStartup.configFile);
+ }
+
+ if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
+ config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr());
+ }
+
+ if (RemotingUtil.isWindowsPlatform()) {
+ return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
+ this.brokerController.getBrokerConfig().getRocketmqHome(),
+ config);
+ } else {
+ return String.format("sh %s/bin/startfsrv.sh %s",
+ this.brokerController.getBrokerConfig().getRocketmqHome(),
+ config);
+ }
+ }
+
+ public void shutdown() {
+ this.scheduledExecutorService.shutdown();
+ }
+
+ public void registerFilterServer(final Channel channel, final String filterServerAddr) {
+ FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
+ if (filterServerInfo != null) {
+ filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
+ } else {
+ filterServerInfo = new FilterServerInfo();
+ filterServerInfo.setFilterServerAddr(filterServerAddr);
+ filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
+ this.filterServerTable.put(channel, filterServerInfo);
+ log.info("Receive a New Filter Server<{}>", filterServerAddr);
+ }
+ }
+
+ /**
+
+ */
+ public void scanNotActiveChannel() {
+
+ Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Channel, FilterServerInfo> next = it.next();
+ long timestamp = next.getValue().getLastUpdateTimestamp();
+ Channel channel = next.getKey();
+ if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
+ log.info("The Filter Server<{}> expired, remove it", next.getKey());
+ it.remove();
+ RemotingUtil.closeChannel(channel);
+ }
+ }
+ }
+
+ public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+ FilterServerInfo old = this.filterServerTable.remove(channel);
+ if (old != null) {
+ log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
+ remoteAddr);
+ }
+ }
+
+ public List<String> buildNewFilterServerList() {
+ List<String> addr = new ArrayList<String>();
+ Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Channel, FilterServerInfo> next = it.next();
+ addr.add(next.getValue().getFilterServerAddr());
+ }
+ return addr;
+ }
+
+ static class FilterServerInfo {
+ private String filterServerAddr;
+ private long lastUpdateTimestamp;
+
+
+ public String getFilterServerAddr() {
+ return filterServerAddr;
+ }
+
+
+ public void setFilterServerAddr(String filterServerAddr) {
+ this.filterServerAddr = filterServerAddr;
+ }
+
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+ }
+}