You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:29 UTC
[rocketmq] 05/14: Add Snode server
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 9f5383e58a6886b253395d5b6f2852e1b13552d2
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Dec 21 02:43:48 2018 +0800
Add Snode server
---
.../apache/rocketmq/broker/BrokerController.java | 11 +-
.../org/apache/rocketmq/broker/BrokerStartup.java | 7 +-
.../broker/client/ClientHousekeepingService.java | 1 -
.../broker/longpolling/PullRequestHoldService.java | 18 +-
.../broker/processor/ClientManageProcessor.java | 2 +
.../processor/SnodePullMessageProcessor.java | 449 +++++++++++++++++++++
.../rocketmq/client/impl/MQClientAPIImpl.java | 17 +-
.../client/impl/factory/MQClientInstance.java | 4 +-
.../impl/producer/DefaultMQProducerImpl.java | 37 +-
.../rocketmq/common/constant/LoggerName.java | 1 +
.../common/namesrv/RegisterSnodeResult.java | 20 +
.../rocketmq/common/protocol/RequestCode.java | 5 +
.../rocketmq/common/protocol/body/ClusterInfo.java | 22 +
.../protocol/header/PullMessageRequestHeader.java | 10 +
.../protocol/header/SendMessageRequestHeader.java | 10 +
.../header/SendMessageRequestHeaderV2.java | 12 +
.../header/namesrv/RegisterSnodeRequestHeader.java | 61 +++
.../namesrv/RegisterSnodeResponseHeader.java | 20 +
.../common/protocol/heartbeat/HeartbeatData.java | 20 +-
.../common/protocol/heartbeat/SnodeData.java | 34 ++
.../rocketmq/common/protocol/route/SnodeData.java | 57 +++
.../common/utils/PositiveAtomicCounter.java | 40 ++
distribution/conf/logback_snode.xml | 92 +++++
.../rocketmq/example/quickstart/Producer.java | 4 +-
.../apache/rocketmq/example/simple/Producer.java | 1 -
.../namesrv/processor/DefaultRequestProcessor.java | 18 +-
.../namesrv/routeinfo/RouteInfoManager.java | 31 ++
pom.xml | 1 +
rocketmq-snode/pom.xml | 93 +++++
.../org/apache/rocketmq/snode/SnodeController.java | 205 ++++++++++
.../org/apache/rocketmq/snode/SnodeStartup.java | 144 +++++++
.../rocketmq/snode/client/ClientChannelInfo.java | 100 +++++
.../snode}/client/ClientHousekeepingService.java | 36 +-
.../rocketmq/snode/client/ConsumerGroupEvent.java | 33 ++
.../rocketmq/snode/client/ConsumerGroupInfo.java | 248 ++++++++++++
.../snode/client/ConsumerIdsChangeListener.java | 22 +
.../rocketmq/snode/client/ConsumerManager.java | 189 +++++++++
.../client/DefaultConsumerIdsChangeListener.java | 64 +++
.../rocketmq/snode/client/ProducerManager.java | 224 ++++++++++
.../snode/client/SubscriptionGroupManager.java | 200 +++++++++
.../apache/rocketmq/snode/config/SnodeConfig.java | 223 ++++++++++
.../snode/processor/ConsumerManageProcessor.java | 96 +++++
.../snode/processor/HearbeatProcessor.java | 92 +++++
.../snode/processor/PullMessageProcessor.java | 40 ++
.../snode/processor/SendMessageProcessor.java | 44 ++
.../rocketmq/snode/service/ScheduledService.java | 21 +
.../snode/service/SendTransferService.java | 26 ++
.../rocketmq/snode/service/SnodeOuterService.java | 52 +++
.../snode/service/impl/ScheduledServiceImpl.java | 113 ++++++
.../service/impl/SendTransferServiceImpl.java | 45 +++
.../snode/service/impl/SnodeOuterServiceImpl.java | 270 +++++++++++++
.../rocketmq/snode/topic/TopicConfigManager.java | 19 +
52 files changed, 3542 insertions(+), 62 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 1cbd39c..9639f65 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -58,6 +58,7 @@ import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
+import org.apache.rocketmq.broker.processor.SnodePullMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
@@ -115,6 +116,7 @@ public class BrokerController {
private final ProducerManager producerManager;
private final ClientHousekeepingService clientHousekeepingService;
private final PullMessageProcessor pullMessageProcessor;
+ private final SnodePullMessageProcessor snodePullMessageProcessor;
private final PullRequestHoldService pullRequestHoldService;
private final MessageArrivingListener messageArrivingListener;
private final Broker2Client broker2Client;
@@ -184,7 +186,7 @@ public class BrokerController {
this.filterServerManager = new FilterServerManager(this);
this.slaveSynchronize = new SlaveSynchronize(this);
-
+ this.snodePullMessageProcessor = new SnodePullMessageProcessor(this);
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
@@ -513,7 +515,8 @@ public class BrokerController {
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
-
+ this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor,pullMessageExecutor);
+ this.snodePullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* QueryMessageProcessor
*/
@@ -671,6 +674,10 @@ public class BrokerController {
return pullMessageProcessor;
}
+ public SnodePullMessageProcessor getSnodePullMessageProcessor() {
+ return snodePullMessageProcessor;
+ }
+
public PullRequestHoldService getPullRequestHoldService() {
return pullRequestHoldService;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 4b986c0..c623d52 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -60,9 +60,7 @@ public class BrokerStartup {
public static BrokerController start(BrokerController controller) {
try {
-
controller.start();
-
String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
@@ -100,8 +98,9 @@ public class BrokerStartup {
try {
//PackageConflictDetect.detectFastjson();
+
Options options = ServerUtil.buildCommandlineOptions(new Options());
- commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
+ commandLine = ServerUtil.parseCmdLine("broker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
@@ -260,6 +259,8 @@ public class BrokerStartup {
}
private static Options buildCommandlineOptions(final Options options) {
+
+
Option opt = new Option("c", "configFile", true, "Broker config properties file");
opt.setRequired(false);
options.addOption(opt);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index d536db5..7e023dd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -55,7 +55,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
private void scanExceptionChannel() {
this.brokerController.getProducerManager().scanNotActiveChannel();
this.brokerController.getConsumerManager().scanNotActiveChannel();
- this.brokerController.getFilterServerManager().scanNotActiveChannel();
}
public void shutdown() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index 417ec0d..af6addc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -138,8 +138,13 @@ public class PullRequestHoldService extends ServiceThread {
if (match) {
try {
- this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
- request.getRequestCommand());
+ if (request.getMessageFilter() == null && request.getSubscriptionData() == null) {
+ this.brokerController.getSnodePullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
+ request.getRequestCommand());
+ } else {
+ this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
+ request.getRequestCommand());
+ }
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
@@ -149,8 +154,13 @@ public class PullRequestHoldService extends ServiceThread {
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
- this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
- request.getRequestCommand());
+ if (request.getMessageFilter() == null && request.getSubscriptionData() == null) {
+ this.brokerController.getSnodePullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
+ request.getRequestCommand());
+ } else {
+ this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
+ request.getRequestCommand());
+ }
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index b5e6085..f0d155f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -74,6 +74,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
+ log.info("heart beat request:{}", heartbeatData);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
ctx.channel(),
heartbeatData.getClientID(),
@@ -121,6 +122,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
clientChannelInfo);
}
+
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
new file mode 100644
index 0000000..8beb6fa
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.filter.ConsumerFilterData;
+import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
+import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
+import org.apache.rocketmq.broker.longpolling.PullRequest;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
+import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+public class SnodePullMessageProcessor implements NettyRequestProcessor {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final BrokerController brokerController;
+ private List<ConsumeMessageHook> consumeMessageHookList;
+
+ public SnodePullMessageProcessor(final BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ @Override
+ public RemotingCommand processRequest(final ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+ return this.processRequest(ctx.channel(), request, true);
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+
+ private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
+ throws RemotingCommandException {
+ RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+ final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
+ final PullMessageRequestHeader requestHeader =
+ (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+
+ response.setOpaque(request.getOpaque());
+
+ log.info("receive PullMessage request command, {}", request);
+ final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
+ final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
+ final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
+
+ final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
+ if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
+ response.setCode(ResponseCode.NO_PERMISSION);
+ response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
+ return response;
+ }
+
+ final GetMessageResult getMessageResult =
+ this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+ requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), null);
+
+ log.info("Get message response:{}",getMessageResult);
+ if (getMessageResult != null) {
+ response.setRemark(getMessageResult.getStatus().name());
+ responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
+ responseHeader.setMinOffset(getMessageResult.getMinOffset());
+ responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
+
+ if (getMessageResult.isSuggestPullingFromSlave()) {
+ responseHeader.setSuggestWhichBrokerId(0L);
+ } else {
+ responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+ }
+
+ switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
+ case ASYNC_MASTER:
+ case SYNC_MASTER:
+ break;
+ case SLAVE:
+ if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
+ response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+ responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+ }
+ break;
+ }
+
+// if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
+// // consume too slow ,redirect to another machine
+// if (getMessageResult.isSuggestPullingFromSlave()) {
+// responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
+// }
+// // consume ok
+// else {
+// responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
+// }
+// } else {
+// responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+// }
+
+ switch (getMessageResult.getStatus()) {
+ case FOUND:
+ response.setCode(ResponseCode.SUCCESS);
+ break;
+ case MESSAGE_WAS_REMOVING:
+ response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+ break;
+ case NO_MATCHED_LOGIC_QUEUE:
+ case NO_MESSAGE_IN_QUEUE:
+ if (0 != requestHeader.getQueueOffset()) {
+ response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+
+ // XXX: warn and notify me
+ log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
+ requestHeader.getQueueOffset(),
+ getMessageResult.getNextBeginOffset(),
+ requestHeader.getTopic(),
+ requestHeader.getQueueId(),
+ requestHeader.getConsumerGroup()
+ );
+ } else {
+ response.setCode(ResponseCode.PULL_NOT_FOUND);
+ }
+ break;
+ case NO_MATCHED_MESSAGE:
+ response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+ break;
+ case OFFSET_FOUND_NULL:
+ response.setCode(ResponseCode.PULL_NOT_FOUND);
+ break;
+ case OFFSET_OVERFLOW_BADLY:
+ response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+ // XXX: warn and notify me
+ log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
+ requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
+ break;
+ case OFFSET_OVERFLOW_ONE:
+ response.setCode(ResponseCode.PULL_NOT_FOUND);
+ break;
+ case OFFSET_TOO_SMALL:
+ response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+ log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
+ requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
+ getMessageResult.getMinOffset(), channel.remoteAddress());
+ break;
+ default:
+ assert false;
+ break;
+ }
+
+ if (this.hasConsumeMessageHook()) {
+ ConsumeMessageContext context = new ConsumeMessageContext();
+ context.setConsumerGroup(requestHeader.getConsumerGroup());
+ context.setTopic(requestHeader.getTopic());
+ context.setQueueId(requestHeader.getQueueId());
+
+ String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS:
+ int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+ int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
+
+ context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
+ context.setCommercialRcvTimes(incValue);
+ context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
+ context.setCommercialOwner(owner);
+
+ break;
+ case ResponseCode.PULL_NOT_FOUND:
+ if (!brokerAllowSuspend) {
+
+ context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
+ context.setCommercialRcvTimes(1);
+ context.setCommercialOwner(owner);
+
+ }
+ break;
+ case ResponseCode.PULL_RETRY_IMMEDIATELY:
+ case ResponseCode.PULL_OFFSET_MOVED:
+ context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
+ context.setCommercialRcvTimes(1);
+ context.setCommercialOwner(owner);
+ break;
+ default:
+ assert false;
+ break;
+ }
+
+ this.executeConsumeMessageHookBefore(context);
+ }
+
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS:
+
+ this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+ getMessageResult.getMessageCount());
+
+ this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+ getMessageResult.getBufferTotalSize());
+
+ this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
+ if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
+ final long beginTimeMills = this.brokerController.getMessageStore().now();
+ final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
+ this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
+ requestHeader.getTopic(), requestHeader.getQueueId(),
+ (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
+ response.setBody(r);
+ } else {
+ try {
+ FileRegion fileRegion =
+ new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
+ channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ getMessageResult.release();
+ if (!future.isSuccess()) {
+ log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
+ }
+ }
+ });
+ } catch (Throwable e) {
+ log.error("transfer many message by pagecache exception", e);
+ getMessageResult.release();
+ }
+
+ response = null;
+ }
+ break;
+ case ResponseCode.PULL_NOT_FOUND:
+
+ if (brokerAllowSuspend && hasSuspendFlag) {
+ long pollingTimeMills = suspendTimeoutMillisLong;
+ if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
+ pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
+ }
+
+ String topic = requestHeader.getTopic();
+ long offset = requestHeader.getQueueOffset();
+ int queueId = requestHeader.getQueueId();
+ PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
+ this.brokerController.getMessageStore().now(), offset, null, null);
+ this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
+ response = null;
+ break;
+ }
+
+ case ResponseCode.PULL_RETRY_IMMEDIATELY:
+ break;
+ case ResponseCode.PULL_OFFSET_MOVED:
+ if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
+ || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
+ MessageQueue mq = new MessageQueue();
+ mq.setTopic(requestHeader.getTopic());
+ mq.setQueueId(requestHeader.getQueueId());
+ mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+
+ OffsetMovedEvent event = new OffsetMovedEvent();
+ event.setConsumerGroup(requestHeader.getConsumerGroup());
+ event.setMessageQueue(mq);
+ event.setOffsetRequest(requestHeader.getQueueOffset());
+ event.setOffsetNew(getMessageResult.getNextBeginOffset());
+ this.generateOffsetMovedEvent(event);
+ log.warn(
+ "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
+ responseHeader.getSuggestWhichBrokerId());
+ } else {
+// responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
+ response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+ log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
+ responseHeader.getSuggestWhichBrokerId());
+ }
+
+ break;
+ default:
+ assert false;
+ }
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("store getMessage return null");
+ }
+
+ boolean storeOffsetEnable = brokerAllowSuspend;
+ storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
+ storeOffsetEnable = storeOffsetEnable
+ && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
+ if (storeOffsetEnable) {
+ this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
+ requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
+ }
+ return response;
+ }
+
+ public boolean hasConsumeMessageHook() {
+ return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
+ }
+
+ public void executeConsumeMessageHookBefore(final ConsumeMessageContext context) {
+ if (hasConsumeMessageHook()) {
+ for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+ try {
+ hook.consumeMessageBefore(context);
+ } catch (Throwable e) {
+ }
+ }
+ }
+ }
+
+ private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic,
+ final int queueId) {
+ final ByteBuffer byteBuffer = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
+
+ long storeTimestamp = 0;
+ try {
+ List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
+ for (ByteBuffer bb : messageBufferList) {
+
+ byteBuffer.put(bb);
+ storeTimestamp = bb.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
+ }
+ } finally {
+ getMessageResult.release();
+ }
+
+ this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getMessageStore().now() - storeTimestamp);
+ return byteBuffer.array();
+ }
+
+ private void generateOffsetMovedEvent(final OffsetMovedEvent event) {
+ try {
+ MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+ msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT);
+ msgInner.setTags(event.getConsumerGroup());
+ msgInner.setDelayTimeLevel(0);
+ msgInner.setKeys(event.getConsumerGroup());
+ msgInner.setBody(event.encode());
+ msgInner.setFlag(0);
+ msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+ msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, msgInner.getTags()));
+
+ msgInner.setQueueId(0);
+ msgInner.setSysFlag(0);
+ msgInner.setBornTimestamp(System.currentTimeMillis());
+ msgInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
+ msgInner.setStoreHost(msgInner.getBornHost());
+
+ msgInner.setReconsumeTimes(0);
+
+ PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
+ } catch (Exception e) {
+ log.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
+ }
+ }
+
+ public void executeRequestWhenWakeup(final Channel channel,
+ final RemotingCommand request) throws RemotingCommandException {
+ Runnable run = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final RemotingCommand response = SnodePullMessageProcessor.this.processRequest(channel, request, false);
+
+ if (response != null) {
+ response.setOpaque(request.getOpaque());
+ response.markResponseType();
+ try {
+ channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ log.error("processRequestWrapper response to {} failed",
+ future.channel().remoteAddress(), future.cause());
+ log.error(request.toString());
+ log.error(response.toString());
+ }
+ }
+ });
+ } catch (Throwable e) {
+ log.error("processRequestWrapper process request over, but response failed", e);
+ log.error(request.toString());
+ log.error(response.toString());
+ }
+ }
+ } catch (RemotingCommandException e1) {
+ log.error("excuteRequestWhenWakeup run", e1);
+ }
+ }
+ };
+ this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
+ }
+
+ public void registerConsumeMessageHook(List<ConsumeMessageHook> sendMessageHookList) {
+ this.consumeMessageHookList = sendMessageHookList;
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 0fa1ae7..6302cd0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -353,7 +353,8 @@ public class MQClientAPIImpl {
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
- RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+ String addrS = "localhost:11911";//TODO FIXME
+ RemotingCommand response = this.remotingClient.invokeSync(addrS, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}
@@ -557,14 +558,15 @@ public class MQClientAPIImpl {
}
public PullResult pullMessage(
- final String addr,
+ String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
-
+ requestHeader.setEnodeAddr(addr);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SNODE_PULL_MESSAGE, requestHeader);
+ addr = "localhost:11911"; //TODO FIXME
switch (communicationMode) {
case ONEWAY:
assert false;
@@ -647,7 +649,7 @@ public class MQClientAPIImpl {
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
-
+ log.info("response header: {}", responseHeader.getSuggestWhichBrokerId());
return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
@@ -728,11 +730,12 @@ public class MQClientAPIImpl {
final String consumerGroup,
final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
MQBrokerException, InterruptedException {
+
GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
-
- RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+ String addrS = "localhost:11911";//TODO FIXME
+ RemotingCommand response = this.remotingClient.invokeSync(addrS,
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9ffaed0..984e2cc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -537,7 +537,8 @@ public class MQClientInstance {
}
try {
- int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
+ String addrS = "localhost:11911"; //TODO FIXME
+ int version = this.mQClientAPIImpl.sendHearbeat(addrS, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
@@ -547,6 +548,7 @@ public class MQClientInstance {
log.info(heartbeatData.toString());
}
} catch (Exception e) {
+ log.error("send heart beat error:{}",e);
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
} else {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 7ace9d5..9ada834 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -247,6 +247,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
/**
* This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended.
+ *
* @return
*/
@Override
@@ -440,13 +441,14 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* DEFAULT ASYNC -------------------------------------------------------
*/
public void send(Message msg,
- SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+ SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
/**
- * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
- * A new one will be provided in next version
+ * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
+ * provided in next version
+ *
* @param msg
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
@@ -481,7 +483,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
-
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
@@ -653,18 +654,17 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
private SendResult sendKernelImpl(final Message msg,
- final MessageQueue mq,
- final CommunicationMode communicationMode,
- final SendCallback sendCallback,
- final TopicPublishInfo topicPublishInfo,
- final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ final MessageQueue mq,
+ final CommunicationMode communicationMode,
+ final SendCallback sendCallback,
+ final TopicPublishInfo topicPublishInfo,
+ final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
-
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
@@ -733,6 +733,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
+ requestHeader.setEnodeAddr(brokerAddr);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
@@ -943,8 +944,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
- * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
- * A new one will be provided in next version
+ * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
+ * provided in next version
+ *
* @param msg
* @param mq
* @param sendCallback
@@ -1064,8 +1066,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
- * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout.
- * A new one will be provided in next version
+ * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
+ * provided in next version
+ *
* @param msg
* @param selector
* @param arg
@@ -1076,7 +1079,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* @throws InterruptedException
*/
@Deprecated
- public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
+ public void send(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor();
@@ -1120,7 +1124,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public TransactionSendResult sendMessageInTransaction(final Message msg,
- final LocalTransactionExecuter localTransactionExecuter, final Object arg)
+ final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
@@ -1243,6 +1247,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
}
+
public ExecutorService getCallbackExecutor() {
return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index fe0ae9f..48295a3 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -37,4 +37,5 @@ public class LoggerName {
public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
+ public static final String SNODE_LOGGER_NAME = "RocketmqSnode";
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/RegisterSnodeResult.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/RegisterSnodeResult.java
new file mode 100644
index 0000000..6527a55
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/RegisterSnodeResult.java
@@ -0,0 +1,20 @@
+package org.apache.rocketmq.common.namesrv;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class RegisterSnodeResult {
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 8cf2d46..b36bdb2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -167,4 +167,9 @@ public class RequestCode {
public static final int QUERY_CONSUME_QUEUE = 321;
public static final int QUERY_DATA_VERSION = 322;
+
+ public static final int REGISTER_SNODE = 350;
+
+ public static final int SNODE_PULL_MESSAGE = 351;
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
index 566bf93..9c4d913 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
@@ -20,7 +20,9 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
@@ -59,6 +61,26 @@ public class ClusterInfo extends RemotingSerializable {
return addrs.toArray(new String[] {});
}
+ public String[] retrieveAllMasterAddrByCluster(String cluster) {
+ List<String> addrs = new ArrayList<String>();
+ if (clusterAddrTable.containsKey(cluster)) {
+ Set<String> brokerNames = clusterAddrTable.get(cluster);
+ for (String brokerName : brokerNames) {
+ BrokerData brokerData = brokerAddrTable.get(brokerName);
+ if (null != brokerData) {
+ HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
+ for (Map.Entry<Long, String> entry : brokerAddrs.entrySet()) {
+ if (MixAll.MASTER_ID == entry.getKey()) {
+ addrs.add(entry.getValue());
+ }
+ }
+
+ }
+ }
+ }
+ return addrs.toArray(new String[] {});
+ }
+
public String[] retrieveAllClusterNames() {
return clusterAddrTable.keySet().toArray(new String[] {});
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 106e89e..8332307 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -48,6 +48,16 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
private Long subVersion;
private String expressionType;
+ private String enodeAddr;
+
+ public String getEnodeAddr() {
+ return enodeAddr;
+ }
+
+ public void setEnodeAddr(String enodeAddr) {
+ this.enodeAddr = enodeAddr;
+ }
+
@Override
public void checkFields() throws RemotingCommandException {
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 2df31e6..81e0cff 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -52,6 +52,8 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private boolean batch = false;
private Integer maxReconsumeTimes;
+ private String enodeAddr;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -159,4 +161,12 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
public void setBatch(boolean batch) {
this.batch = batch;
}
+
+ public String getEnodeAddr() {
+ return enodeAddr;
+ }
+
+ public void setEnodeAddr(String enodeAddr) {
+ this.enodeAddr = enodeAddr;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 4e0098b..9602805 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -54,6 +54,8 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
@CFNullable
private boolean m; //batch
+ private String n; //enode addr
+
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader();
v1.setProducerGroup(v2.a);
@@ -69,6 +71,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v1.setUnitMode(v2.k);
v1.setMaxReconsumeTimes(v2.l);
v1.setBatch(v2.m);
+ v1.setEnodeAddr(v2.n);
return v1;
}
@@ -87,6 +90,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v2.k = v1.isUnitMode();
v2.l = v1.getMaxReconsumeTimes();
v2.m = v1.isBatch();
+ v2.n = v1.getEnodeAddr();
return v2;
}
@@ -197,4 +201,12 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
public void setM(boolean m) {
this.m = m;
}
+
+ public String getN() {
+ return n;
+ }
+
+ public void setN(String n) {
+ this.n = n;
+ }
}
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterSnodeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterSnodeRequestHeader.java
new file mode 100644
index 0000000..7752cd7
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterSnodeRequestHeader.java
@@ -0,0 +1,61 @@
+package org.apache.rocketmq.common.protocol.header.namesrv;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class RegisterSnodeRequestHeader implements CommandCustomHeader {
+
+ @CFNotNull
+ private String snodeName;
+
+ @CFNotNull
+ private String snodeAddr;
+
+ @CFNotNull
+ private String clusterName;
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getSnodeName() {
+ return snodeName;
+ }
+
+ public void setSnodeName(String snodeName) {
+ this.snodeName = snodeName;
+ }
+
+ public String getSnodeAddr() {
+ return snodeAddr;
+ }
+
+ public void setSnodeAddr(String snodeAddr) {
+ this.snodeAddr = snodeAddr;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterSnodeResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterSnodeResponseHeader.java
new file mode 100644
index 0000000..4d4f15f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterSnodeResponseHeader.java
@@ -0,0 +1,20 @@
+package org.apache.rocketmq.common.protocol.header.namesrv;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class RegisterSnodeResponseHeader {
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
index 03151f5..eb32749 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
@@ -28,6 +28,15 @@ public class HeartbeatData extends RemotingSerializable {
private String clientID;
private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
+ private SnodeData snodeData;
+
+ public SnodeData getSnodeData() {
+ return snodeData;
+ }
+
+ public void setSnodeData(SnodeData snodeData) {
+ this.snodeData = snodeData;
+ }
public String getClientID() {
return clientID;
@@ -53,9 +62,12 @@ public class HeartbeatData extends RemotingSerializable {
this.consumerDataSet = consumerDataSet;
}
- @Override
- public String toString() {
- return "HeartbeatData [clientID=" + clientID + ", producerDataSet=" + producerDataSet
- + ", consumerDataSet=" + consumerDataSet + "]";
+ @Override public String toString() {
+ return "HeartbeatData{" +
+ "clientID='" + clientID + '\'' +
+ ", producerDataSet=" + producerDataSet +
+ ", consumerDataSet=" + consumerDataSet +
+ ", snodeData='" + snodeData + '\'' +
+ '}';
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SnodeData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SnodeData.java
new file mode 100644
index 0000000..e4cb9b1
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SnodeData.java
@@ -0,0 +1,34 @@
+package org.apache.rocketmq.common.protocol.heartbeat;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class SnodeData {
+ private String snodeName;
+
+ public String getSnodeName() {
+ return snodeName;
+ }
+
+ public void setSnodeName(String snodeName) {
+ this.snodeName = snodeName;
+ }
+
+ @Override public String toString() {
+ return "SnodeData{" +
+ "snodeName='" + snodeName + '\'' +
+ '}';
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/SnodeData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/SnodeData.java
new file mode 100644
index 0000000..db76ad2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/SnodeData.java
@@ -0,0 +1,57 @@
+package org.apache.rocketmq.common.protocol.route;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class SnodeData {
+ private String snodeName;
+
+ private String addr;
+
+ private String clusterName;
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getSnodeName() {
+ return snodeName;
+ }
+
+ public void setSnodeName(String snodeName) {
+ this.snodeName = snodeName;
+ }
+
+ public String getAddr() {
+ return addr;
+ }
+
+ public void setAddr(String addr) {
+ this.addr = addr;
+ }
+
+ @Override
+ public String toString() {
+ return "SnodeData{" +
+ "snodeName='" + snodeName + '\'' +
+ ", addr='" + addr + '\'' +
+ ", clusterName='" + clusterName + '\'' +
+ '}';
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/PositiveAtomicCounter.java b/common/src/main/java/org/apache/rocketmq/common/utils/PositiveAtomicCounter.java
new file mode 100644
index 0000000..105b88c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/PositiveAtomicCounter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PositiveAtomicCounter {
+ private static final int MASK = 0x7FFFFFFF;
+ private final AtomicInteger atom;
+
+
+ public PositiveAtomicCounter() {
+ atom = new AtomicInteger(0);
+ }
+
+
+ public final int incrementAndGet() {
+ final int rt = atom.incrementAndGet();
+ return rt & MASK;
+ }
+
+
+ public int intValue() {
+ return atom.intValue();
+ }
+}
diff --git a/distribution/conf/logback_snode.xml b/distribution/conf/logback_snode.xml
new file mode 100644
index 0000000..554f589
--- /dev/null
+++ b/distribution/conf/logback_snode.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="DefaultAppender"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${user.home}/logs/rocketmqlogs/snode_default.log</file>
+ <append>true</append>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/snode_default.%i.log.gz</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>5</maxIndex>
+ </rollingPolicy>
+ <triggeringPolicy
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>100MB</maxFileSize>
+ </triggeringPolicy>
+ <encoder>
+ <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+ <charset class="java.nio.charset.Charset">UTF-8</charset>
+ </encoder>
+ </appender>
+
+ <appender name="RocketmqSnodeAppender_inner"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${user.home}/logs/rocketmqlogs/snode.log</file>
+ <append>true</append>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/snode.%i.log.gz</fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>5</maxIndex>
+ </rollingPolicy>
+ <triggeringPolicy
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>100MB</maxFileSize>
+ </triggeringPolicy>
+ <encoder>
+ <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+ <charset class="java.nio.charset.Charset">UTF-8</charset>
+ </encoder>
+ </appender>
+ <appender name="RocketmqSnodeAppender" class="ch.qos.logback.classic.AsyncAppender">
+ <appender-ref ref="RocketmqSnodeAppender_inner"/>
+ <discardingThreshold>0</discardingThreshold>
+ </appender>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <append>true</append>
+ <encoder>
+ <pattern>%d{yyy-MM-dd HH\:mm\:ss,SSS} %p %t - %m%n</pattern>
+ <charset class="java.nio.charset.Charset">UTF-8</charset>
+ </encoder>
+ </appender>
+
+ <logger name="RocketmqSnode" additivity="false">
+ <level value="INFO"/>
+ <appender-ref ref="RocketmqSnodeAppender"/>
+ <appender-ref ref="STDOUT"/>
+ </logger>
+
+ <logger name="RocketmqCommon" additivity="false">
+ <level value="INFO"/>
+ <appender-ref ref="RocketmqSnodeAppender"/>
+ </logger>
+
+ <logger name="RocketmqRemoting" additivity="false">
+ <level value="INFO"/>
+ <appender-ref ref="RocketmqSnodeAppender"/>
+ <appender-ref ref="STDOUT"/>
+ </logger>
+
+ <root>
+ <level value="debug"/>
+ <appender-ref ref="STDOUT"/>
+ <appender-ref ref="DefaultAppender"/>
+ </root>
+</configuration>
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
index 53a1d4d..b2609c8 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
@@ -20,6 +20,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
@@ -50,7 +51,7 @@ public class Producer {
*/
producer.start();
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < 10; i++) {
try {
/*
@@ -76,6 +77,7 @@ public class Producer {
/*
* Shut down once the producer instance is not longer in use.
*/
+ Thread.sleep(100000000000L);
producer.shutdown();
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
index 7b504dd..84b7872 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java
@@ -43,7 +43,6 @@ public class Producer {
} catch (Exception e) {
e.printStackTrace();
}
-
producer.shutdown();
}
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index dc32445..3b12d49 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -27,6 +27,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
@@ -77,7 +78,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
request);
}
-
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
@@ -122,12 +122,26 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
+ case RequestCode.REGISTER_SNODE:
+ return this.registerSnode(ctx, request);
default:
break;
}
return null;
}
+ public RemotingCommand registerSnode(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ final RegisterSnodeRequestHeader requestHeader =
+ (RegisterSnodeRequestHeader) request.decodeCommandCustomHeader(RegisterSnodeRequestHeader.class);
+ this.namesrvController.getRouteInfoManager().registerSnode(
+ requestHeader.getClusterName(),
+ requestHeader.getSnodeName(),
+ requestHeader.getSnodeAddr());
+ return response;
+ }
+
@Override
public boolean rejectRequest() {
return false;
@@ -280,7 +294,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
- log.info("requestHeader: " + requestHeader );
+ log.info("requestHeader: " + requestHeader);
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 00962ef..beef528 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.route.SnodeData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
@@ -54,6 +55,8 @@ public class RouteInfoManager {
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
+ private final HashMap<String/* snodeName*/, SnodeData> snodeTable;
+ private final HashMap<String/* clusterName*/, Set<String/*snodeName*/>> snodeCluster;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
@@ -61,6 +64,8 @@ public class RouteInfoManager {
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
+ this.snodeTable = new HashMap<>(256);
+ this.snodeCluster = new HashMap<>(256);
}
public byte[] getAllClusterInfo() {
@@ -99,6 +104,32 @@ public class RouteInfoManager {
return topicList.encode();
}
+ public void registerSnode(
+ final String clusterName,
+ final String snodeName,
+ final String snodeAddr) {
+ try {
+ this.lock.writeLock().lockInterruptibly();
+ Set<String> snodeSet = this.snodeCluster.get(clusterName);
+ if (snodeSet == null) {
+ snodeSet = new HashSet<>();
+ snodeSet.add(snodeName);
+ this.snodeCluster.put(clusterName, snodeSet);
+ } else {
+ snodeSet.add(snodeName);
+ }
+ SnodeData snodeData = new SnodeData();
+ snodeData.setAddr(snodeAddr);
+ snodeData.setSnodeName(snodeName);
+ snodeData.setClusterName(clusterName);
+ snodeTable.put(snodeName, snodeData);
+ } catch (Exception ex) {
+ log.error("Register snode error", ex);
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ }
+
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
diff --git a/pom.xml b/pom.xml
index 0a8a41e..47a7b68 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,6 +125,7 @@
<module>distribution</module>
<module>openmessaging</module>
<module>logging</module>
+ <module>rocketmq-snode</module>
</modules>
<build>
diff --git a/rocketmq-snode/pom.xml b/rocketmq-snode/pom.xml
new file mode 100644
index 0000000..d3df574
--- /dev/null
+++ b/rocketmq-snode/pom.xml
@@ -0,0 +1,93 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-all</artifactId>
+ <version>4.4.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>rocketmq-snode</artifactId>
+ <name>rocketmq-snode ${project.version}</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-store</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-remoting</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-srvutil</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-filter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.javassist</groupId>
+ <artifactId>javassist</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <!--<dependency>-->
+ <!--<groupId>org.apache.rocketmq</groupId>-->
+ <!--<artifactId>rocketmq-broker</artifactId>-->
+ <!--</dependency>-->
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
new file mode 100644
index 0000000..cb19bc9
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -0,0 +1,205 @@
+package org.apache.rocketmq.snode;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.RemotingServerFactory;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.snode.client.ClientHousekeepingService;
+import org.apache.rocketmq.snode.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.snode.client.ConsumerManager;
+import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener;
+import org.apache.rocketmq.snode.client.ProducerManager;
+import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
+import org.apache.rocketmq.snode.processor.HearbeatProcessor;
+import org.apache.rocketmq.snode.processor.PullMessageProcessor;
+import org.apache.rocketmq.snode.processor.SendMessageProcessor;
+import org.apache.rocketmq.snode.service.ScheduledService;
+import org.apache.rocketmq.snode.service.SnodeOuterService;
+import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
+import org.apache.rocketmq.snode.service.impl.SnodeOuterServiceImpl;
+
+public class SnodeController {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+ private final SnodeConfig snodeConfig;
+ private final NettyServerConfig nettyServerConfig;
+ private final NettyClientConfig nettyClientConfig;
+ private RemotingServer snodeServer;
+ private ExecutorService sendMessageExcutor;
+ private ExecutorService heartbeatExecutor;
+ private ExecutorService pullMessageExcutor;
+ private SnodeOuterService snodeOuterService;
+ private ExecutorService consumerManagerExcutor;
+ private ScheduledService scheduledService;
+ private ProducerManager producerManager;
+ private ConsumerManager consumerManager;
+ private ClientHousekeepingService clientHousekeepingService;
+ private SubscriptionGroupManager subscriptionGroupManager;
+
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+ "SnodeControllerScheduledThread"));
+
+ public SnodeController(NettyServerConfig nettyServerConfig,
+ NettyClientConfig nettyClientConfig,
+ SnodeConfig snodeConfig) {
+ this.nettyClientConfig = nettyClientConfig;
+ this.nettyServerConfig = nettyServerConfig;
+ this.snodeConfig = snodeConfig;
+ this.snodeOuterService = SnodeOuterServiceImpl.getInstance(this);
+ this.scheduledService = new ScheduledServiceImpl(this.snodeOuterService, this.snodeConfig);
+ this.sendMessageExcutor = ThreadUtils.newThreadPoolExecutor(
+ snodeConfig.getSnodeSendMessageMinPoolSize(),
+ snodeConfig.getSnodeSendMessageMaxPoolSize(),
+ 3000,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+ "SnodeSendMessageThread",
+ false);
+
+ this.pullMessageExcutor = ThreadUtils.newThreadPoolExecutor(
+ snodeConfig.getSnodeSendMessageMinPoolSize(),
+ snodeConfig.getSnodeSendMessageMaxPoolSize(),
+ 3000,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+ "SnodePullMessageThread",
+ false);
+
+ this.heartbeatExecutor = ThreadUtils.newThreadPoolExecutor(
+ snodeConfig.getSnodeHeartBeatCorePoolSize(),
+ snodeConfig.getSnodeHeartBeatMaxPoolSize(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeHeartBeatThreadPoolQueueCapacity()),
+ "SnodeHeartbeatThread",
+ true);
+
+ this.consumerManagerExcutor = ThreadUtils.newThreadPoolExecutor(
+ snodeConfig.getSnodeSendMessageMinPoolSize(),
+ snodeConfig.getSnodeSendMessageMaxPoolSize(),
+ 3000,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+ "SnodePullMessageThread",
+ false);
+
+ if (this.snodeConfig.getNamesrvAddr() != null) {
+ this.snodeOuterService.updateNameServerAddressList(this.snodeConfig.getNamesrvAddr());
+ log.info("Set user specified name server address: {}", this.snodeConfig.getNamesrvAddr());
+ }
+
+ this.producerManager = new ProducerManager();
+
+ ConsumerIdsChangeListener consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
+ this.consumerManager = new ConsumerManager(consumerIdsChangeListener);
+ this.subscriptionGroupManager = new SubscriptionGroupManager(this);
+ this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager);
+ }
+
+ public SnodeConfig getSnodeConfig() {
+ return snodeConfig;
+ }
+
+ public boolean initialize() {
+ this.snodeServer = RemotingServerFactory.createInstance().init(this.nettyServerConfig, null);
+ this.registerProcessor();
+ return true;
+ }
+
+ public void registerProcessor() {
+ snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, new SendMessageProcessor(this.snodeOuterService), sendMessageExcutor);
+ snodeServer.registerProcessor(RequestCode.HEART_BEAT, new HearbeatProcessor(this), heartbeatExecutor);
+ snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, new PullMessageProcessor(this.snodeOuterService), pullMessageExcutor);
+ snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, new ConsumerManageProcessor(this), consumerManagerExcutor);
+ }
+
+ public void start() {
+ initialize();
+ this.snodeServer.start();
+ this.snodeOuterService.start();
+ this.scheduledService.startScheduleTask();
+ this.clientHousekeepingService.start(this.snodeConfig.getHouseKeepingInterval());
+ }
+
+ public void shutdown() {
+ this.sendMessageExcutor.shutdown();
+ this.pullMessageExcutor.shutdown();
+ this.heartbeatExecutor.shutdown();
+ this.scheduledExecutorService.shutdown();
+ this.snodeOuterService.shutdown();
+ this.scheduledService.shutdown();
+ this.clientHousekeepingService.shutdown();
+ }
+
+ public ProducerManager getProducerManager() {
+ return producerManager;
+ }
+
+ public void setProducerManager(ProducerManager producerManager) {
+ this.producerManager = producerManager;
+ }
+
+ public RemotingServer getSnodeServer() {
+ return snodeServer;
+ }
+
+ public void setSnodeServer(RemotingServer snodeServer) {
+ this.snodeServer = snodeServer;
+ }
+
+ public ConsumerManager getConsumerManager() {
+ return consumerManager;
+ }
+
+ public void setConsumerManager(ConsumerManager consumerManager) {
+ this.consumerManager = consumerManager;
+ }
+
+ public SubscriptionGroupManager getSubscriptionGroupManager() {
+ return subscriptionGroupManager;
+ }
+
+ public void setSubscriptionGroupManager(SubscriptionGroupManager subscriptionGroupManager) {
+ this.subscriptionGroupManager = subscriptionGroupManager;
+ }
+
+ public NettyClientConfig getNettyClientConfig() {
+ return nettyClientConfig;
+ }
+
+ public SnodeOuterService getSnodeOuterService() {
+ return snodeOuterService;
+ }
+
+ public void setSnodeOuterService(SnodeOuterService snodeOuterService) {
+ this.snodeOuterService = snodeOuterService;
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
new file mode 100644
index 0000000..ddbeef8
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
@@ -0,0 +1,144 @@
+package org.apache.rocketmq.snode;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import ch.qos.logback.core.joran.spi.JoranException;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.TlsMode;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
+
+public class SnodeStartup {
+ private static InternalLogger log;
+ public static Properties properties = null;
+ public static CommandLine commandLine = null;
+ public static String configFile = null;
+
+ public static void main(String[] args) throws IOException, JoranException {
+ startup(createSnodeController(args));
+ }
+
+ public static SnodeController startup(SnodeController controller) {
+ try {
+ controller.start();
+
+ String tip = "The snode[" + controller.getSnodeConfig().getSnodeName() + ", "
+ + controller.getSnodeConfig().getSnodeAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+
+ if (null != controller.getSnodeConfig().getNamesrvAddr()) {
+ tip += " and name server is " + controller.getSnodeConfig().getNamesrvAddr();
+ }
+ log.info(tip);
+ System.out.printf("%s%n", tip);
+ return controller;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ return null;
+ }
+
+ public static SnodeController createSnodeController(String[] args) throws IOException, JoranException {
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ commandLine = ServerUtil.parseCmdLine("snode", args, buildCommandlineOptions(options),
+ new PosixParser());
+ if (null == commandLine) {
+ System.exit(-1);
+ }
+
+ final SnodeConfig snodeConfig = new SnodeConfig();
+ final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+ final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+
+ nettyServerConfig.setListenPort(snodeConfig.getListenPort());
+
+ nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
+ String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
+
+ if (commandLine.hasOption('c')) {
+ String file = commandLine.getOptionValue('c');
+ if (file != null) {
+ configFile = file;
+ InputStream in = new BufferedInputStream(new FileInputStream(file));
+ properties = new Properties();
+ properties.load(in);
+ MixAll.properties2Object(properties, snodeConfig);
+ MixAll.properties2Object(properties, nettyServerConfig);
+ MixAll.properties2Object(properties, nettyClientConfig);
+ in.close();
+ }
+ }
+
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ JoranConfigurator configurator = new JoranConfigurator();
+ configurator.setContext(lc);
+ lc.reset();
+ configurator.doConfigure(snodeConfig.getRocketmqHome() + "/conf/logback_snode.xml");
+ log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+ MixAll.printObjectProperties(log, snodeConfig);
+ MixAll.printObjectProperties(log, nettyClientConfig);
+ MixAll.printObjectProperties(log, nettyServerConfig);
+ final SnodeController snodeController = new SnodeController(
+ nettyServerConfig,
+ nettyClientConfig,
+ snodeConfig);
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ private volatile boolean hasShutdown = false;
+
+ @Override
+ public void run() {
+ synchronized (this) {
+ if (!this.hasShutdown) {
+ this.hasShutdown = true;
+ snodeController.shutdown();
+ }
+ }
+ }
+ }));
+ return snodeController;
+ }
+
+ private static Options buildCommandlineOptions(final Options options) {
+ Option opt = new Option("c", "configFile", true, "Broker config properties file");
+ opt.setRequired(false);
+ options.addOption(opt);
+ return options;
+ }
+}
+
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientChannelInfo.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientChannelInfo.java
new file mode 100644
index 0000000..16d8cde
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientChannelInfo.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
+
+public class ClientChannelInfo {
+ private final Channel channel;
+ private final String clientId;
+ private final LanguageCode language;
+ private final int version;
+ private volatile long lastUpdateTimestamp = System.currentTimeMillis();
+
+ public ClientChannelInfo(Channel channel) {
+ this(channel, null, null, 0);
+ }
+
+ public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) {
+ this.channel = channel;
+ this.clientId = clientId;
+ this.language = language;
+ this.version = version;
+ }
+
+ public Channel getChannel() {
+ return channel;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public LanguageCode getLanguage() {
+ return language;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((channel == null) ? 0 : channel.hashCode());
+ result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
+ result = prime * result + ((language == null) ? 0 : language.hashCode());
+ result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32));
+ result = prime * result + version;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ClientChannelInfo other = (ClientChannelInfo) obj;
+ if (channel == null) {
+ if (other.channel != null)
+ return false;
+ } else if (this.channel != other.channel) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language
+ + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]";
+ }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
similarity index 63%
copy from broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
copy to rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
index d536db5..e71ea0a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
@@ -14,13 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.broker.client;
+package org.apache.rocketmq.snode.client;
import io.netty.channel.Channel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
@@ -29,17 +28,18 @@ import org.apache.rocketmq.remoting.ChannelEventListener;
public class ClientHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final BrokerController brokerController;
+ private final ProducerManager producerManager;
+ private final ConsumerManager consumerManager;
private ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
- public ClientHousekeepingService(final BrokerController brokerController) {
- this.brokerController = brokerController;
+ public ClientHousekeepingService(final ProducerManager producerManager, final ConsumerManager consumerManager) {
+ this.producerManager = producerManager;
+ this.consumerManager = consumerManager;
}
- public void start() {
-
+ public void start(long interval) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
@@ -49,13 +49,12 @@ public class ClientHousekeepingService implements ChannelEventListener {
log.error("Error occurred when scan not active client channels.", e);
}
}
- }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
+ }, 1000 * 10, interval, TimeUnit.MILLISECONDS);
}
private void scanExceptionChannel() {
- this.brokerController.getProducerManager().scanNotActiveChannel();
- this.brokerController.getConsumerManager().scanNotActiveChannel();
- this.brokerController.getFilterServerManager().scanNotActiveChannel();
+ this.producerManager.scanNotActiveChannel();
+ //this.consumerManager.scanNotActiveChannel();
}
public void shutdown() {
@@ -69,22 +68,19 @@ public class ClientHousekeepingService implements ChannelEventListener {
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
- this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
+ this.producerManager.doChannelCloseEvent(remoteAddr, channel);
+ this.producerManager.doChannelCloseEvent(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
- this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
+ this.producerManager.doChannelCloseEvent(remoteAddr, channel);
+ this.consumerManager.doChannelCloseEvent(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
- this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
+ this.producerManager.doChannelCloseEvent(remoteAddr, channel);
+ this.consumerManager.doChannelCloseEvent(remoteAddr, channel);
}
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupEvent.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupEvent.java
new file mode 100644
index 0000000..0ebcf17
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+public enum ConsumerGroupEvent {
+
+ /**
+ * Some consumers in the group are changed.
+ */
+ CHANGE,
+ /**
+ * The group of consumer is unregistered.
+ */
+ UNREGISTER,
+ /**
+ * The group of consumer is registered.
+ */
+ REGISTER
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
new file mode 100644
index 0000000..9b366a5
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class ConsumerGroupInfo {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final String groupName;
+ private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
+ new ConcurrentHashMap<String, SubscriptionData>();
+ private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
+ new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
+ private volatile ConsumeType consumeType;
+ private volatile MessageModel messageModel;
+ private volatile ConsumeFromWhere consumeFromWhere;
+ private volatile long lastUpdateTimestamp = System.currentTimeMillis();
+
+ public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel,
+ ConsumeFromWhere consumeFromWhere) {
+ this.groupName = groupName;
+ this.consumeType = consumeType;
+ this.messageModel = messageModel;
+ this.consumeFromWhere = consumeFromWhere;
+ }
+
+ public ClientChannelInfo findChannel(final String clientId) {
+ Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Channel, ClientChannelInfo> next = it.next();
+ if (next.getValue().getClientId().equals(clientId)) {
+ return next.getValue();
+ }
+ }
+
+ return null;
+ }
+
+ public ConcurrentMap<String, SubscriptionData> getSubscriptionTable() {
+ return subscriptionTable;
+ }
+
+ public ConcurrentMap<Channel, ClientChannelInfo> getChannelInfoTable() {
+ return channelInfoTable;
+ }
+
+ public List<Channel> getAllChannel() {
+ List<Channel> result = new ArrayList<>();
+
+ result.addAll(this.channelInfoTable.keySet());
+
+ return result;
+ }
+
+ public List<String> getAllClientId() {
+ List<String> result = new ArrayList<>();
+
+ Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Entry<Channel, ClientChannelInfo> entry = it.next();
+ ClientChannelInfo clientChannelInfo = entry.getValue();
+ result.add(clientChannelInfo.getClientId());
+ }
+
+ return result;
+ }
+
+ public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
+ ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());
+ if (old != null) {
+ log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());
+ }
+ }
+
+ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+ final ClientChannelInfo info = this.channelInfoTable.remove(channel);
+ if (info != null) {
+ log.warn(
+ "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",
+ info.toString(), groupName);
+ return true;
+ }
+
+ return false;
+ }
+
+ public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
+ MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
+ boolean updated = false;
+ this.consumeType = consumeType;
+ this.messageModel = messageModel;
+ this.consumeFromWhere = consumeFromWhere;
+
+ ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
+ if (null == infoOld) {
+ ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
+ if (null == prev) {
+ log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
+ messageModel, infoNew.toString());
+ updated = true;
+ }
+
+ infoOld = infoNew;
+ } else {
+ if (!infoOld.getClientId().equals(infoNew.getClientId())) {
+ log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
+ this.groupName,
+ infoOld.toString(),
+ infoNew.toString());
+ this.channelInfoTable.put(infoNew.getChannel(), infoNew);
+ }
+ }
+
+ this.lastUpdateTimestamp = System.currentTimeMillis();
+ infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
+
+ return updated;
+ }
+
+ public boolean updateSubscription(final Set<SubscriptionData> subList) {
+ boolean updated = false;
+
+ for (SubscriptionData sub : subList) {
+ SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
+ if (old == null) {
+ SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
+ if (null == prev) {
+ updated = true;
+ log.info("subscription changed, add new topic, group: {} {}",
+ this.groupName,
+ sub.toString());
+ }
+ } else if (sub.getSubVersion() > old.getSubVersion()) {
+ if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
+ log.info("subscription changed, group: {} OLD: {} NEW: {}",
+ this.groupName,
+ old.toString(),
+ sub.toString()
+ );
+ }
+
+ this.subscriptionTable.put(sub.getTopic(), sub);
+ }
+ }
+
+ Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, SubscriptionData> next = it.next();
+ String oldTopic = next.getKey();
+
+ boolean exist = false;
+ for (SubscriptionData sub : subList) {
+ if (sub.getTopic().equals(oldTopic)) {
+ exist = true;
+ break;
+ }
+ }
+
+ if (!exist) {
+ log.warn("subscription changed, group: {} remove topic {} {}",
+ this.groupName,
+ oldTopic,
+ next.getValue().toString()
+ );
+
+ it.remove();
+ updated = true;
+ }
+ }
+
+ this.lastUpdateTimestamp = System.currentTimeMillis();
+
+ return updated;
+ }
+
+ public Set<String> getSubscribeTopics() {
+ return subscriptionTable.keySet();
+ }
+
+ public SubscriptionData findSubscriptionData(final String topic) {
+ return this.subscriptionTable.get(topic);
+ }
+
+ public ConsumeType getConsumeType() {
+ return consumeType;
+ }
+
+ public void setConsumeType(ConsumeType consumeType) {
+ this.consumeType = consumeType;
+ }
+
+ public MessageModel getMessageModel() {
+ return messageModel;
+ }
+
+ public void setMessageModel(MessageModel messageModel) {
+ this.messageModel = messageModel;
+ }
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+
+ public ConsumeFromWhere getConsumeFromWhere() {
+ return consumeFromWhere;
+ }
+
+ public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+ this.consumeFromWhere = consumeFromWhere;
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerIdsChangeListener.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerIdsChangeListener.java
new file mode 100644
index 0000000..ba950f1
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerIdsChangeListener.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+public interface ConsumerIdsChangeListener {
+
+ void handle(ConsumerGroupEvent event, String group, Object... args);
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
new file mode 100644
index 0000000..8d3b665
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+import io.netty.channel.Channel;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+public class ConsumerManager {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
+ private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
+ new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
+ private final ConsumerIdsChangeListener consumerIdsChangeListener;
+
+ public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
+ this.consumerIdsChangeListener = consumerIdsChangeListener;
+ }
+
+ public ClientChannelInfo findChannel(final String group, final String clientId) {
+ ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+ if (consumerGroupInfo != null) {
+ return consumerGroupInfo.findChannel(clientId);
+ }
+ return null;
+ }
+
+ public SubscriptionData findSubscriptionData(final String group, final String topic) {
+ ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
+ if (consumerGroupInfo != null) {
+ return consumerGroupInfo.findSubscriptionData(topic);
+ }
+
+ return null;
+ }
+
+ public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
+ return this.consumerTable.get(group);
+ }
+
+ public int findSubscriptionDataCount(final String group) {
+ ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
+ if (consumerGroupInfo != null) {
+ return consumerGroupInfo.getSubscriptionTable().size();
+ }
+
+ return 0;
+ }
+
+ public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+ Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, ConsumerGroupInfo> next = it.next();
+ ConsumerGroupInfo info = next.getValue();
+ boolean removed = info.doChannelCloseEvent(remoteAddr, channel);
+ if (removed) {
+ if (info.getChannelInfoTable().isEmpty()) {
+ ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
+ if (remove != null) {
+ log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
+ next.getKey());
+ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
+ }
+ }
+
+ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
+ }
+ }
+ }
+
+ public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
+ ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+ final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
+
+ ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+ if (null == consumerGroupInfo) {
+ ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
+ ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
+ consumerGroupInfo = prev != null ? prev : tmp;
+ }
+
+ boolean r1 =
+ consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
+ consumeFromWhere);
+ boolean r2 = consumerGroupInfo.updateSubscription(subList);
+
+ if (r1 || r2) {
+ if (isNotifyConsumerIdsChangedEnable) {
+ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
+ }
+ }
+
+ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
+
+ return r1 || r2;
+ }
+
+ public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo,
+ boolean isNotifyConsumerIdsChangedEnable) {
+ ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+ if (null != consumerGroupInfo) {
+ consumerGroupInfo.unregisterChannel(clientChannelInfo);
+ if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
+ ConsumerGroupInfo remove = this.consumerTable.remove(group);
+ if (remove != null) {
+ log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
+
+ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
+ }
+ }
+ if (isNotifyConsumerIdsChangedEnable) {
+ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
+ }
+ }
+ }
+
+ public void scanNotActiveChannel() {
+ Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, ConsumerGroupInfo> next = it.next();
+ String group = next.getKey();
+ ConsumerGroupInfo consumerGroupInfo = next.getValue();
+ ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
+ consumerGroupInfo.getChannelInfoTable();
+
+ Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
+ while (itChannel.hasNext()) {
+ Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
+ ClientChannelInfo clientChannelInfo = nextChannel.getValue();
+ long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
+ if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+ log.warn(
+ "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
+ RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
+ RemotingUtil.closeChannel(clientChannelInfo.getChannel());
+ itChannel.remove();
+ }
+ }
+
+ if (channelInfoTable.isEmpty()) {
+ log.warn(
+ "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
+ group);
+ it.remove();
+ }
+ }
+ }
+
+ public HashSet<String> queryTopicConsumeByWho(final String topic) {
+ HashSet<String> groups = new HashSet<>();
+ Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, ConsumerGroupInfo> entry = it.next();
+ ConcurrentMap<String, SubscriptionData> subscriptionTable =
+ entry.getValue().getSubscriptionTable();
+ if (subscriptionTable.containsKey(topic)) {
+ groups.add(entry.getKey());
+ }
+ }
+ return groups;
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java
new file mode 100644
index 0000000..cb7c164
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+import io.netty.channel.Channel;
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.snode.SnodeController;
+
+//TODO Filter implementation
+public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
+ private final SnodeController snodeController;
+
+ public DefaultConsumerIdsChangeListener(SnodeController snodeController) {
+ this.snodeController = snodeController;
+ }
+
+ @Override
+ public void handle(ConsumerGroupEvent event, String group, Object... args) {
+ if (event == null) {
+ return;
+ }
+ switch (event) {
+ case CHANGE:
+ if (args == null || args.length < 1) {
+ return;
+ }
+ List<Channel> channels = (List<Channel>) args[0];
+ if (channels != null && snodeController.getSnodeConfig().isNotifyConsumerIdsChangedEnable()) {
+ for (Channel chl : channels) {
+ this.snodeController.getSnodeOuterService().notifyConsumerIdsChanged(chl, group);
+ }
+ }
+ break;
+ case UNREGISTER:
+// this.snodeController.getConsumerFilterManager().unRegister(group);
+ break;
+ case REGISTER:
+ if (args == null || args.length < 1) {
+ return;
+ }
+// Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
+// this.snodeController.getConsumerFilterManager().register(group, subscriptionDataList);
+ break;
+ default:
+ throw new RuntimeException("Unknown event " + event);
+ }
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
new file mode 100644
index 0000000..b80c027
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.PositiveAtomicCounter;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+public class ProducerManager {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final long LOCK_TIMEOUT_MILLIS = 3000;
+ private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
+ private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
+ private final Lock groupChannelLock = new ReentrantLock();
+ private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
+ new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+ private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
+ public ProducerManager() {
+
+ }
+
+ public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
+ HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
+ new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+ try {
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ newGroupChannelTable.putAll(groupChannelTable);
+ } finally {
+ groupChannelLock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ return newGroupChannelTable;
+ }
+
+ public void scanNotActiveChannel() {
+ try {
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ for (final Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
+ .entrySet()) {
+ final String group = entry.getKey();
+ final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
+
+ Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Channel, ClientChannelInfo> item = it.next();
+ // final Integer id = item.getKey();
+ final ClientChannelInfo info = item.getValue();
+
+ long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
+ if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+ it.remove();
+ log.warn(
+ "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
+ RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+ RemotingUtil.closeChannel(info.getChannel());
+ }
+ }
+ }
+ } finally {
+ this.groupChannelLock.unlock();
+ }
+ } else {
+ log.warn("ProducerManager scanNotActiveChannel lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ }
+
+ public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+ if (channel != null) {
+ try {
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ for (final Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
+ .entrySet()) {
+ final String group = entry.getKey();
+ final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
+ entry.getValue();
+ final ClientChannelInfo clientChannelInfo =
+ clientChannelInfoTable.remove(channel);
+ if (clientChannelInfo != null) {
+ log.info(
+ "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
+ clientChannelInfo.toString(), remoteAddr, group);
+ }
+
+ }
+ } finally {
+ this.groupChannelLock.unlock();
+ }
+ } else {
+ log.warn("ProducerManager doChannelCloseEvent lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ }
+ }
+
+ public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
+ try {
+ ClientChannelInfo clientChannelInfoFound = null;
+
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+ if (null == channelTable) {
+ channelTable = new HashMap<>();
+ this.groupChannelTable.put(group, channelTable);
+ }
+
+ clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
+ if (null == clientChannelInfoFound) {
+ channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
+ log.info("new producer connected, group: {} channel: {}", group,
+ clientChannelInfo.toString());
+ }
+ } finally {
+ this.groupChannelLock.unlock();
+ }
+
+ if (clientChannelInfoFound != null) {
+ clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
+ }
+ } else {
+ log.warn("ProducerManager registerProducer lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ }
+
+ public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
+ try {
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+ if (null != channelTable && !channelTable.isEmpty()) {
+ ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
+ if (old != null) {
+ log.info("unregister a producer[{}] from groupChannelTable {}", group,
+ clientChannelInfo.toString());
+ }
+
+ if (channelTable.isEmpty()) {
+ this.groupChannelTable.remove(group);
+ log.info("unregister a producer group[{}] from groupChannelTable", group);
+ }
+ }
+ } finally {
+ this.groupChannelLock.unlock();
+ }
+ } else {
+ log.warn("ProducerManager unregisterProducer lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ }
+
+ public Channel getAvaliableChannel(String groupId) {
+ HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
+ List<Channel> channelList = new ArrayList<Channel>();
+ if (channelClientChannelInfoHashMap != null) {
+ for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
+ channelList.add(channel);
+ }
+ int size = channelList.size();
+ if (0 == size) {
+ log.warn("Channel list is empty. groupId={}", groupId);
+ return null;
+ }
+
+ int index = positiveAtomicCounter.incrementAndGet() % size;
+ Channel channel = channelList.get(index);
+ int count = 0;
+ boolean isOk = channel.isActive() && channel.isWritable();
+ while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
+ if (isOk) {
+ return channel;
+ }
+ index = (++index) % size;
+ channel = channelList.get(index);
+ isOk = channel.isActive() && channel.isWritable();
+ }
+ } else {
+ log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
+ return null;
+ }
+ return null;
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
new file mode 100644
index 0000000..3c6799e
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.client;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
+import org.apache.rocketmq.snode.SnodeController;
+
+public class SubscriptionGroupManager extends ConfigManager {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+ new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
+ private final DataVersion dataVersion = new DataVersion();
+ private transient SnodeController snodeController;
+
+ public enum SUBSCRIPTION_EVENT {
+ CREATE,
+ UPDATE,
+ DELETE
+ }
+
+ public SubscriptionGroupManager() {
+ this.init();
+ }
+
+ public SubscriptionGroupManager(SnodeController snodeController) {
+ this.snodeController = snodeController;
+ this.init();
+ }
+
+ private void init() {
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.TOOLS_CONSUMER_GROUP);
+ this.subscriptionGroupTable.put(MixAll.TOOLS_CONSUMER_GROUP, subscriptionGroupConfig);
+ }
+
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.FILTERSRV_CONSUMER_GROUP);
+ this.subscriptionGroupTable.put(MixAll.FILTERSRV_CONSUMER_GROUP, subscriptionGroupConfig);
+ }
+
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.SELF_TEST_CONSUMER_GROUP);
+ this.subscriptionGroupTable.put(MixAll.SELF_TEST_CONSUMER_GROUP, subscriptionGroupConfig);
+ }
+
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.ONS_HTTP_PROXY_GROUP);
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ this.subscriptionGroupTable.put(MixAll.ONS_HTTP_PROXY_GROUP, subscriptionGroupConfig);
+ }
+
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PULL_GROUP);
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PULL_GROUP, subscriptionGroupConfig);
+ }
+
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_PERMISSION_GROUP);
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_PERMISSION_GROUP, subscriptionGroupConfig);
+ }
+
+ {
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(MixAll.CID_ONSAPI_OWNER_GROUP);
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ this.subscriptionGroupTable.put(MixAll.CID_ONSAPI_OWNER_GROUP, subscriptionGroupConfig);
+ }
+ }
+
+ public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
+ SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
+ if (old != null) {
+ log.info("update subscription group config, old: {} new: {}", old, config);
+ } else {
+ log.info("create new subscription group, {}", config);
+ }
+
+ this.dataVersion.nextVersion();
+
+ this.persistToEnode(SUBSCRIPTION_EVENT.UPDATE, config);
+ }
+
+ public void disableConsume(final String groupName) {
+ SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
+ if (old != null) {
+ old.setConsumeEnable(false);
+ this.dataVersion.nextVersion();
+ }
+ }
+
+ public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
+ SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
+ if (null == subscriptionGroupConfig) {
+ if (snodeController.getSnodeConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
+ subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setGroupName(group);
+ SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);
+ if (null == preConfig) {
+ log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
+ }
+ this.dataVersion.nextVersion();
+ this.persistToEnode(SUBSCRIPTION_EVENT.CREATE, subscriptionGroupConfig);
+ }
+ }
+
+ return subscriptionGroupConfig;
+ }
+
+ @Override
+ public String encode() {
+ return this.encode(false);
+ }
+
+ @Override
+ public String configFilePath() {
+ //TODO get subscription persist request code
+ return null;
+ }
+
+ @Override
+ public void decode(String jsonString) {
+ if (jsonString != null) {
+ SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
+ if (obj != null) {
+ this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
+ this.dataVersion.assignNewOne(obj.dataVersion);
+ this.printLoadDataWhenFirstBoot(obj);
+ }
+ }
+ }
+
+ public String encode(final boolean prettyFormat) {
+ return RemotingSerializable.toJson(this, prettyFormat);
+ }
+
+ private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
+ Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, SubscriptionGroupConfig> next = it.next();
+ log.info("load exist subscription group, {}", next.getValue().toString());
+ }
+ }
+
+ public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
+ return subscriptionGroupTable;
+ }
+
+ public DataVersion getDataVersion() {
+ return dataVersion;
+ }
+
+ public void deleteSubscriptionGroupConfig(final String groupName) {
+ SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
+ if (old != null) {
+ log.info("delete subscription group OK, subscription group:{}", old);
+ this.dataVersion.nextVersion();
+ this.persistToEnode(SUBSCRIPTION_EVENT.DELETE, old);
+ } else {
+ log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);
+ }
+ }
+
+ void persistToEnode(SUBSCRIPTION_EVENT event, SubscriptionGroupConfig config) {
+
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
new file mode 100644
index 0000000..725cf6a
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
@@ -0,0 +1,223 @@
+package org.apache.rocketmq.snode.config;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.annotation.ImportantField;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class SnodeConfig {
+
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
+ private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+
+ @ImportantField
+ private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+
+ private String snodeName = "defaultNode";
+
+ private String snodeAddr = "127.0.0.1:11911";
+
+ private String clusterName = "defaultCluster";
+
+ private int snodeSendThreadPoolQueueCapacity = 10000;
+
+ private int snodeSendMessageMinPoolSize = 10;
+
+ private int snodeSendMessageMaxPoolSize = 20;
+
+ private int snodeHeartBeatCorePoolSize = 1;
+
+ private int snodeHeartBeatMaxPoolSize = 2;
+
+ private int snodeHeartBeatThreadPoolQueueCapacity = 1000;
+
+ private long snodeHeartBeatInterval = 30 * 1000;
+
+ private boolean fetechNameserver = false;
+
+ private long houseKeepingInterval = 10 * 1000;
+
+ private boolean notifyConsumerIdsChangedEnable = true;
+
+ private boolean autoCreateSubscriptionGroup = true;
+
+ private int listenPort = 11911;
+
+ public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
+ this.snodeHeartBeatInterval = snodeHeartBeatInterval;
+ }
+
+ public long getHouseKeepingInterval() {
+ return houseKeepingInterval;
+ }
+
+ public void setHouseKeepingInterval(long houseKeepingInterval) {
+ this.houseKeepingInterval = houseKeepingInterval;
+ }
+
+ public boolean isFetechNameserver() {
+ return fetechNameserver;
+ }
+
+ public void setFetechNameserver(boolean fetechNameserver) {
+ this.fetechNameserver = fetechNameserver;
+ }
+
+ public long getSnodeHeartBeatInterval() {
+ return snodeHeartBeatInterval;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ /**
+ * This configurable item defines interval of topics registration of broker to name server. Allowing values are
+ * between 10, 000 and 60, 000 milliseconds.
+ */
+ private int registerNameServerPeriod = 1000 * 30;
+
+ public int getRegisterNameServerPeriod() {
+ return registerNameServerPeriod;
+ }
+
+ public void setRegisterNameServerPeriod(int registerNameServerPeriod) {
+ this.registerNameServerPeriod = registerNameServerPeriod;
+ }
+
+ @ImportantField
+ private boolean fetchNamesrvAddrByAddressServer = false;
+
+ public boolean isFetchNamesrvAddrByAddressServer() {
+ return fetchNamesrvAddrByAddressServer;
+ }
+
+ public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
+ this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
+ }
+
+ public int getSnodeHeartBeatThreadPoolQueueCapacity() {
+ return snodeHeartBeatThreadPoolQueueCapacity;
+ }
+
+ public void setSnodeHeartBeatThreadPoolQueueCapacity(int snodeHeartBeatThreadPoolQueueCapacity) {
+ this.snodeHeartBeatThreadPoolQueueCapacity = snodeHeartBeatThreadPoolQueueCapacity;
+ }
+
+ public int getSnodeHeartBeatCorePoolSize() {
+ return snodeHeartBeatCorePoolSize;
+ }
+
+ public void setSnodeHeartBeatCorePoolSize(int snodeHeartBeatCorePoolSize) {
+ this.snodeHeartBeatCorePoolSize = snodeHeartBeatCorePoolSize;
+ }
+
+ public int getSnodeHeartBeatMaxPoolSize() {
+ return snodeHeartBeatMaxPoolSize;
+ }
+
+ public void setSnodeHeartBeatMaxPoolSize(int snodeHeartBeatMaxPoolSize) {
+ this.snodeHeartBeatMaxPoolSize = snodeHeartBeatMaxPoolSize;
+ }
+
+ public int getListenPort() {
+ return listenPort;
+ }
+
+ public String getRocketmqHome() {
+ return rocketmqHome;
+ }
+
+ public void setRocketmqHome(String rocketmqHome) {
+ this.rocketmqHome = rocketmqHome;
+ }
+
+ public void setListenPort(int listenPort) {
+ this.listenPort = listenPort;
+ }
+
+ public int getSnodeSendThreadPoolQueueCapacity() {
+ return snodeSendThreadPoolQueueCapacity;
+ }
+
+ public void setSnodeSendThreadPoolQueueCapacity(int snodeSendThreadPoolQueueCapacity) {
+ this.snodeSendThreadPoolQueueCapacity = snodeSendThreadPoolQueueCapacity;
+ }
+
+ public int getSnodeSendMessageMinPoolSize() {
+ return snodeSendMessageMinPoolSize;
+ }
+
+ public void setSnodeSendMessageMinPoolSize(int snodeSendMessageMinPoolSize) {
+ this.snodeSendMessageMinPoolSize = snodeSendMessageMinPoolSize;
+ }
+
+ public int getSnodeSendMessageMaxPoolSize() {
+ return snodeSendMessageMaxPoolSize;
+ }
+
+ public void setSnodeSendMessageMaxPoolSize(int snodeSendMessageMaxPoolSize) {
+ this.snodeSendMessageMaxPoolSize = snodeSendMessageMaxPoolSize;
+ }
+
+ public String getSnodeAddr() {
+ return snodeAddr;
+ }
+
+ public void setSnodeAddr(String snodeAddr) {
+ this.snodeAddr = snodeAddr;
+ }
+
+ public String getSnodeName() {
+ return snodeName;
+ }
+
+ public void setSnodeName(String snodeName) {
+ this.snodeName = snodeName;
+ }
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+
+ public void setNamesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
+
+ public boolean isNotifyConsumerIdsChangedEnable() {
+ return notifyConsumerIdsChangedEnable;
+ }
+
+ public void setNotifyConsumerIdsChangedEnable(boolean notifyConsumerIdsChangedEnable) {
+ this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
+ }
+
+ public boolean isAutoCreateSubscriptionGroup() {
+ return autoCreateSubscriptionGroup;
+ }
+
+ public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) {
+ this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup;
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
new file mode 100644
index 0000000..8509546
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import java.util.List;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
+
+public class ConsumerManageProcessor implements NettyRequestProcessor {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ private final SnodeController snodeController;
+
+ public ConsumerManageProcessor(final SnodeController snodeController) {
+ this.snodeController = snodeController;
+ }
+
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
+ throws RemotingCommandException {
+ switch (request.getCode()) {
+ case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
+ return this.getConsumerListByGroup(ctx, request);
+ default:
+ break;
+ }
+ return null;
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+
+ public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
+ throws RemotingCommandException {
+ final RemotingCommand response =
+ RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
+ final GetConsumerListByGroupRequestHeader requestHeader =
+ (GetConsumerListByGroupRequestHeader) request
+ .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
+
+ ConsumerGroupInfo consumerGroupInfo =
+ this.snodeController.getConsumerManager().getConsumerGroupInfo(
+ requestHeader.getConsumerGroup());
+ if (consumerGroupInfo != null) {
+ List<String> clientIds = consumerGroupInfo.getAllClientId();
+ if (!clientIds.isEmpty()) {
+ GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
+ body.setConsumerIdList(clientIds);
+ response.setBody(body.encode());
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ } else {
+ log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ }
+ } else {
+ log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ }
+
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());
+ return response;
+ }
+
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
new file mode 100644
index 0000000..f06af79
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
@@ -0,0 +1,92 @@
+package org.apache.rocketmq.snode.processor;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.client.ClientChannelInfo;
+
+public class HearbeatProcessor implements NettyRequestProcessor {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+ private final SnodeController snodeController;
+
+ public HearbeatProcessor(SnodeController snodeController) {
+ this.snodeController = snodeController;
+ }
+
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+ HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
+ ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+ ctx.channel(),
+ heartbeatData.getClientID(),
+ request.getLanguage(),
+ request.getVersion()
+ );
+
+ if (heartbeatData.getProducerDataSet() != null) {
+ for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
+ this.snodeController.getProducerManager().registerProducer(producerData.getGroupName(),
+ clientChannelInfo);
+ }
+ }
+
+ if (heartbeatData.getConsumerDataSet() != null) {
+ for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
+ SubscriptionGroupConfig subscriptionGroupConfig =
+ this.snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
+ data.getGroupName());
+ boolean isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
+ boolean changed = this.snodeController.getConsumerManager().registerConsumer(
+ data.getGroupName(),
+ clientChannelInfo,
+ data.getConsumeType(),
+ data.getMessageModel(),
+ data.getConsumeFromWhere(),
+ data.getSubscriptionDataSet(),
+ isNotifyConsumerIdsChangedEnable
+ );
+
+ if (changed) {
+ log.info("registerConsumer info changed {} {}",
+ data.toString(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ );
+ }
+ }
+ }
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ return response;
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
new file mode 100644
index 0000000..a165fd4
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -0,0 +1,40 @@
+package org.apache.rocketmq.snode.processor;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.service.SnodeOuterService;
+
+public class PullMessageProcessor implements NettyRequestProcessor {
+
+ private final SnodeOuterService snodeOuterService;
+
+ public PullMessageProcessor(SnodeOuterService snodeOuterService){
+ this.snodeOuterService = snodeOuterService;
+ }
+
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+ return snodeOuterService.pullMessage(request);
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
new file mode 100644
index 0000000..e419475
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -0,0 +1,44 @@
+package org.apache.rocketmq.snode.processor;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.service.SnodeOuterService;
+
+public class SendMessageProcessor implements NettyRequestProcessor {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+ private final SnodeOuterService snodeOuterService;
+
+ public SendMessageProcessor(final SnodeOuterService snodeOuterService) {
+ this.snodeOuterService = snodeOuterService;
+ }
+
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+ return snodeOuterService.sendMessage(request);
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/ScheduledService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/ScheduledService.java
new file mode 100644
index 0000000..dc86688
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/ScheduledService.java
@@ -0,0 +1,21 @@
+package org.apache.rocketmq.snode.service;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public interface ScheduledService {
+ void startScheduleTask();
+ void shutdown();
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SendTransferService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SendTransferService.java
new file mode 100644
index 0000000..6dad57c
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SendTransferService.java
@@ -0,0 +1,26 @@
+package org.apache.rocketmq.snode.service;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+public interface SendTransferService {
+ RemotingCommand sendMessage(RemotingCommand request);
+
+ boolean start();
+
+ void shutdown();
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
new file mode 100644
index 0000000..8764228
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
@@ -0,0 +1,52 @@
+package org.apache.rocketmq.snode.service;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+
+public interface SnodeOuterService {
+ void sendHearbeat(RemotingCommand remotingCommand);
+
+ RemotingCommand sendMessage(RemotingCommand remotingCommand);
+
+ RemotingCommand pullMessage(RemotingCommand remotingCommand);
+
+ void saveSubscriptionData(RemotingCommand remotingCommand);
+
+ void start();
+
+ void shutdown();
+
+ void registerSnode(SnodeConfig snodeConfig);
+
+ void updateNameServerAddressList(final String addrs);
+
+ String fetchNameServerAddr();
+
+ void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException, MQBrokerException;
+
+ void notifyConsumerIdsChanged(final Channel channel, final String consumerGroup);
+
+ RemotingCommand creatTopic(TopicConfig topicConfig);
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
new file mode 100644
index 0000000..23d8867
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
@@ -0,0 +1,113 @@
+package org.apache.rocketmq.snode.service.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.SnodeData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.service.ScheduledService;
+import org.apache.rocketmq.snode.service.SnodeOuterService;
+
+public class ScheduledServiceImpl implements ScheduledService {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+ private SnodeOuterService snodeOuterService;
+ private SnodeConfig snodeConfig;
+
+ private final RemotingCommand enodeHeartbeat;
+
+ public ScheduledServiceImpl(SnodeOuterService snodeOuterService, SnodeConfig snodeConfig) {
+ this.snodeOuterService = snodeOuterService;
+ this.snodeConfig = snodeConfig;
+ enodeHeartbeat = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
+ HeartbeatData heartbeatData = new HeartbeatData();
+ heartbeatData.setClientID(snodeConfig.getSnodeName());
+ SnodeData snodeData = new SnodeData();
+ snodeData.setSnodeName(snodeConfig.getSnodeName());
+ heartbeatData.setSnodeData(snodeData);
+ enodeHeartbeat.setBody(heartbeatData.encode());
+ }
+
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "SNodeScheduledThread");
+ }
+ });
+
+ @Override
+ public void startScheduleTask() {
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ snodeOuterService.sendHearbeat(enodeHeartbeat);
+ } catch (Exception e) {
+ log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
+ }
+ }
+ }, 1000 * 10, this.snodeConfig.getSnodeHeartBeatInterval(), TimeUnit.MILLISECONDS);
+
+ if (snodeConfig.isFetchNamesrvAddrByAddressServer()) {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ snodeOuterService.fetchNameServerAddr();
+ } catch (Throwable e) {
+ log.error("ScheduledTask fetchNameServerAddr exception", e);
+ }
+ }
+ }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
+ }
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ snodeOuterService.registerSnode(snodeConfig);
+ }
+ }, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ snodeOuterService.updateEnodeAddr(snodeConfig.getClusterName());
+ } catch (Exception ex) {
+ log.warn("Update broker addr error:{}", ex);
+ }
+ }
+ }, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+
+ }
+
+ @Override
+ public void shutdown() {
+ if (this.scheduledExecutorService != null) {
+ this.scheduledExecutorService.shutdown();
+ }
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SendTransferServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SendTransferServiceImpl.java
new file mode 100644
index 0000000..df589da
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SendTransferServiceImpl.java
@@ -0,0 +1,45 @@
+package org.apache.rocketmq.snode.service.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.rocketmq.common.ServiceState;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.service.SendTransferService;
+import org.apache.rocketmq.snode.service.SnodeOuterService;
+
+public class SendTransferServiceImpl implements SendTransferService {
+ private ServiceState serviceState = ServiceState.CREATE_JUST;
+ private SnodeOuterService snodeOuterService;
+
+ public SendTransferServiceImpl(SnodeOuterService snodeOuterService) {
+ snodeOuterService = snodeOuterService;
+ }
+
+ @Override
+ public RemotingCommand sendMessage(RemotingCommand request) {
+ return snodeOuterService.sendMessage(request);
+ }
+
+ @Override
+ public boolean start() {
+ return false;
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
new file mode 100644
index 0000000..14e577e
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
@@ -0,0 +1,270 @@
+package org.apache.rocketmq.snode.service.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.Channel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.TopAddressing;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.RemotingClientFactory;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.service.SnodeOuterService;
+
+public class SnodeOuterServiceImpl implements SnodeOuterService {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+ private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
+ private String nameSrvAddr = null;
+ private RemotingClient client;
+ private SnodeController snodeController;
+ private static SnodeOuterServiceImpl snodeOuterService;
+ private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> enodeTable =
+ new ConcurrentHashMap<>();
+ private final long defaultTimeoutMills = 3000L;
+
+ private SnodeOuterServiceImpl() {
+
+ }
+
+ public static SnodeOuterServiceImpl getInstance(SnodeController snodeController) {
+ if (snodeOuterService == null) {
+ synchronized (SnodeOuterServiceImpl.class) {
+ if (snodeOuterService == null) {
+ snodeOuterService = new SnodeOuterServiceImpl(snodeController);
+ return snodeOuterService;
+ }
+ }
+ }
+ return snodeOuterService;
+ }
+
+ private SnodeOuterServiceImpl(SnodeController snodeController) {
+ this.snodeController = snodeController;
+ this.client = RemotingClientFactory.createInstance().init(snodeController.getNettyClientConfig(), null);
+ }
+
+ @Override
+ public void start() {
+ this.client.start();
+ }
+
+ @Override
+ public void shutdown() {
+ this.client.shutdown();
+ }
+
+ @Override
+ public void sendHearbeat(RemotingCommand remotingCommand) {
+ for (Map.Entry<String, HashMap<Long, String>> entry : enodeTable.entrySet()) {
+ String enodeAddr = entry.getValue().get(MixAll.MASTER_ID);
+ if (enodeAddr != null) {
+ try {
+ RemotingCommand response = this.client.invokeSync(enodeAddr, remotingCommand, defaultTimeoutMills);
+ } catch (Exception ex) {
+ log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex);
+ }
+ }
+ }
+ }
+
+ @Override
+ public RemotingCommand sendMessage(RemotingCommand request) {
+ try {
+ SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+ RemotingCommand response =
+ this.client.invokeSync(sendMessageRequestHeaderV2.getN(), request, defaultTimeoutMills);
+ return response;
+ } catch (Exception ex) {
+ log.error("Send message async error:", ex);
+ }
+ return null;
+ }
+
+ @Override
+ public RemotingCommand pullMessage(RemotingCommand request) {
+ try {
+ final PullMessageRequestHeader requestHeader =
+ (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+ RemotingCommand remotingCommand = this.client.invokeSync(requestHeader.getEnodeAddr(), request, 20 * defaultTimeoutMills);
+ log.info("Pull message response:{}", remotingCommand);
+ log.info("Pull message response:{}", remotingCommand.getBody().length);
+ return remotingCommand;
+ } catch (Exception ex) {
+ log.error("pull message async error:", ex);
+ }
+ return null;
+ }
+
+ @Override
+ public void saveSubscriptionData(RemotingCommand remotingCommand) {
+
+ }
+
+ @Override
+ public String fetchNameServerAddr() {
+ try {
+ String addrs = this.topAddressing.fetchNSAddr();
+ if (addrs != null) {
+ if (!addrs.equals(this.nameSrvAddr)) {
+ log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
+ this.updateNameServerAddressList(addrs);
+ this.nameSrvAddr = addrs;
+ return nameSrvAddr;
+ }
+ }
+ } catch (Exception e) {
+ log.error("fetchNameServerAddr Exception", e);
+ }
+ return nameSrvAddr;
+ }
+
+ private ClusterInfo getBrokerClusterInfo(
+ final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
+ RemotingCommand response = this.client.invokeSync(null, request, timeoutMillis);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ return ClusterInfo.decode(response.getBody(), ClusterInfo.class);
+ }
+ default:
+ break;
+ }
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
+
+ @Override
+ public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ synchronized (this) {
+ ClusterInfo clusterInfo = getBrokerClusterInfo(defaultTimeoutMills);
+ if (clusterInfo != null) {
+ HashMap<String, Set<String>> brokerAddrs = clusterInfo.getClusterAddrTable();
+ for (Map.Entry<String, Set<String>> entry : brokerAddrs.entrySet()) {
+ Set<String> brokerNames = entry.getValue();
+ if (brokerNames != null) {
+ for (String brokerName : brokerNames) {
+ enodeTable.put(brokerName, clusterInfo.getBrokerAddrTable().get(brokerName).getBrokerAddrs());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public void updateNameServerAddressList(final String addrs) {
+ List<String> list = new ArrayList<String>();
+ String[] addrArray = addrs.split(";");
+ for (String addr : addrArray) {
+ list.add(addr);
+ }
+ this.client.updateNameServerAddressList(list);
+ }
+
+ public void registerSnode(SnodeConfig snodeConfig) {
+ List<String> nameServerAddressList = this.client.getNameServerAddressList();
+ RemotingCommand remotingCommand = new RemotingCommand();
+ RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
+ requestHeader.setSnodeAddr(snodeConfig.getSnodeAddr());
+ requestHeader.setSnodeName(snodeConfig.getSnodeName());
+ requestHeader.setClusterName(snodeConfig.getClusterName());
+ remotingCommand.setCustomHeader(requestHeader);
+ remotingCommand.setCode(RequestCode.REGISTER_SNODE);
+ if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
+ for (String nameServer : nameServerAddressList) {
+ try {
+ this.client.invokeSync(nameSrvAddr, remotingCommand, 3000L);
+ } catch (Exception ex) {
+ log.warn("Register Snode to Nameserver addr: {} error, ex:{} ", nameServer, ex);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void notifyConsumerIdsChanged(
+ final Channel channel,
+ final String consumerGroup) {
+ if (null == consumerGroup) {
+ log.error("notifyConsumerIdsChanged consumerGroup is null");
+ return;
+ }
+
+ NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
+ requestHeader.setConsumerGroup(consumerGroup);
+ RemotingCommand request =
+ RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
+
+ try {
+ this.snodeController.getSnodeServer().invokeOneway(channel, request, 10);
+ } catch (Exception e) {
+ log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
+ }
+ }
+
+ @Override
+ public RemotingCommand creatTopic(TopicConfig topicConfig) {
+// CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
+// requestHeader.setTopic(topicConfig.getTopicName());
+// requestHeader.setDefaultTopic(defaultTopic);
+// requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
+// requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
+// requestHeader.setPerm(topicConfig.getPerm());
+// requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
+// requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
+// requestHeader.setOrder(topicConfig.isOrder());
+//
+// RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
+//
+// RemotingCommand response = this.client.invokeSync(,
+// request, defaultTimeoutMills);
+// assert response != null;
+// switch (response.getCode()) {
+// case ResponseCode.SUCCESS: {
+// return;
+// }
+// default:
+// break;
+// }
+ return null;
+ }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/topic/TopicConfigManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/topic/TopicConfigManager.java
new file mode 100644
index 0000000..b8b6eb7
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/topic/TopicConfigManager.java
@@ -0,0 +1,19 @@
+package org.apache.rocketmq.snode.topic;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class TopicConfigManager {
+}