You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:31 UTC
[rocketmq] 07/14: Add subscription, consumer offset, sendback etc. management module
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 20da514fe758fb23ccc8d18cfa68c7cc348887d9
Author: duhenglucky <du...@gmail.com>
AuthorDate: Sat Dec 29 19:41:02 2018 +0800
Add subscription, consumer offset, sendback etc. management module
---
.../apache/rocketmq/broker/BrokerController.java | 2 +-
.../rocketmq/broker/longpolling/PullRequest.java | 19 ++
.../broker/longpolling/PullRequestHoldService.java | 45 ++--
.../broker/processor/SendMessageProcessor.java | 34 +--
.../processor/SnodePullMessageProcessor.java | 73 ++---
.../org/apache/rocketmq/client/ClientConfig.java | 2 +-
.../consumer/store/RemoteBrokerOffsetStore.java | 11 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 10 +-
.../impl/consumer/DefaultMQPullConsumerImpl.java | 2 +-
.../impl/consumer/DefaultMQPushConsumerImpl.java | 2 +-
.../client/impl/factory/MQClientInstance.java | 10 +-
.../impl/producer/DefaultMQProducerImpl.java | 2 +-
.../org/apache/rocketmq/common/BrokerConfig.java | 12 +
.../rocketmq/common/protocol/ResponseCode.java | 4 +
.../header/ConsumerSendMsgBackRequestHeader.java | 25 +-
.../protocol/header/GetMaxOffsetRequestHeader.java | 10 +
.../protocol/header/GetMinOffsetRequestHeader.java | 11 +
.../header/QueryConsumerOffsetRequestHeader.java | 10 +
.../protocol/header/SearchOffsetRequestHeader.java | 9 +
.../protocol/header/SendMessageRequestHeader.java | 29 +-
.../header/SendMessageRequestHeaderV2.java | 4 +-
.../header/UpdateConsumerOffsetRequestHeader.java | 10 +
.../rocketmq/example/quickstart/Consumer.java | 3 +-
.../namesrv/processor/DefaultRequestProcessor.java | 1 -
.../remoting/netty/NettyRemotingAbstract.java | 3 +-
.../transport/rocketmq/NettyRemotingClient.java | 1 +
.../org/apache/rocketmq/snode/SnodeController.java | 110 ++++++--
.../snode/client/ClientHousekeepingService.java | 2 +-
.../rocketmq/snode/client/ConsumerGroupInfo.java | 10 +-
.../rocketmq/snode/client/ConsumerManager.java | 23 +-
.../client/DefaultConsumerIdsChangeListener.java | 2 +-
.../rocketmq/snode/client/ProducerManager.java | 74 +++---
.../snode/client/SubscriptionGroupManager.java | 20 +-
.../apache/rocketmq/snode/config/SnodeConfig.java | 12 +
.../rocketmq/snode/constant/SnodeConstant.java | 2 +
.../SnodeException.java} | 29 +-
.../snode/offset/ConsumerOffsetManager.java | 243 +++++++++++++++++
.../snode/processor/ConsumerManageProcessor.java | 115 +++++++-
.../snode/processor/HearbeatProcessor.java | 64 ++++-
.../snode/processor/PullMessageProcessor.java | 56 +++-
.../snode/processor/SendMessageProcessor.java | 3 +-
.../{SnodeOuterService.java => EnodeService.java} | 33 ++-
.../rocketmq/snode/service/NnodeService.java | 47 ++++
.../snode/service/impl/EnodeServiceImpl.java | 295 +++++++++++++++++++++
.../snode/service/impl/NnodeServiceImpl.java | 208 +++++++++++++++
.../snode/service/impl/ScheduledServiceImpl.java | 51 +++-
.../snode/service/impl/SnodeOuterServiceImpl.java | 280 -------------------
47 files changed, 1503 insertions(+), 520 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9639f65..eff8fd4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -515,7 +515,7 @@ public class BrokerController {
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
- this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor,pullMessageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor, pullMessageExecutor);
this.snodePullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* QueryMessageProcessor
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
index 045ab9b..e64b0e9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
@@ -29,6 +29,7 @@ public class PullRequest {
private final long pullFromThisOffset;
private final SubscriptionData subscriptionData;
private final MessageFilter messageFilter;
+ private final boolean snodeRequest;
public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
long pullFromThisOffset, SubscriptionData subscriptionData,
@@ -40,6 +41,20 @@ public class PullRequest {
this.pullFromThisOffset = pullFromThisOffset;
this.subscriptionData = subscriptionData;
this.messageFilter = messageFilter;
+ this.snodeRequest = false;
+ }
+
+ public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
+ long pullFromThisOffset, SubscriptionData subscriptionData,
+ MessageFilter messageFilter, boolean snodeRequest) {
+ this.requestCommand = requestCommand;
+ this.clientChannel = clientChannel;
+ this.timeoutMillis = timeoutMillis;
+ this.suspendTimestamp = suspendTimestamp;
+ this.pullFromThisOffset = pullFromThisOffset;
+ this.subscriptionData = subscriptionData;
+ this.messageFilter = messageFilter;
+ this.snodeRequest = snodeRequest;
}
public RemotingCommand getRequestCommand() {
@@ -69,4 +84,8 @@ public class PullRequest {
public MessageFilter getMessageFilter() {
return messageFilter;
}
+
+ public boolean isSnodeRequest() {
+ return snodeRequest;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index af6addc..ee02017 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -127,34 +127,37 @@ public class PullRequestHoldService extends ServiceThread {
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
+ try {
+ if (newestOffset > request.getPullFromThisOffset()) {
+ boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
+ new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
+ // match by bit map, need eval again when properties is not null.
+ if (match && properties != null) {
+ match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
+ }
- if (newestOffset > request.getPullFromThisOffset()) {
- boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
- new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
- // match by bit map, need eval again when properties is not null.
- if (match && properties != null) {
- match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
- }
-
- if (match) {
- try {
- if (request.getMessageFilter() == null && request.getSubscriptionData() == null) {
- this.brokerController.getSnodePullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
- request.getRequestCommand());
- } else {
- this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
- request.getRequestCommand());
+ if (match) {
+ try {
+ if (request.isSnodeRequest()) {
+ this.brokerController.getSnodePullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
+ request.getRequestCommand());
+ } else {
+ this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
+ request.getRequestCommand());
+ }
+ } catch (Throwable e) {
+ log.error("execute request when wakeup failed.", e);
}
- } catch (Throwable e) {
- log.error("execute request when wakeup failed.", e);
+ continue;
}
- continue;
}
+ } catch (Exception ex) {
+ log.error("Error occurred:{}", ex);
}
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
- if (request.getMessageFilter() == null && request.getSubscriptionData() == null) {
+ if (request.isSnodeRequest()) {
this.brokerController.getSnodePullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} else {
@@ -166,7 +169,6 @@ public class PullRequestHoldService extends ServiceThread {
}
continue;
}
-
replayList.add(request);
}
@@ -177,3 +179,4 @@ public class PullRequestHoldService extends ServiceThread {
}
}
}
+
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index b7e7a61..5f1c2f1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -63,7 +63,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
- RemotingCommand request) throws RemotingCommandException {
+ RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
@@ -99,7 +99,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
- (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
+ (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
@@ -249,8 +249,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
- RemotingCommand request,
- MessageExt msg, TopicConfig topicConfig) {
+ RemotingCommand request,
+ MessageExt msg, TopicConfig topicConfig) {
String newTopic = requestHeader.getTopic();
if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
@@ -293,12 +293,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
- final RemotingCommand request,
- final SendMessageContext sendMessageContext,
- final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+ final RemotingCommand request,
+ final SendMessageContext sendMessageContext,
+ final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
- final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
+ final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
response.setOpaque(request.getOpaque());
@@ -366,9 +366,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
- RemotingCommand request, MessageExt msg,
- SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
- int queueIdInt) {
+ RemotingCommand request, MessageExt msg,
+ SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
+ int queueIdInt) {
if (putMessageResult == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
@@ -448,7 +448,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
- int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+ int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
sendMessageContext.setCommercialSendTimes(incValue);
@@ -459,7 +459,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} else {
if (hasSendMessageHook()) {
int wroteSize = request.getBody().length;
- int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+ int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
sendMessageContext.setCommercialSendTimes(incValue);
@@ -471,12 +471,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
- final RemotingCommand request,
- final SendMessageContext sendMessageContext,
- final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+ final RemotingCommand request,
+ final SendMessageContext sendMessageContext,
+ final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
- final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
+ final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
response.setOpaque(request.getOpaque());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
index 8beb6fa..788c498 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
@@ -24,7 +24,6 @@ import io.netty.channel.FileRegion;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
@@ -34,22 +33,18 @@ import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
-import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
-import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@@ -95,23 +90,50 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
response.setOpaque(request.getOpaque());
- log.info("receive PullMessage request command, {}", request);
final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
- final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
+
+ ConsumerFilterData consumerFilterData = null;
+ SubscriptionData subscriptionData;
+ try {
+ subscriptionData = FilterAPI.build(
+ requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
+ );
+ if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+ consumerFilterData = ConsumerFilterManager.build(
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
+ requestHeader.getExpressionType(), requestHeader.getSubVersion()
+ );
+ assert consumerFilterData != null;
+ }
+ } catch (Exception e) {
+ log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
+ requestHeader.getConsumerGroup());
+ response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
+ response.setRemark("parse the consumer's subscription failed");
+ return response;
+ }
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
+ response.setRemark(String.format("The broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
}
+ MessageFilter messageFilter;
+ if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
+ messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
+ this.brokerController.getConsumerFilterManager());
+ } else {
+ messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
+ this.brokerController.getConsumerFilterManager());
+ }
+
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), null);
+ requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
- log.info("Get message response:{}",getMessageResult);
if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
@@ -136,19 +158,6 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
break;
}
-// if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
-// // consume too slow ,redirect to another machine
-// if (getMessageResult.isSuggestPullingFromSlave()) {
-// responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
-// }
-// // consume ok
-// else {
-// responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
-// }
-// } else {
-// responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
-// }
-
switch (getMessageResult.getStatus()) {
case FOUND:
response.setCode(ResponseCode.SUCCESS);
@@ -162,7 +171,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
- log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
+ log.info("The broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
requestHeader.getQueueOffset(),
getMessageResult.getNextBeginOffset(),
requestHeader.getTopic(),
@@ -182,7 +191,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
case OFFSET_OVERFLOW_BADLY:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
- log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
+ log.info("The request offset: {} over flow badly, broker max offset: {}, consumer: {}",
requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE:
@@ -190,7 +199,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
- log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
+ log.info("The request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
@@ -267,12 +276,12 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
public void operationComplete(ChannelFuture future) throws Exception {
getMessageResult.release();
if (!future.isSuccess()) {
- log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
+ log.error("Transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
- log.error("transfer many message by pagecache exception", e);
+ log.error("Transfer many message by pagecache exception", e);
getMessageResult.release();
}
@@ -291,7 +300,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
- this.brokerController.getMessageStore().now(), offset, null, null);
+ this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter, true);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
@@ -407,7 +416,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
}
public void executeRequestWhenWakeup(final Channel channel,
- final RemotingCommand request) throws RemotingCommandException {
+ final RemotingCommand request) {
Runnable run = new Runnable() {
@Override
public void run() {
@@ -422,7 +431,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
- log.error("processRequestWrapper response to {} failed",
+ log.error("ProcessRequestWrapper snode response to {} failed",
future.channel().remoteAddress(), future.cause());
log.error(request.toString());
log.error(response.toString());
@@ -430,7 +439,7 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
}
});
} catch (Throwable e) {
- log.error("processRequestWrapper process request over, but response failed", e);
+ log.error("ProcessRequestWrapper snode process request over, but response failed", e);
log.error(request.toString());
log.error(response.toString());
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 562810f..48fb934 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -45,7 +45,7 @@ public class ClientConfig {
private int persistConsumerOffsetInterval = 1000 * 5;
private boolean unitMode = false;
private String unitName;
- private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+ private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
private boolean useTLS = TlsSystemConfig.tlsEnable;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index c1524e1..f3e2f4e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.consumer.store;
+import com.sun.org.apache.regexp.internal.RE;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -187,8 +188,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
/**
- * Update the Consumer Offset in one way, once the Master is off, updated to Slave,
- * here need to be optimized.
+ * Update the Consumer Offset in one way, once the Master is off, updated to Slave, here need to be optimized.
*/
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
@@ -196,8 +196,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
}
/**
- * Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
- * here need to be optimized.
+ * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
*/
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
@@ -215,7 +214,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setCommitOffset(offset);
-
+ requestHeader.setEnodeName(mq.getBrokerName());
if (isOneway) {
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
@@ -242,7 +241,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
-
+ requestHeader.setEnodeName(mq.getBrokerName());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 6302cd0..a7aead1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -353,8 +353,7 @@ public class MQClientAPIImpl {
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
- String addrS = "localhost:11911";//TODO FIXME
- RemotingCommand response = this.remotingClient.invokeSync(addrS, request, timeoutMillis);
+ RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}
@@ -566,7 +565,6 @@ public class MQClientAPIImpl {
) throws RemotingException, MQBrokerException, InterruptedException {
requestHeader.setEnodeAddr(addr);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SNODE_PULL_MESSAGE, requestHeader);
- addr = "localhost:11911"; //TODO FIXME
switch (communicationMode) {
case ONEWAY:
assert false;
@@ -649,7 +647,6 @@ public class MQClientAPIImpl {
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
- log.info("response header: {}", responseHeader.getSuggestWhichBrokerId());
return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
@@ -734,8 +731,7 @@ public class MQClientAPIImpl {
GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
- String addrS = "localhost:11911";//TODO FIXME
- RemotingCommand response = this.remotingClient.invokeSync(addrS,
+ RemotingCommand response = this.remotingClient.invokeSync(addr,
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
@@ -940,6 +936,7 @@ public class MQClientAPIImpl {
}
public void consumerSendMessageBack(
+ final String brokerName,
final String addr,
final MessageExt msg,
final String consumerGroup,
@@ -956,6 +953,7 @@ public class MQClientAPIImpl {
requestHeader.setDelayLevel(delayLevel);
requestHeader.setOriginMsgId(msg.getMsgId());
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
+ requestHeader.setEnodeName(brokerName);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 420d89b..20a72d8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -495,7 +495,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
}
- this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000,
+ this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, brokerAddr, msg, consumerGroup, delayLevel, 3000,
this.defaultMQPullConsumer.getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 393ef92..2f3cc97 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -499,7 +499,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
- this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
+ this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerName, brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 984e2cc..4e9d9ba 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -537,8 +537,7 @@ public class MQClientInstance {
}
try {
- String addrS = "localhost:11911"; //TODO FIXME
- int version = this.mQClientAPIImpl.sendHearbeat(addrS, heartbeatData, 3000);
+ int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
@@ -548,7 +547,7 @@ public class MQClientInstance {
log.info(heartbeatData.toString());
}
} catch (Exception e) {
- log.error("send heart beat error:{}",e);
+ log.error("send heart beat error:{}", e);
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
} else {
@@ -722,9 +721,10 @@ public class MQClientInstance {
return false;
}
+
/**
- * This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
- * is recommended.
+ * This method will be removed in the version 5.0.0,because filterServer was removed,and method
+ * <code>subscribe(final String topic, final MessageSelector messageSelector)</code> is recommended.
*/
@Deprecated
private void uploadFilterClassToAllFilterServer(final String consumerGroup, final String fullClassName,
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 9ada834..291834f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -733,7 +733,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
- requestHeader.setEnodeAddr(brokerAddr);
+ requestHeader.setEnodeName(mq.getBrokerName());
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f81af21..de0b8e2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -171,6 +171,18 @@ public class BrokerConfig {
@ImportantField
private long transactionCheckInterval = 60 * 1000;
+
+ @ImportantField
+ private boolean transactionEnable = true;
+
+ public boolean isTransactionEnable() {
+ return transactionEnable;
+ }
+
+ public void setTransactionEnable(boolean transactionEnable) {
+ this.transactionEnable = transactionEnable;
+ }
+
public boolean isTraceOn() {
return traceOn;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index f62c4ea..97d433b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -73,4 +73,8 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int CONSUME_MSG_TIMEOUT = 207;
public static final int NO_MESSAGE = 208;
+
+ public static final int QUERY_OFFSET_ERROR = 210;
+
+ public static final int PARAMETER_ERROR = 211;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
index bd8fbb4..8ead5ef 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
@@ -35,6 +35,8 @@ public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
private boolean unitMode = false;
private Integer maxReconsumeTimes;
+ private String enodeName;
+
@Override
public void checkFields() throws RemotingCommandException {
@@ -96,9 +98,24 @@ public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
this.maxReconsumeTimes = maxReconsumeTimes;
}
- @Override
- public String toString() {
- return "ConsumerSendMsgBackRequestHeader [group=" + group + ", originTopic=" + originTopic + ", originMsgId=" + originMsgId
- + ", delayLevel=" + delayLevel + ", unitMode=" + unitMode + ", maxReconsumeTimes=" + maxReconsumeTimes + "]";
+ public String getEnodeName() {
+ return enodeName;
+ }
+
+ public void setEnodeName(String enodeName) {
+ this.enodeName = enodeName;
+ }
+
+ @Override public String toString() {
+ return "ConsumerSendMsgBackRequestHeader{" +
+ "offset=" + offset +
+ ", group='" + group + '\'' +
+ ", delayLevel=" + delayLevel +
+ ", originMsgId='" + originMsgId + '\'' +
+ ", originTopic='" + originTopic + '\'' +
+ ", unitMode=" + unitMode +
+ ", maxReconsumeTimes=" + maxReconsumeTimes +
+ ", enodeName='" + enodeName + '\'' +
+ '}';
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
index 871309d..1b5f951 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMaxOffsetRequestHeader.java
@@ -30,6 +30,8 @@ public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
@CFNotNull
private Integer queueId;
+ private String enodeName;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -49,4 +51,12 @@ public class GetMaxOffsetRequestHeader implements CommandCustomHeader {
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
+
+ public String getEnodeName() {
+ return enodeName;
+ }
+
+ public void setEnodeName(String enodeName) {
+ this.enodeName = enodeName;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
index 6fb8ed4..1ac771b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetRequestHeader.java
@@ -27,9 +27,12 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class GetMinOffsetRequestHeader implements CommandCustomHeader {
@CFNotNull
private String topic;
+
@CFNotNull
private Integer queueId;
+ private String enodeName;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -49,4 +52,12 @@ public class GetMinOffsetRequestHeader implements CommandCustomHeader {
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
+
+ public String getEnodeName() {
+ return enodeName;
+ }
+
+ public void setEnodeName(String enodeName) {
+ this.enodeName = enodeName;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
index 3b7f627..2034c97 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumerOffsetRequestHeader.java
@@ -32,6 +32,8 @@ public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader {
@CFNotNull
private Integer queueId;
+ private String enodeName;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -59,4 +61,12 @@ public class QueryConsumerOffsetRequestHeader implements CommandCustomHeader {
public void setQueueId(Integer queueId) {
this.queueId = queueId;
}
+
+ public String getEnodeName() {
+ return enodeName;
+ }
+
+ public void setEnodeName(String enodeName) {
+ this.enodeName = enodeName;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
index 5ea2e24..4db36b5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
@@ -32,6 +32,8 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader {
@CFNotNull
private Long timestamp;
+ private String enodeName;
+
@Override
public void checkFields() throws RemotingCommandException {
@@ -61,4 +63,11 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader {
this.timestamp = timestamp;
}
+ public String getEnodeName() {
+ return enodeName;
+ }
+
+ public void setEnodeName(String enodeName) {
+ this.enodeName = enodeName;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index a032911..bab833b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -53,7 +53,7 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private Integer maxReconsumeTimes;
- private String enodeAddr;
+ private String enodeName;
@Override
public void checkFields() throws RemotingCommandException {
@@ -163,11 +163,30 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
this.batch = batch;
}
- public String getEnodeAddr() {
- return enodeAddr;
+ public String getEnodeName() {
+ return enodeName;
}
- public void setEnodeAddr(String enodeAddr) {
- this.enodeAddr = enodeAddr;
+ public void setEnodeName(String enodeName) {
+ this.enodeName = enodeName;
+ }
+
+ @Override public String toString() {
+ return "SendMessageRequestHeader{" +
+ "producerGroup='" + producerGroup + '\'' +
+ ", topic='" + topic + '\'' +
+ ", defaultTopic='" + defaultTopic + '\'' +
+ ", defaultTopicQueueNums=" + defaultTopicQueueNums +
+ ", queueId=" + queueId +
+ ", sysFlag=" + sysFlag +
+ ", bornTimestamp=" + bornTimestamp +
+ ", flag=" + flag +
+ ", properties='" + properties + '\'' +
+ ", reconsumeTimes=" + reconsumeTimes +
+ ", unitMode=" + unitMode +
+ ", batch=" + batch +
+ ", maxReconsumeTimes=" + maxReconsumeTimes +
+ ", enodeName='" + enodeName + '\'' +
+ '}';
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
index 9602805..ed6babe 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeaderV2.java
@@ -71,7 +71,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v1.setUnitMode(v2.k);
v1.setMaxReconsumeTimes(v2.l);
v1.setBatch(v2.m);
- v1.setEnodeAddr(v2.n);
+ v1.setEnodeName(v2.n);
return v1;
}
@@ -90,7 +90,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
v2.k = v1.isUnitMode();
v2.l = v1.getMaxReconsumeTimes();
v2.m = v1.isBatch();
- v2.n = v1.getEnodeAddr();
+ v2.n = v1.getEnodeName();
return v2;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
index 3f44db6..dd4d3b4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UpdateConsumerOffsetRequestHeader.java
@@ -34,6 +34,8 @@ public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
@CFNotNull
private Long commitOffset;
+ private String enodeName;
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -69,4 +71,12 @@ public class UpdateConsumerOffsetRequestHeader implements CommandCustomHeader {
public void setCommitOffset(Long commitOffset) {
this.commitOffset = commitOffset;
}
+
+ public String getEnodeName() {
+ return enodeName;
+ }
+
+ public void setEnodeName(String enodeName) {
+ this.enodeName = enodeName;
+ }
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
index 6d3b936..8d667a4 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java
@@ -35,7 +35,7 @@ public class Consumer {
/*
* Instantiate with specified consumer group name.
*/
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RocketMQ5");
/*
* Specify name server addresses.
@@ -58,7 +58,6 @@ public class Consumer {
* Subscribe one more more topics to consume.
*/
consumer.subscribe("TopicTest", "*");
-
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 3b12d49..0af8c98 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -379,7 +379,6 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand getBrokerClusterInfo(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-
byte[] content = this.namesrvController.getRouteInfoManager().getAllClusterInfo();
response.setBody(content);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index cae2bf4..8d3f54b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -296,8 +296,7 @@ public abstract class NettyRemotingAbstract {
responseFuture.release();
}
} else {
- log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- log.warn(cmd.toString());
+ log.warn("receive response, but not matched any request: {}, cmd: {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
index 4e691f1..55a1d3d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
@@ -181,6 +181,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 50337df..cb8d662 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -26,6 +26,8 @@ import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.RemotingClientFactory;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RemotingServerFactory;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
@@ -37,14 +39,17 @@ import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.snode.client.ProducerManager;
import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
import org.apache.rocketmq.snode.processor.HearbeatProcessor;
import org.apache.rocketmq.snode.processor.PullMessageProcessor;
import org.apache.rocketmq.snode.processor.SendMessageProcessor;
+import org.apache.rocketmq.snode.service.EnodeService;
+import org.apache.rocketmq.snode.service.NnodeService;
import org.apache.rocketmq.snode.service.ScheduledService;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
+import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
+import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
-import org.apache.rocketmq.snode.service.impl.SnodeOuterServiceImpl;
public class SnodeController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@@ -52,17 +57,25 @@ public class SnodeController {
private final SnodeConfig snodeConfig;
private final NettyServerConfig nettyServerConfig;
private final NettyClientConfig nettyClientConfig;
+ private RemotingClient remotingClient;
private RemotingServer snodeServer;
- private ExecutorService sendMessageExcutor;
+ private ExecutorService sendMessageExecutor;
private ExecutorService heartbeatExecutor;
- private ExecutorService pullMessageExcutor;
- private SnodeOuterService snodeOuterService;
- private ExecutorService consumerManagerExcutor;
+ private ExecutorService pullMessageExecutor;
+ private ExecutorService consumerManageExecutor;
+ private EnodeService enodeService;
+ private NnodeService nnodeService;
+ private ExecutorService consumerManagerExecutor;
private ScheduledService scheduledService;
private ProducerManager producerManager;
private ConsumerManager consumerManager;
private ClientHousekeepingService clientHousekeepingService;
private SubscriptionGroupManager subscriptionGroupManager;
+ private ConsumerOffsetManager consumerOffsetManager;
+ private ConsumerManageProcessor consumerManageProcessor;
+ private SendMessageProcessor sendMessageProcessor;
+ private PullMessageProcessor pullMessageProcessor;
+ private HearbeatProcessor hearbeatProcessor;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"SnodeControllerScheduledThread"));
@@ -73,9 +86,12 @@ public class SnodeController {
this.nettyClientConfig = nettyClientConfig;
this.nettyServerConfig = nettyServerConfig;
this.snodeConfig = snodeConfig;
- this.snodeOuterService = SnodeOuterServiceImpl.getInstance(this);
- this.scheduledService = new ScheduledServiceImpl(this.snodeOuterService, this.snodeConfig);
- this.sendMessageExcutor = ThreadUtils.newThreadPoolExecutor(
+ this.enodeService = new EnodeServiceImpl(this);
+ this.nnodeService = new NnodeServiceImpl(this);
+ this.scheduledService = new ScheduledServiceImpl(this);
+ this.remotingClient = RemotingClientFactory.createInstance().init(this.getNettyClientConfig(), null);
+
+ this.sendMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
@@ -84,7 +100,7 @@ public class SnodeController {
"SnodeSendMessageThread",
false);
- this.pullMessageExcutor = ThreadUtils.newThreadPoolExecutor(
+ this.pullMessageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
@@ -102,7 +118,7 @@ public class SnodeController {
"SnodeHeartbeatThread",
true);
- this.consumerManagerExcutor = ThreadUtils.newThreadPoolExecutor(
+ this.consumerManagerExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
3000,
@@ -111,8 +127,17 @@ public class SnodeController {
"SnodePullMessageThread",
false);
+ this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
+ snodeConfig.getSnodeSendMessageMinPoolSize(),
+ snodeConfig.getSnodeSendMessageMaxPoolSize(),
+ 3000,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(snodeConfig.getSnodeSendThreadPoolQueueCapacity()),
+ "ConsumerManagerThread",
+ false);
+
if (this.snodeConfig.getNamesrvAddr() != null) {
- this.snodeOuterService.updateNameServerAddressList(this.snodeConfig.getNamesrvAddr());
+ this.nnodeService.updateNnodeAddressList(this.snodeConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.snodeConfig.getNamesrvAddr());
}
@@ -122,6 +147,11 @@ public class SnodeController {
this.consumerManager = new ConsumerManager(consumerIdsChangeListener);
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager);
+ this.consumerOffsetManager = new ConsumerOffsetManager(this);
+ this.consumerManageProcessor = new ConsumerManageProcessor(this);
+ this.sendMessageProcessor = new SendMessageProcessor(this);
+ this.hearbeatProcessor = new HearbeatProcessor(this);
+ this.pullMessageProcessor = new PullMessageProcessor(this);
}
public SnodeConfig getSnodeConfig() {
@@ -135,26 +165,34 @@ public class SnodeController {
}
public void registerProcessor() {
- snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, new SendMessageProcessor(this), sendMessageExcutor);
- snodeServer.registerProcessor(RequestCode.HEART_BEAT, new HearbeatProcessor(this), heartbeatExecutor);
- snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, new PullMessageProcessor(this), pullMessageExcutor);
- snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, new ConsumerManageProcessor(this), consumerManagerExcutor);
+ this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, hearbeatProcessor, this.heartbeatExecutor);
+ this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, hearbeatProcessor, this.heartbeatExecutor);
+ this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor);
}
public void start() {
initialize();
this.snodeServer.start();
- this.snodeOuterService.start();
+ this.remotingClient.start();
this.scheduledService.startScheduleTask();
this.clientHousekeepingService.start(this.snodeConfig.getHouseKeepingInterval());
}
+ /**
+ * Stops the request-processing executors, the scheduled executor, the remoting client,
+ * the scheduled service and the client housekeeping service.
+ */
public void shutdown() {
- this.sendMessageExcutor.shutdown();
- this.pullMessageExcutor.shutdown();
+ this.sendMessageExecutor.shutdown();
+ this.pullMessageExecutor.shutdown();
this.heartbeatExecutor.shutdown();
+ this.consumerManagerExecutor.shutdown();
+ // consumerManageExecutor is the pool registerProcessor() hands to consumerManageProcessor; stop it too.
+ this.consumerManageExecutor.shutdown();
this.scheduledExecutorService.shutdown();
- this.snodeOuterService.shutdown();
+ this.remotingClient.shutdown();
this.scheduledService.shutdown();
this.clientHousekeepingService.shutdown();
}
@@ -195,11 +233,35 @@ public class SnodeController {
return nettyClientConfig;
}
- public SnodeOuterService getSnodeOuterService() {
- return snodeOuterService;
+ public EnodeService getEnodeService() {
+ return enodeService;
+ }
+
+ public void setEnodeService(EnodeService enodeService) {
+ this.enodeService = enodeService;
+ }
+
+ public NnodeService getNnodeService() {
+ return nnodeService;
+ }
+
+ public void setNnodeService(NnodeService nnodeService) {
+ this.nnodeService = nnodeService;
+ }
+
+ public RemotingClient getRemotingClient() {
+ return remotingClient;
+ }
+
+ public void setRemotingClient(RemotingClient remotingClient) {
+ this.remotingClient = remotingClient;
+ }
+
+ public ConsumerOffsetManager getConsumerOffsetManager() {
+ return consumerOffsetManager;
}
- public void setSnodeOuterService(SnodeOuterService snodeOuterService) {
- this.snodeOuterService = snodeOuterService;
+ public void setConsumerOffsetManager(ConsumerOffsetManager consumerOffsetManager) {
+ this.consumerOffsetManager = consumerOffsetManager;
}
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
index e71ea0a..02598b9 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java
@@ -54,7 +54,7 @@ public class ClientHousekeepingService implements ChannelEventListener {
private void scanExceptionChannel() {
this.producerManager.scanNotActiveChannel();
- //this.consumerManager.scanNotActiveChannel();
+ this.consumerManager.scanNotActiveChannel();
}
public void shutdown() {
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
index 9b366a5..89b02fd 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerGroupInfo.java
@@ -36,9 +36,9 @@ public class ConsumerGroupInfo {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final String groupName;
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
- new ConcurrentHashMap<String, SubscriptionData>();
+ new ConcurrentHashMap<>();
private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
- new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
+ new ConcurrentHashMap<>(16);
private volatile ConsumeType consumeType;
private volatile MessageModel messageModel;
private volatile ConsumeFromWhere consumeFromWhere;
@@ -124,7 +124,7 @@ public class ConsumerGroupInfo {
if (null == infoOld) {
ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
if (null == prev) {
- log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
+ log.info("New consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
messageModel, infoNew.toString());
updated = true;
}
@@ -155,13 +155,13 @@ public class ConsumerGroupInfo {
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
if (null == prev) {
updated = true;
- log.info("subscription changed, add new topic, group: {} {}",
+ log.info("Subscription changed, add new topic, group: {} {}",
this.groupName,
sub.toString());
}
} else if (sub.getSubVersion() > old.getSubVersion()) {
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
- log.info("subscription changed, group: {} OLD: {} NEW: {}",
+ log.info("Subscription changed, group: {} OLD: {} NEW: {}",
this.groupName,
old.toString(),
sub.toString()
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
index 8d3b665..a0bab83 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ConsumerManager.java
@@ -37,20 +37,18 @@ public class ConsumerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
- new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
+ new ConcurrentHashMap<>(1024);
private final ConsumerIdsChangeListener consumerIdsChangeListener;
public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
this.consumerIdsChangeListener = consumerIdsChangeListener;
}
- public ClientChannelInfo findChannel(final String group, final String clientId) {
- ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
- if (consumerGroupInfo != null) {
- return consumerGroupInfo.findChannel(clientId);
- }
- return null;
- }
+ /*
+ * Disabled code (plain comment, not Javadoc for findSubscriptionData): public ClientChannelInfo findChannel(final
+ * String group, final String clientId) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+ * if (consumerGroupInfo != null) { return consumerGroupInfo.findChannel(clientId); } return null; }
+ */
public SubscriptionData findSubscriptionData(final String group, final String topic) {
ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
@@ -84,12 +82,11 @@ public class ConsumerManager {
if (info.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
if (remove != null) {
- log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
+ log.info("Unregister consumer ok, no any connection, and remove consumer group, {}",
next.getKey());
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
}
}
-
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
}
}
@@ -130,7 +127,7 @@ public class ConsumerManager {
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
- log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
+ log.info("Unregister consumer ok, no any connection, and remove consumer group, {}", group);
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
}
@@ -157,7 +154,7 @@ public class ConsumerManager {
long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
log.warn(
- "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
+ "SCAN: Remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
RemotingUtil.closeChannel(clientChannelInfo.getChannel());
itChannel.remove();
@@ -166,7 +163,7 @@ public class ConsumerManager {
if (channelInfoTable.isEmpty()) {
log.warn(
- "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
+ "SCAN: Remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
group);
it.remove();
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java
index cb7c164..1f46c95 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/DefaultConsumerIdsChangeListener.java
@@ -43,7 +43,7 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
List<Channel> channels = (List<Channel>) args[0];
if (channels != null && snodeController.getSnodeConfig().isNotifyConsumerIdsChangedEnable()) {
for (Channel chl : channels) {
- this.snodeController.getSnodeOuterService().notifyConsumerIdsChanged(chl, group);
+ this.snodeController.getEnodeService().notifyConsumerIdsChanged(chl, group);
}
}
break;
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
index b80c027..4513c7d 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/ProducerManager.java
@@ -36,11 +36,11 @@ public class ProducerManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
- private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
private final Lock groupChannelLock = new ReentrantLock();
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
- new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+ new HashMap<>();
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
+
public ProducerManager() {
}
@@ -144,7 +144,7 @@ public class ProducerManager {
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
- log.info("new producer connected, group: {} channel: {}", group,
+ log.info("New producer connected, group: {} channel: {}", group,
clientChannelInfo.toString());
}
} finally {
@@ -158,7 +158,7 @@ public class ProducerManager {
log.warn("ProducerManager registerProducer lock timeout");
}
} catch (InterruptedException e) {
- log.error("", e);
+ log.error("Register Producer error: {}", e);
}
}
@@ -170,13 +170,13 @@ public class ProducerManager {
if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
if (old != null) {
- log.info("unregister a producer[{}] from groupChannelTable {}", group,
+ log.info("Unregister a producer[{}] from groupChannelTable {}", group,
clientChannelInfo.toString());
}
if (channelTable.isEmpty()) {
this.groupChannelTable.remove(group);
- log.info("unregister a producer group[{}] from groupChannelTable", group);
+ log.info("Unregister a producer group[{}] from groupChannelTable", group);
}
}
} finally {
@@ -190,35 +190,35 @@ public class ProducerManager {
}
}
- public Channel getAvaliableChannel(String groupId) {
- HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
- List<Channel> channelList = new ArrayList<Channel>();
- if (channelClientChannelInfoHashMap != null) {
- for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
- channelList.add(channel);
- }
- int size = channelList.size();
- if (0 == size) {
- log.warn("Channel list is empty. groupId={}", groupId);
- return null;
- }
-
- int index = positiveAtomicCounter.incrementAndGet() % size;
- Channel channel = channelList.get(index);
- int count = 0;
- boolean isOk = channel.isActive() && channel.isWritable();
- while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
- if (isOk) {
- return channel;
- }
- index = (++index) % size;
- channel = channelList.get(index);
- isOk = channel.isActive() && channel.isWritable();
- }
- } else {
- log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
- return null;
- }
- return null;
- }
+// public Channel getAvaliableChannel(String groupId) {
+// HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
+// List<Channel> channelList = new ArrayList<Channel>();
+// if (channelClientChannelInfoHashMap != null) {
+// for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
+// channelList.add(channel);
+// }
+// int size = channelList.size();
+// if (0 == size) {
+// log.warn("Channel list is empty. groupId={}", groupId);
+// return null;
+// }
+//
+// int index = positiveAtomicCounter.incrementAndGet() % size;
+// Channel channel = channelList.get(index);
+// int count = 0;
+// boolean isOk = channel.isActive() && channel.isWritable();
+// while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
+// if (isOk) {
+// return channel;
+// }
+// index = (++index) % size;
+// channel = channelList.get(index);
+// isOk = channel.isActive() && channel.isWritable();
+// }
+// } else {
+// log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
+// return null;
+// }
+// return null;
+// }
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
index 3c6799e..8d83d7a 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
@@ -38,12 +38,6 @@ public class SubscriptionGroupManager extends ConfigManager {
private final DataVersion dataVersion = new DataVersion();
private transient SnodeController snodeController;
- public enum SUBSCRIPTION_EVENT {
- CREATE,
- UPDATE,
- DELETE
- }
-
public SubscriptionGroupManager() {
this.init();
}
@@ -104,14 +98,14 @@ public class SubscriptionGroupManager extends ConfigManager {
public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
if (old != null) {
- log.info("update subscription group config, old: {} new: {}", old, config);
+ log.info("Update subscription group config, old: {} new: {}", old, config);
} else {
- log.info("create new subscription group, {}", config);
+ log.info("Create new subscription group, {}", config);
}
this.dataVersion.nextVersion();
- this.persistToEnode(SUBSCRIPTION_EVENT.UPDATE, config);
+ this.persistToEnode(config);
}
public void disableConsume(final String groupName) {
@@ -133,7 +127,7 @@ public class SubscriptionGroupManager extends ConfigManager {
log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());
}
this.dataVersion.nextVersion();
- this.persistToEnode(SUBSCRIPTION_EVENT.CREATE, subscriptionGroupConfig);
+ this.persistToEnode(subscriptionGroupConfig);
}
}
@@ -188,13 +182,13 @@ public class SubscriptionGroupManager extends ConfigManager {
if (old != null) {
log.info("delete subscription group OK, subscription group:{}", old);
this.dataVersion.nextVersion();
- this.persistToEnode(SUBSCRIPTION_EVENT.DELETE, old);
+ this.persistToEnode(old);
} else {
log.warn("delete subscription group failed, subscription groupName: {} not exist", groupName);
}
}
- void persistToEnode(SUBSCRIPTION_EVENT event, SubscriptionGroupConfig config) {
-
+ void persistToEnode(SubscriptionGroupConfig config) { // Hands the given group config to the enode service; invoked after update, auto-create and delete in this class.
+ this.snodeController.getEnodeService().persistSubscriptionGroupConfig(config); // NOTE(review): the delete path passes the removed config through this same call - confirm how the enode distinguishes removal.
}
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
index 725cf6a..6ef55b1 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
@@ -21,6 +21,8 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
+
public class SnodeConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@@ -60,6 +62,8 @@ public class SnodeConfig {
private int listenPort = 11911;
+ private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+
public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
this.snodeHeartBeatInterval = snodeHeartBeatInterval;
}
@@ -220,4 +224,12 @@ public class SnodeConfig {
public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) {
this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup;
}
+
+ public boolean isVipChannelEnabled() { // Current value of the vipChannelEnabled flag.
+ return vipChannelEnabled; // The field is initialised from the SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY system property, defaulting to "true".
+ }
+
+ public void setVipChannelEnabled(boolean vipChannelEnabled) { // Overrides the value that was read from the system property at construction.
+ this.vipChannelEnabled = vipChannelEnabled;
+ }
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
index 2ba91b2..1f5c7dd 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
@@ -22,4 +22,6 @@ public class SnodeConstant {
public static final long defaultTimeoutMills = 3000L;
+ public static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
+
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SendTransferService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/exception/SnodeException.java
similarity index 50%
rename from rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SendTransferService.java
rename to rocketmq-snode/src/main/java/org/apache/rocketmq/snode/exception/SnodeException.java
index 6dad57c..e9bd114 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SendTransferService.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/exception/SnodeException.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.snode.service;/*
+package org.apache.rocketmq.snode.exception;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,12 +15,27 @@ package org.apache.rocketmq.snode.service;/*
* limitations under the License.
*/
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.help.FAQUrl;
-public interface SendTransferService {
- RemotingCommand sendMessage(RemotingCommand request);
+public class SnodeException extends RuntimeException { // Unchecked exception that carries a numeric response code together with a plain error text.
+ private static final long serialVersionUID = 5975020272601250368L;
- boolean start();
+ private final int responseCode; // Code handed to the constructor, returned unchanged by getResponseCode().
+ private final String errorMessage; // Raw text handed to the constructor, without the CODE/DESC decoration used for getMessage().
- void shutdown();
-}
+ public SnodeException(int responseCode, String errorMessage) { // The exception message is "CODE: <code as string> DESC: <errorMessage>" passed through FAQUrl.attachDefaultURL.
+ super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
+ + errorMessage));
+ this.responseCode = responseCode;
+ this.errorMessage = errorMessage;
+ }
+
+ public int getResponseCode() { // Response code supplied at construction.
+ return responseCode;
+ }
+
+ public String getErrorMessage() { // Undecorated error text supplied at construction.
+ return errorMessage;
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
new file mode 100644
index 0000000..c177ccf
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.offset;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.exception.SnodeException;
+
+/**
+ * Keeps the consumer offsets committed through this snode in memory and pushes them to the enodes
+ * when {@link #persist()} is called.
+ *
+ * <p>Every entry of the offset table is keyed by {@code enodeName@topic@consumerGroup} (the format
+ * produced by {@code buildKey}) and maps a queue id to the last offset committed for that queue.
+ * All methods that inspect keys parse this three-segment format; keys with any other shape are
+ * ignored by the scans and reported as errors by {@link #persist()}.
+ */
+public class ConsumerOffsetManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final String TOPIC_GROUP_SEPARATOR = "@";
+    /** Number of segments in a well-formed key: enodeName, topic, consumerGroup. */
+    private static final int KEY_SEGMENTS = 3;
+
+    private ConcurrentMap<String/* enodeName@topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
+        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
+
+    private transient SnodeController snodeController;
+
+    public ConsumerOffsetManager() {
+    }
+
+    public ConsumerOffsetManager(SnodeController brokerController) {
+        this.snodeController = brokerController;
+    }
+
+    /**
+     * Drops the offsets of topics that a group no longer subscribes to on the given enode.
+     *
+     * <p>An entry is removed when it belongs to {@code enodeName}, the consumer manager holds no
+     * subscription data for its group/topic pair, and every stored queue offset is at or below the
+     * minimum offset the enode reports for that queue.
+     *
+     * @param enodeName enode whose entries are scanned and which is asked for the minimum offsets
+     * @throws SnodeException if the enode service returns no response for a minimum-offset query
+     */
+    public void scanUnsubscribedTopic(String enodeName) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
+        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
+            String key = next.getKey();
+            String[] segments = splitKey(key);
+            // Offsets are compared against this enode's queues, so entries of other enodes are skipped.
+            if (segments == null || !segments[0].equals(enodeName)) {
+                continue;
+            }
+            String topic = segments[1];
+            String group = segments[2];
+
+            if (null == snodeController.getConsumerManager().findSubscriptionData(group, topic)
+                && this.offsetBehindMuchThanData(enodeName, topic, next.getValue())) {
+                it.remove();
+                log.warn("Remove topic offset, {}", key);
+            }
+        }
+    }
+
+    /**
+     * Builds the offset-table key {@code enodeName@topic@consumerGroup}.
+     *
+     * @throws SnodeException with {@code PARAMETER_ERROR} if any argument is {@code null}
+     */
+    private String buildKey(final String enodeName, final String topic, final String consumerGroup) {
+        if (enodeName == null || topic == null || consumerGroup == null) {
+            log.warn("Build key parameter error enodeName: {}, topic: {} consumerGroup:{}",
+                enodeName, topic, consumerGroup);
+            throw new SnodeException(ResponseCode.PARAMETER_ERROR, "Build key parameter error!");
+        }
+        StringBuilder sb = new StringBuilder(50);
+        sb.append(enodeName).append(TOPIC_GROUP_SEPARATOR).append(topic).append(TOPIC_GROUP_SEPARATOR).append(consumerGroup);
+        return sb.toString();
+    }
+
+    /**
+     * Splits an offset-table key into {enodeName, topic, consumerGroup}.
+     *
+     * @return the three segments, or {@code null} when the key does not have exactly three segments
+     */
+    private String[] splitKey(final String key) {
+        String[] segments = key.split(TOPIC_GROUP_SEPARATOR);
+        return segments.length == KEY_SEGMENTS ? segments : null;
+    }
+
+    /**
+     * Tells whether every offset in {@code table} is at or below the minimum offset the enode reports
+     * for the same queue. An empty table yields {@code false}.
+     *
+     * <p>A response whose code is not {@code SUCCESS} leaves the minimum offset at 0 for that queue.
+     *
+     * @throws SnodeException with {@code QUERY_OFFSET_ERROR} if the enode service returns no response
+     */
+    private boolean offsetBehindMuchThanData(final String enodeName, final String topic,
+        ConcurrentMap<Integer, Long> table) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
+        if (table.isEmpty()) {
+            return false;
+        }
+
+        for (Entry<Integer, Long> next : table.entrySet()) {
+            RemotingCommand remotingCommand = this.snodeController.getEnodeService().getMinOffsetInQueue(enodeName, topic, next.getKey());
+            if (remotingCommand == null) {
+                throw new SnodeException(ResponseCode.QUERY_OFFSET_ERROR, "Query min offset error!");
+            }
+            long minOffsetInStore = 0;
+            switch (remotingCommand.getCode()) {
+                case ResponseCode.SUCCESS: {
+                    GetMinOffsetResponseHeader responseHeader =
+                        (GetMinOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
+                    minOffsetInStore = responseHeader.getOffset();
+                    break;
+                }
+                default:
+                    break;
+            }
+            long offsetInPersist = next.getValue();
+            if (offsetInPersist > minOffsetInStore) {
+                // One queue still ahead of the store's minimum is enough to keep the entry.
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Collects the topics for which the given consumer group has offsets on any enode.
+     *
+     * @param group consumer group name; must not be {@code null}
+     */
+    public Set<String> whichTopicByConsumer(final String group) {
+        Set<String> topics = new HashSet<String>();
+
+        for (String key : this.offsetTable.keySet()) {
+            String[] segments = splitKey(key);
+            if (segments != null && group.equals(segments[2])) {
+                topics.add(segments[1]);
+            }
+        }
+
+        return topics;
+    }
+
+    /**
+     * Collects the consumer groups that have offsets for the given topic on any enode.
+     *
+     * @param topic topic name; must not be {@code null}
+     */
+    public Set<String> whichGroupByTopic(final String topic) {
+        Set<String> groups = new HashSet<String>();
+
+        for (String key : this.offsetTable.keySet()) {
+            String[] segments = splitKey(key);
+            if (segments != null && topic.equals(segments[1])) {
+                groups.add(segments[2]);
+            }
+        }
+
+        return groups;
+    }
+
+    /**
+     * Records {@code offset} as the committed offset of one queue.
+     *
+     * @param clientHost address of the committing client, used only in the warning log
+     * @throws SnodeException if {@code enodeName}, {@code topic} or {@code group} is {@code null}
+     */
+    public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic,
+        final int queueId,
+        final long offset) {
+        // enodeName@topic@group
+        String key = buildKey(enodeName, topic, group);
+        this.commitOffset(clientHost, key, queueId, offset);
+    }
+
+    /**
+     * Stores the offset under {@code key}, creating the per-key queue map on first use, and warns
+     * when the new offset is lower than the one it replaces.
+     */
+    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
+        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
+        if (null == map) {
+            // putIfAbsent keeps the map another thread may have registered between get and put,
+            // so a concurrent first commit for the same key is not overwritten.
+            ConcurrentMap<Integer, Long> created = new ConcurrentHashMap<Integer, Long>(32);
+            ConcurrentMap<Integer, Long> raced = this.offsetTable.putIfAbsent(key, created);
+            map = raced != null ? raced : created;
+        }
+
+        Long storeOffset = map.put(queueId, offset);
+        if (storeOffset != null && offset < storeOffset) {
+            log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
+        }
+    }
+
+    /**
+     * Looks up the committed offset of one queue.
+     *
+     * @return the stored offset, or -1 when nothing is stored for the key or queue
+     * @throws SnodeException if {@code enodeName}, {@code topic} or {@code group} is {@code null}
+     */
+    public long queryOffset(final String enodeName, final String group, final String topic, final int queueId) {
+        String key = buildKey(enodeName, topic, group);
+        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
+        if (null != map) {
+            Long offset = map.get(queueId);
+            if (offset != null) {
+                return offset;
+            }
+        }
+
+        return -1;
+    }
+
+    /** Serialises this manager to compact JSON. */
+    public String encode() {
+        return this.encode(false);
+    }
+
+    /** Replaces the offset table with the one found in {@code jsonString}; a null input is ignored. */
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
+            if (obj != null) {
+                this.offsetTable = obj.offsetTable;
+            }
+        }
+    }
+
+    /** Serialises this manager to JSON, optionally pretty-printed. */
+    public String encode(final boolean prettyFormat) {
+        return RemotingSerializable.toJson(this, prettyFormat);
+    }
+
+    public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() {
+        return offsetTable;
+    }
+
+    public void setOffsetTable(ConcurrentHashMap<String, ConcurrentMap<Integer, Long>> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+
+    /**
+     * Returns the live queue-id to offset map stored for the key, or {@code null} when there is none.
+     *
+     * @throws SnodeException if {@code enodeName}, {@code topic} or {@code group} is {@code null}
+     */
+    public Map<Integer, Long> queryOffset(final String enodeName, final String group, final String topic) {
+        // enodeName@topic@group
+        String key = buildKey(enodeName, topic, group);
+        return this.offsetTable.get(key);
+    }
+
+    /**
+     * Copies the offsets {@code srcGroup} holds for {@code topic} to {@code destGroup}, separately for
+     * every enode that has such an entry. Existing offsets of {@code destGroup} are replaced.
+     */
+    public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
+        // Copies are gathered first so the scan never observes the entries it is about to add.
+        ConcurrentMap<String, ConcurrentMap<Integer, Long>> clones =
+            new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>();
+        for (Entry<String, ConcurrentMap<Integer, Long>> entry : this.offsetTable.entrySet()) {
+            String[] segments = splitKey(entry.getKey());
+            if (segments != null && segments[1].equals(topic) && segments[2].equals(srcGroup)) {
+                String destKey = segments[0] + TOPIC_GROUP_SEPARATOR + topic + TOPIC_GROUP_SEPARATOR + destGroup;
+                clones.put(destKey, new ConcurrentHashMap<Integer, Long>(entry.getValue()));
+            }
+        }
+        this.offsetTable.putAll(clones);
+    }
+
+    /**
+     * Sends every stored queue offset to the enode named in its key. Keys that do not have three
+     * segments are logged as errors and skipped.
+     */
+    public void persist() {
+        for (Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
+            String key = offSetEntry.getKey();
+            String[] keys = splitKey(key);
+            if (keys == null) {
+                log.error("Persist offset split keys error:{}", key);
+                continue;
+            }
+            String enodeName = keys[0];
+            String topic = keys[1];
+            String consumerGroup = keys[2];
+            for (Entry<Integer, Long> queueEntry : offSetEntry.getValue().entrySet()) {
+                Integer queueId = queueEntry.getKey();
+                Long offset = queueEntry.getValue();
+                this.snodeController.getEnodeService().persistOffsetToEnode(enodeName, consumerGroup, topic, queueId, offset);
+            }
+        }
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index aec6f04..3e53795 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -24,10 +24,20 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
@@ -38,16 +48,27 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
private final SnodeController snodeController;
- public ConsumerManageProcessor(final SnodeController snodeController) {
- this.snodeController = snodeController;
+ public ConsumerManageProcessor(final SnodeController brokerController) {
+ this.snodeController = brokerController;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
switch (request.getCode()) {
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
return this.getConsumerListByGroup(ctx, request);
+ case RequestCode.UPDATE_CONSUMER_OFFSET:
+ return this.updateConsumerOffset(ctx, request);
+ case RequestCode.QUERY_CONSUMER_OFFSET:
+ return this.queryConsumerOffset(ctx, request);
+ case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
+ return searchOffsetByTimestamp(ctx, request);
+ case RequestCode.GET_MAX_OFFSET:
+ return getMaxOffset(ctx, request);
+ case RequestCode.GET_MIN_OFFSET:
+ return getMinOffset(ctx, request);
default:
break;
}
@@ -59,6 +80,45 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
return false;
}
+    /**
+     * Forwards a search-offset-by-timestamp request to the enode named in the request header and
+     * returns the enode's response.
+     *
+     * <p>If the forwarding call fails, the failure is logged and a {@code SYSTEM_ERROR} response is
+     * returned, so the requester receives an answer instead of waiting for a reply that never comes.
+     *
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    public RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final SearchOffsetRequestHeader requestHeader =
+            (SearchOffsetRequestHeader) request
+                .decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
+        try {
+            return this.snodeController.getEnodeService().getOffsetByTimestamp(requestHeader.getEnodeName(), request);
+        } catch (Exception ex) {
+            // The throwable is passed as the last argument so the logger prints its stack trace.
+            log.error("Search offset by timestamp error", ex);
+            return RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR,
+                "Search offset by timestamp error: " + ex.getMessage());
+        }
+    }
+
+    /**
+     * Asks the enode named in the request header for the minimum offset of the requested topic queue
+     * and returns the enode's response.
+     *
+     * <p>If the call fails, the failure is logged and a {@code SYSTEM_ERROR} response is returned, so
+     * the requester receives an answer instead of waiting for a reply that never comes.
+     *
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    public RemotingCommand getMinOffset(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final GetMinOffsetRequestHeader requestHeader =
+            (GetMinOffsetRequestHeader) request
+                .decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
+        try {
+            return this.snodeController.getEnodeService().getMinOffsetInQueue(requestHeader.getEnodeName(), requestHeader.getTopic(), requestHeader.getQueueId());
+        } catch (Exception ex) {
+            // The throwable is passed as the last argument so the logger prints its stack trace.
+            log.error("Get min offset error", ex);
+            return RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR,
+                "Get min offset error: " + ex.getMessage());
+        }
+    }
+
+    /**
+     * Forwards a get-max-offset request to the enode named in the request header and returns the
+     * enode's response.
+     *
+     * <p>If the forwarding call fails, the failure is logged and a {@code SYSTEM_ERROR} response is
+     * returned, so the requester receives an answer instead of waiting for a reply that never comes.
+     *
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    public RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final GetMaxOffsetRequestHeader requestHeader =
+            (GetMaxOffsetRequestHeader) request
+                .decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
+        try {
+            return this.snodeController.getEnodeService().getMaxOffsetInQueue(requestHeader.getEnodeName(), request);
+        } catch (Exception ex) {
+            // The message previously said "min"; it now names the operation that actually failed.
+            log.error("Get max offset error", ex);
+            return RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR,
+                "Get max offset error: " + ex.getMessage());
+        }
+    }
+
public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
@@ -80,7 +140,7 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
response.setRemark(null);
return response;
} else {
- log.warn("Get all client failed, {} {}", requestHeader.getConsumerGroup(),
+ log.warn("GetAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
} else {
@@ -89,8 +149,53 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
}
response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());
+ response.setRemark("No consumer for this group, " + requestHeader.getConsumerGroup());
return response;
}
+    /**
+     * Handles an update-consumer-offset request: stores the offset carried by the request header in
+     * the snode's consumer offset manager and replies with {@code SUCCESS}.
+     *
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand reply =
+            RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
+        final UpdateConsumerOffsetRequestHeader header =
+            (UpdateConsumerOffsetRequestHeader) request
+                .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
+
+        // The remote address of the committing client is only passed along for logging.
+        final String clientHost = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+        this.snodeController.getConsumerOffsetManager().commitOffset(
+            header.getEnodeName(),
+            clientHost,
+            header.getConsumerGroup(),
+            header.getTopic(),
+            header.getQueueId(),
+            header.getCommitOffset());
+
+        reply.setCode(ResponseCode.SUCCESS);
+        reply.setRemark(null);
+        return reply;
+    }
+
+    /**
+     * Handles a query-consumer-offset request.
+     *
+     * <p>The offset is first looked up in the snode's consumer offset manager. A non-negative result
+     * is returned in a {@code SUCCESS} response; otherwise the response of the enode service's
+     * {@code loadOffset} call is returned as is.
+     */
+    private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
+        throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
+        final RemotingCommand reply =
+            RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
+        final QueryConsumerOffsetResponseHeader replyHeader =
+            (QueryConsumerOffsetResponseHeader) reply.readCustomHeader();
+        final QueryConsumerOffsetRequestHeader requestHeader =
+            (QueryConsumerOffsetRequestHeader) request
+                .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
+
+        final long cachedOffset = this.snodeController.getConsumerOffsetManager().queryOffset(
+            requestHeader.getEnodeName(),
+            requestHeader.getConsumerGroup(),
+            requestHeader.getTopic(),
+            requestHeader.getQueueId());
+
+        if (cachedOffset >= 0) {
+            // Served from the snode's own table.
+            replyHeader.setOffset(cachedOffset);
+            reply.setCode(ResponseCode.SUCCESS);
+            reply.setRemark(null);
+            return reply;
+        }
+
+        // Nothing stored locally: hand back whatever the enode answers.
+        log.info("Load offset from enode server, enodeName: {}, consumer group: {}, topic: {}, queueId: {}",
+            requestHeader.getEnodeName(),
+            requestHeader.getConsumerGroup(),
+            requestHeader.getTopic(),
+            requestHeader.getQueueId());
+        return this.snodeController.getEnodeService().loadOffset(requestHeader.getEnodeName(), requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+            requestHeader.getQueueId());
+    }
}
+
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
index c26ed7c..a0af26a 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
@@ -15,12 +15,20 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
+
import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -38,7 +46,19 @@ public class HearbeatProcessor implements NettyRequestProcessor {
}
@Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        // Dispatches on the request code: heartbeats and client unregistration are the only codes
+        // served by this processor; every other code yields no response (null).
+        final int requestCode = request.getCode();
+        if (requestCode == RequestCode.HEART_BEAT) {
+            return heartbeat(ctx, request);
+        }
+        if (requestCode == RequestCode.UNREGISTER_CLIENT) {
+            return unregister(ctx, request);
+        }
+        return null;
+    }
+
+ private RemotingCommand heartbeat(ChannelHandlerContext ctx, RemotingCommand request) {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
ctx.channel(),
@@ -55,11 +75,15 @@ public class HearbeatProcessor implements NettyRequestProcessor {
}
if (heartbeatData.getConsumerDataSet() != null) {
+ log.info("ConsumerDataSet: {}", heartbeatData.getConsumerDataSet());
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
SubscriptionGroupConfig subscriptionGroupConfig =
this.snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
data.getGroupName());
- boolean isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
+ // Default to notifying when the group has no subscription config; the config must not be
+ // dereferenced before the null check below (the previous initializer threw NPE in that case).
+ boolean isNotifyConsumerIdsChangedEnable = true;
+ if (null != subscriptionGroupConfig) {
+ isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
+ }
boolean changed = this.snodeController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
@@ -79,6 +103,44 @@ public class HearbeatProcessor implements NettyRequestProcessor {
}
}
RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+
+    /**
+     * Handles an unregister-client request: removes the calling channel from the producer
+     * group and/or consumer group named in the request header and answers with SUCCESS.
+     *
+     * @param ctx channel context of the client being unregistered
+     * @param request command carrying an {@code UnregisterClientRequestHeader}
+     * @return the response command with code SUCCESS and a null remark
+     * @throws Exception if the request header cannot be decoded
+     */
+    private RemotingCommand unregister(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+        final RemotingCommand response =
+            RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
+        final UnregisterClientRequestHeader header =
+            (UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
+
+        final ClientChannelInfo channelInfo = new ClientChannelInfo(
+            ctx.channel(),
+            header.getClientID(),
+            request.getLanguage(),
+            request.getVersion());
+
+        // Producer side: only touched when the header names a producer group.
+        final String producerGroup = header.getProducerGroup();
+        if (producerGroup != null) {
+            this.snodeController.getProducerManager().unregisterProducer(producerGroup, channelInfo);
+        }
+
+        // Consumer side: notification of the remaining consumers stays enabled unless the
+        // group's subscription config exists and says otherwise.
+        final String consumerGroup = header.getConsumerGroup();
+        if (consumerGroup != null) {
+            final SubscriptionGroupConfig groupConfig =
+                this.snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(consumerGroup);
+            final boolean notifyEnabled = null == groupConfig || groupConfig.isNotifyConsumerIdsChangedEnable();
+            this.snodeController.getConsumerManager().unregisterConsumer(consumerGroup, channelInfo, notifyEnabled);
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
return response;
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index 6e474bc..a636f87 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -18,11 +18,19 @@ package org.apache.rocketmq.snode.processor;/*
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
public class PullMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@@ -34,8 +42,52 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
@Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
- CompletableFuture<RemotingCommand> responseFuture = snodeController.getSnodeOuterService().pullMessage(ctx, request);
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+ RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+
+ final PullMessageRequestHeader requestHeader =
+ (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+
+ ConsumerGroupInfo consumerGroupInfo = snodeController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+
+ SubscriptionGroupConfig subscriptionGroupConfig =
+ this.snodeController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
+ if (null == subscriptionGroupConfig) {
+ response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+ response.setRemark(String.format("Subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
+ return response;
+ }
+
+ if (!subscriptionGroupConfig.isConsumeEnable()) {
+ response.setCode(ResponseCode.NO_PERMISSION);
+ response.setRemark("Subscription group no permission, " + requestHeader.getConsumerGroup());
+ return response;
+ }
+
+ // consumerGroupInfo may be null (presumably when the group has not registered with this
+ // snode yet - confirm against ConsumerManager), so it is guarded before every dereference.
+ if (null != consumerGroupInfo
+ && !subscriptionGroupConfig.isConsumeBroadcastEnable()
+ && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
+ response.setCode(ResponseCode.NO_PERMISSION);
+ response.setRemark("The consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
+ return response;
+ }
+
+ // A missing group info is reported the same way as a missing subscription instead of throwing NPE.
+ SubscriptionData subscriptionData =
+ null == consumerGroupInfo ? null : consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
+ if (null == subscriptionData) {
+ log.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
+ response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
+ response.setRemark("The consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
+ return response;
+ }
+
+ if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
+ log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
+ subscriptionData.getSubString());
+ response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
+ response.setRemark("The consumer's subscription not latest");
+ return response;
+ }
+
+ CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(ctx, request);
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
this.snodeController.getSnodeServer().sendResponse(ctx, data);
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index 3f52ed3..15e4294 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -35,7 +35,8 @@ public class SendMessageProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
- CompletableFuture<RemotingCommand> responseFuture = snodeController.getSnodeOuterService().sendMessage(request);
+ log.info("-----Receive sendback request: {}", request);
+ CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(request);
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
snodeController.getSnodeServer().sendResponse(ctx, data);
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
similarity index 56%
rename from rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
rename to rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
index a1ffdd8..cf7c1e9 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
@@ -17,17 +17,17 @@ package org.apache.rocketmq.snode.service;/*
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.concurrent.CompleteFuture;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.config.SnodeConfig;
-public interface SnodeOuterService {
+public interface EnodeService {
void sendHearbeat(RemotingCommand remotingCommand);
CompletableFuture<RemotingCommand> sendMessage(final RemotingCommand request);
@@ -35,22 +35,29 @@ public interface SnodeOuterService {
CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
final RemotingCommand remotingCommand);
- void saveSubscriptionData(RemotingCommand remotingCommand);
+ void notifyConsumerIdsChanged(final Channel channel, final String consumerGroup);
- void start();
+ RemotingCommand creatTopic(String enodeName, TopicConfig topicConfig)throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException ;
- void shutdown();
+ void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException, MQBrokerException;
- void registerSnode(SnodeConfig snodeConfig);
+ boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig);
- void updateNameServerAddressList(final String addrs);
+ void persistOffsetToEnode(String enodeName, String groupName, String topic, int queueId, long offset);
- String fetchNameServerAddr();
+ RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic,
+ int queueId) throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException;
- void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException, MQBrokerException;
+ RemotingCommand getMaxOffsetInQueue(String enodeName,
+ RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException, RemotingCommandException;
- void notifyConsumerIdsChanged(final Channel channel, final String consumerGroup);
+ RemotingCommand getMinOffsetInQueue(String enodeName, String topic,
+ int queueId) throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException, RemotingCommandException;
- RemotingCommand creatTopic(TopicConfig topicConfig);
+ RemotingCommand getOffsetByTimestamp(String enodeName,
+ RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
new file mode 100644
index 0000000..21bc6ed
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
@@ -0,0 +1,47 @@
+package org.apache.rocketmq.snode.service;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.Set;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+
+/**
+ * Operations the snode performs against the Nnode side: snode registration, Nnode address
+ * maintenance, enode cluster/address lookup and topic route lookup.
+ * NOTE(review): the per-method notes below are derived from names and signatures only;
+ * confirm details against the implementing class.
+ */
+public interface NnodeService {
+ /** Registers the snode described by the given config. */
+ void registerSnode(SnodeConfig snodeConfig);
+
+ /** Replaces the known Nnode address list; the separator format of {@code addrs} is not visible here - TODO confirm. */
+ void updateNnodeAddressList(final String addrs);
+
+ /** Fetches the Nnode address. NOTE(review): "Adress" is misspelled, but renaming would break implementers and callers. */
+ String fetchNnodeAdress();
+
+ /** Refreshes topic route data; which topics are refreshed is up to the implementation. */
+ void updateTopicRouteDataByTopic();
+
+ /** Returns a set of strings for the given cluster name (presumably enode names - TODO confirm). */
+ Set<String> getEnodeClusterInfo(String clusterName);
+
+ /** Refreshes and returns the enode cluster info; declared to fail with the remoting exceptions listed. */
+ ClusterInfo updateEnodeClusterInfo() throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException;
+
+ /** Resolves the address of the named enode; {@code isUseSlave} presumably allows a slave address - TODO confirm. */
+ String getAddressByEnodeName(String brokerName,
+ boolean isUseSlave) throws InterruptedException, RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException;
+
+ /** Looks up the route data of a topic; behaviour for an unknown topic is governed by {@code allowTopicNotExist} (exact semantics not visible here). */
+ TopicRouteData getTopicRouteDataByTopic(String topic,
+ boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
new file mode 100644
index 0000000..d647f47
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
@@ -0,0 +1,295 @@
+package org.apache.rocketmq.snode.service.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.constant.SnodeConstant;
+import org.apache.rocketmq.snode.service.EnodeService;
+
+public class EnodeServiceImpl implements EnodeService {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+ private SnodeController snodeController;
+
+ private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> enodeTable =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Creates the service bound to the given controller, which supplies the remoting client,
+ * snode server, Nnode service and snode config used by the methods of this class.
+ */
+ public EnodeServiceImpl(SnodeController snodeController) {
+ this.snodeController = snodeController;
+ }
+
+    /**
+     * Synchronously forwards the given heartbeat command to the master address of every
+     * enode currently held in {@code enodeTable}. Enodes without a master address are
+     * skipped; a failure for one enode is logged and does not stop the loop.
+     */
+    @Override
+    public void sendHearbeat(RemotingCommand remotingCommand) {
+        for (HashMap<Long, String> addrsByBrokerId : enodeTable.values()) {
+            final String masterAddr = addrsByBrokerId.get(MixAll.MASTER_ID);
+            if (masterAddr == null) {
+                continue;
+            }
+            try {
+                this.snodeController.getRemotingClient().invokeSync(masterAddr, remotingCommand, SnodeConstant.defaultTimeoutMills);
+            } catch (Exception ex) {
+                log.warn("Send heart beat faild:{} ,ex:{}", masterAddr, ex);
+            }
+        }
+    }
+
+    /**
+     * Forwards a pull request asynchronously to the enode address carried in the request
+     * header and exposes the outcome as a future.
+     *
+     * The future is completed with the enode's response, or completed exceptionally when
+     * the request could not be sent, timed out, or ended without a response. Previously
+     * those failure paths only logged, leaving the future pending forever.
+     *
+     * @param context channel context of the pulling client (not used by this method)
+     * @param request the pull command, forwarded unchanged
+     * @return future of the enode's response
+     */
+    @Override
+    public CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
+        RemotingCommand request) {
+
+        final CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        try {
+            final PullMessageRequestHeader requestHeader =
+                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+            final String enodeAddr = requestHeader.getEnodeAddr();
+            this.snodeController.getRemotingClient().invokeAsync(enodeAddr, request, SnodeConstant.CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, new InvokeCallback() {
+                @Override
+                public void operationComplete(ResponseFuture responseFuture) {
+                    RemotingCommand response = responseFuture.getResponseCommand();
+                    if (response != null) {
+                        future.complete(response);
+                        return;
+                    }
+                    // No response: always settle the future so the caller's completion stage runs.
+                    final Throwable cause = responseFuture.getCause();
+                    if (!responseFuture.isSendRequestOK()) {
+                        log.error("Pull message error in async callback: {}", cause);
+                        future.completeExceptionally(new RemotingSendRequestException(enodeAddr, cause));
+                    } else if (responseFuture.isTimeout()) {
+                        log.warn("Pull message timeout!");
+                        future.completeExceptionally(new RemotingTimeoutException("Pull message from enode " + enodeAddr + " timeout"));
+                    } else {
+                        log.error("Unknown pull message error occurred: {}", cause);
+                        future.completeExceptionally(cause != null
+                            ? cause
+                            : new IllegalStateException("Pull message from enode " + enodeAddr + " ended without a response"));
+                    }
+                }
+            });
+        } catch (Exception ex) {
+            log.error("pull message async error:", ex);
+            future.completeExceptionally(ex);
+        }
+        return future;
+    }
+
+    /**
+     * Forwards a send (or consumer send-back) request asynchronously to the enode named in
+     * the request header, after resolving that name to an address via the Nnode service.
+     *
+     * The future is completed with the enode's response, or exceptionally when resolving
+     * or sending fails or the call ends without a response. Previously a missing response
+     * completed the future normally with {@code null}.
+     *
+     * NOTE(review): every code other than SEND_MESSAGE_V2 is decoded as a
+     * ConsumerSendMsgBackRequestHeader; confirm no other send codes are routed here.
+     *
+     * @param request the command to forward unchanged
+     * @return future of the enode's response
+     */
+    @Override
+    public CompletableFuture<RemotingCommand> sendMessage(RemotingCommand request) {
+        final CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        try {
+            final String enodeName;
+            if (request.getCode() == RequestCode.SEND_MESSAGE_V2) {
+                SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+                enodeName = sendMessageRequestHeaderV2.getN();
+            } else {
+                ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
+                enodeName = consumerSendMsgBackRequestHeader.getEnodeName();
+            }
+            final String enodeAddress = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+            log.info("Receive request: {}", request);
+            this.snodeController.getRemotingClient().invokeAsync(enodeAddress, request, SnodeConstant.defaultTimeoutMills, (responseFuture) -> {
+                RemotingCommand response = responseFuture.getResponseCommand();
+                if (response != null) {
+                    future.complete(response);
+                } else {
+                    // Send failure or timeout: surface it instead of handing a null response to the caller.
+                    Throwable cause = responseFuture.getCause();
+                    log.error("Send message to enode {} got no response, cause: {}", enodeAddress, cause);
+                    future.completeExceptionally(new RemotingSendRequestException(enodeAddress, cause));
+                }
+            });
+        } catch (Exception ex) {
+            log.error("Send message async error:", ex);
+            future.completeExceptionally(ex);
+        }
+        return future;
+    }
+
+    /**
+     * Sends a one-way NOTIFY_CONSUMER_IDS_CHANGED request for the given consumer group over
+     * the given channel. A null group is rejected with an error log; a send failure is
+     * logged and swallowed (best effort).
+     *
+     * @param channel channel of the consumer to notify
+     * @param consumerGroup group whose consumer id set changed
+     */
+    @Override
+    public void notifyConsumerIdsChanged(
+        final Channel channel,
+        final String consumerGroup) {
+        if (null == consumerGroup) {
+            log.error("NotifyConsumerIdsChanged consumerGroup is null");
+            return;
+        }
+
+        NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
+        requestHeader.setConsumerGroup(consumerGroup);
+        RemotingCommand request =
+            RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
+
+        try {
+            this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.oneWaytimeout);
+        } catch (Exception e) {
+            // Pass the exception itself: the message has no placeholder, so the former
+            // e.getMessage() argument was never rendered and the failure reason was lost.
+            log.error("NotifyConsumerIdsChanged exception, " + consumerGroup, e);
+        }
+    }
+
+    /**
+     * Requests GET_BROKER_CLUSTER_INFO synchronously and decodes the response body.
+     * NOTE(review): the target address is passed as null; presumably the remoting client
+     * then picks the name-server side itself - confirm.
+     *
+     * @param timeoutMillis timeout of the synchronous call
+     * @return the decoded cluster info
+     * @throws MQBrokerException if the response code is not SUCCESS
+     */
+    private ClusterInfo getBrokerClusterInfo(
+        final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        final RemotingCommand clusterInfoRequest = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
+        final RemotingCommand reply = this.snodeController.getRemotingClient().invokeSync(null, clusterInfoRequest, timeoutMillis);
+        if (reply.getCode() != ResponseCode.SUCCESS) {
+            throw new MQBrokerException(reply.getCode(), reply.getRemark());
+        }
+        return ClusterInfo.decode(reply.getBody(), ClusterInfo.class);
+    }
+
+    /**
+     * Refreshes {@code enodeTable} from the cluster info returned by getBrokerClusterInfo:
+     * for every enode name listed under any cluster, stores that enode's brokerId-to-address
+     * map. Names that have no entry (or no address map) in the broker address table are
+     * skipped with a warning instead of raising a NullPointerException.
+     *
+     * NOTE(review): {@code clusterName} is not used; enodes of all clusters are loaded.
+     *
+     * @param clusterName currently ignored
+     * @throws MQBrokerException if the cluster info request is answered with a non-success code
+     */
+    @Override
+    public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        synchronized (this) {
+            ClusterInfo clusterInfo = getBrokerClusterInfo(SnodeConstant.defaultTimeoutMills);
+            if (clusterInfo == null
+                || clusterInfo.getClusterAddrTable() == null
+                || clusterInfo.getBrokerAddrTable() == null) {
+                return;
+            }
+            for (Set<String> enodeNames : clusterInfo.getClusterAddrTable().values()) {
+                if (enodeNames == null) {
+                    continue;
+                }
+                for (String enodeName : enodeNames) {
+                    if (clusterInfo.getBrokerAddrTable().get(enodeName) == null) {
+                        log.warn("Enode {} is listed in a cluster but has no address entry, skip it", enodeName);
+                        continue;
+                    }
+                    HashMap<Long, String> enodeAddrs = clusterInfo.getBrokerAddrTable().get(enodeName).getBrokerAddrs();
+                    // ConcurrentHashMap rejects null values, so only store a real address map.
+                    if (enodeAddrs != null) {
+                        enodeTable.put(enodeName, enodeAddrs);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Pushes the given subscription group config to the master of every enode in
+     * {@code enodeTable} with a synchronous UPDATE_AND_CREATE_SUBSCRIPTIONGROUP request.
+     *
+     * Fixes over the previous version: the result now covers all enodes rather than only
+     * the last one visited; enodes without a master address are skipped (and counted as
+     * failures) instead of invoking with a null address; the body is encoded once; and
+     * the exception is included in the warning.
+     *
+     * @param subscriptionGroupConfig config to persist
+     * @return true only if at least one enode was contacted and every enode answered SUCCESS
+     */
+    @Override
+    public boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig) {
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
+        request.setBody(RemotingSerializable.encode(subscriptionGroupConfig));
+
+        int attempted = 0;
+        boolean allPersisted = true;
+        for (Map.Entry<String, HashMap<Long, String>> entry : enodeTable.entrySet()) {
+            final String enodeAddress = entry.getValue().get(MixAll.MASTER_ID);
+            if (enodeAddress == null) {
+                log.warn("No master address for enode {}, skip persisting subscription group config", entry.getKey());
+                allPersisted = false;
+                continue;
+            }
+            attempted++;
+            boolean persist = false;
+            try {
+                RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(enodeAddress,
+                    request, SnodeConstant.defaultTimeoutMills);
+                persist = response != null && response.getCode() == ResponseCode.SUCCESS;
+                log.info("Persist to broker address: {} result: {}", enodeAddress, persist);
+            } catch (Exception ex) {
+                log.warn("Persist Subscription to Enode " + enodeAddress + " error", ex);
+            }
+            allPersisted &= persist;
+        }
+        return attempted > 0 && allPersisted;
+    }
+
+    /**
+     * Sends a one-way UPDATE_CONSUMER_OFFSET request for the given group/topic/queue to the
+     * named enode. Best effort: failures are logged, now with the offending coordinates
+     * and the exception, and never propagated.
+     *
+     * @param enodeName enode that owns the queue; resolved to an address via the Nnode service
+     * @param groupName consumer group
+     * @param topic topic name
+     * @param queueId queue id
+     * @param offset offset to commit
+     */
+    @Override
+    public void persistOffsetToEnode(String enodeName, String groupName, String topic, int queueId, long offset) {
+        try {
+            String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+            if (address == null) {
+                // Do not hand a null address to the remoting client; there is nowhere to send to.
+                log.warn("Persist offset skipped, no address for enode: {}", enodeName);
+                return;
+            }
+            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
+            requestHeader.setTopic(topic);
+            requestHeader.setConsumerGroup(groupName);
+            requestHeader.setQueueId(queueId);
+            requestHeader.setCommitOffset(offset);
+            requestHeader.setEnodeName(enodeName);
+            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
+            this.snodeController.getRemotingClient().invokeOneway(address, request, SnodeConstant.defaultTimeoutMills);
+        } catch (Exception ex) {
+            log.error("Persist offset to Enode error! enodeName: " + enodeName + ", group: " + groupName
+                + ", topic: " + topic + ", queueId: " + queueId + ", offset: " + offset, ex);
+        }
+    }
+
+    /**
+     * Synchronously asks the named enode for the minimum offset of a topic queue
+     * (GET_MIN_OFFSET) and returns the raw response command.
+     *
+     * @param enodeName enode to query; resolved to an address via the Nnode service
+     * @param topic topic name
+     * @param queueId queue id
+     * @return the enode's response command
+     */
+    @Override
+    public RemotingCommand getMinOffsetInQueue(String enodeName, String topic,
+        int queueId) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException {
+        final GetMinOffsetRequestHeader minOffsetHeader = new GetMinOffsetRequestHeader();
+        minOffsetHeader.setTopic(topic);
+        minOffsetHeader.setQueueId(queueId);
+        final RemotingCommand minOffsetRequest = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, minOffsetHeader);
+
+        final String enodeAddr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+        // The address is mapped through brokerVIPChannel according to the snode's VIP-channel setting.
+        final String targetAddr = MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), enodeAddr);
+        return this.snodeController.getRemotingClient().invokeSync(targetAddr, minOffsetRequest, SnodeConstant.defaultTimeoutMills);
+    }
+
+    /**
+     * Synchronously queries the named enode for the stored consumer offset of a
+     * group/topic/queue (QUERY_CONSUMER_OFFSET) and returns the raw response command.
+     *
+     * @param enodeName enode to query; resolved to an address via the Nnode service
+     * @param consumerGroup consumer group
+     * @param topic topic name
+     * @param queueId queue id
+     * @return the enode's response command
+     */
+    @Override
+    public RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic,
+        int queueId) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException {
+        final QueryConsumerOffsetRequestHeader offsetQueryHeader = new QueryConsumerOffsetRequestHeader();
+        offsetQueryHeader.setTopic(topic);
+        offsetQueryHeader.setConsumerGroup(consumerGroup);
+        offsetQueryHeader.setQueueId(queueId);
+        final RemotingCommand offsetQuery = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, offsetQueryHeader);
+
+        final String enodeAddr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+        // The address is mapped through brokerVIPChannel according to the snode's VIP-channel setting.
+        final String targetAddr = MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(), enodeAddr);
+        return this.snodeController.getRemotingClient().invokeSync(targetAddr, offsetQuery, SnodeConstant.defaultTimeoutMills);
+    }
+
+    /**
+     * Relays the given (already built) max-offset request synchronously to the named enode
+     * and returns the raw response command.
+     *
+     * @param enodeName enode to query; resolved to an address via the Nnode service
+     * @param request command to forward unchanged
+     * @return the enode's response command
+     */
+    @Override
+    public RemotingCommand getMaxOffsetInQueue(String enodeName,
+        RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
+        final String enodeAddr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+        // The address is mapped through brokerVIPChannel according to the snode's VIP-channel setting.
+        final String targetAddr = MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), enodeAddr);
+        return this.snodeController.getRemotingClient().invokeSync(targetAddr, request, SnodeConstant.defaultTimeoutMills);
+    }
+
+    /**
+     * Relays the given (already built) offset-by-timestamp request synchronously to the
+     * named enode and returns the raw response command.
+     *
+     * @param enodeName enode to query; resolved to an address via the Nnode service
+     * @param request command to forward unchanged
+     * @return the enode's response command
+     */
+    @Override
+    public RemotingCommand getOffsetByTimestamp(String enodeName,
+        RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        final String enodeAddr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+        // The address is mapped through brokerVIPChannel according to the snode's VIP-channel setting.
+        final String targetAddr = MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(), enodeAddr);
+        return this.snodeController.getRemotingClient().invokeSync(targetAddr, request, SnodeConstant.defaultTimeoutMills);
+    }
+
+    /**
+     * Synchronously sends an UPDATE_AND_CREATE_TOPIC request built from the given topic
+     * config to the named enode and returns the raw response command.
+     *
+     * @param enodeName enode to create/update the topic on; resolved via the Nnode service
+     * @param topicConfig source of topic name, queue numbers, permission, filter type, sys flag and order flag
+     * @return the enode's response command
+     */
+    @Override
+    public RemotingCommand creatTopic(String enodeName,
+        TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        // Copy every field of the config into the request header, one to one.
+        final CreateTopicRequestHeader createHeader = new CreateTopicRequestHeader();
+        createHeader.setTopic(topicConfig.getTopicName());
+        createHeader.setReadQueueNums(topicConfig.getReadQueueNums());
+        createHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
+        createHeader.setPerm(topicConfig.getPerm());
+        createHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
+        createHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
+        createHeader.setOrder(topicConfig.isOrder());
+        final RemotingCommand createRequest = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, createHeader);
+
+        final String enodeAddr = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+        return this.snodeController.getRemotingClient().invokeSync(enodeAddr, createRequest, SnodeConstant.defaultTimeoutMills);
+    }
+}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
new file mode 100644
index 0000000..fe28571
--- /dev/null
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
@@ -0,0 +1,208 @@
+package org.apache.rocketmq.snode.service.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+ import org.apache.rocketmq.client.ClientConfig;
+ import org.apache.rocketmq.client.exception.MQClientException;
+ import org.apache.rocketmq.common.MixAll;
+ import org.apache.rocketmq.common.constant.LoggerName;
+ import org.apache.rocketmq.common.namesrv.TopAddressing;
+ import org.apache.rocketmq.common.protocol.RequestCode;
+ import org.apache.rocketmq.common.protocol.ResponseCode;
+ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+ import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
+ import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
+ import org.apache.rocketmq.common.protocol.route.BrokerData;
+ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+ import org.apache.rocketmq.logging.InternalLogger;
+ import org.apache.rocketmq.logging.InternalLoggerFactory;
+ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+ import org.apache.rocketmq.snode.SnodeController;
+ import org.apache.rocketmq.snode.config.SnodeConfig;
+ import org.apache.rocketmq.snode.constant.SnodeConstant;
+ import org.apache.rocketmq.snode.exception.SnodeException;
+ import org.apache.rocketmq.snode.service.NnodeService;
+
+ /**
+  * {@link NnodeService} implementation that talks to the Nnode through the snode's remoting
+  * client and keeps local copies of topic routes and of the enode cluster information.
+  */
+ public class NnodeServiceImpl implements NnodeService {
+     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+     private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
+     private String nameSrvAddr = null;
+     private final SnodeController snodeController;
+     private final ConcurrentHashMap<String /*Topic*/, TopicRouteData> topicRouteDataMap = new ConcurrentHashMap<>(1000);
+     // Replaced by updateEnodeClusterInfo() (which ScheduledServiceImpl runs periodically) and
+     // read by the lookup methods below; volatile so readers observe the latest snapshot.
+     private volatile ClusterInfo clusterInfo;
+
+     public NnodeServiceImpl(SnodeController snodeController) {
+         this.snodeController = snodeController;
+     }
+
+     /**
+      * Sends a {@code REGISTER_SNODE} request to every Nnode address currently known to the
+      * remoting client. A failure for one address is logged and the remaining addresses are
+      * still tried.
+      *
+      * @param snodeConfig source of the snode address, snode name and cluster name to register
+      */
+     @Override
+     public void registerSnode(SnodeConfig snodeConfig) {
+         List<String> nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList();
+         RemotingCommand remotingCommand = new RemotingCommand();
+         RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
+         requestHeader.setSnodeAddr(snodeConfig.getSnodeAddr());
+         requestHeader.setSnodeName(snodeConfig.getSnodeName());
+         requestHeader.setClusterName(snodeConfig.getClusterName());
+         remotingCommand.setCustomHeader(requestHeader);
+         remotingCommand.setCode(RequestCode.REGISTER_SNODE);
+         if (nnodeAddressList == null || nnodeAddressList.isEmpty()) {
+             return;
+         }
+         for (String nodeAddress : nnodeAddressList) {
+             try {
+                 this.snodeController.getRemotingClient().invokeSync(nodeAddress, remotingCommand, SnodeConstant.heartbeatTimeout);
+             } catch (Exception ex) {
+                 log.warn("Register Snode to Nnode addr: {} error, ex:{} ", nodeAddress, ex);
+             }
+         }
+     }
+
+     /**
+      * Re-fetches the route of every cached topic from the Nnode and replaces the cached entry.
+      * A failure for one topic is logged and does not stop the refresh of the others.
+      */
+     @Override
+     public void updateTopicRouteDataByTopic() {
+         for (String topic : topicRouteDataMap.keySet()) {
+             try {
+                 // Must bypass getTopicRouteDataByTopic(): it answers from the cache first, and
+                 // every topic iterated here is cached, so nothing would ever be refreshed.
+                 TopicRouteData topicRouteData = getTopicRouteDataByTopicFromNnode(topic, false);
+                 if (topicRouteData != null) {
+                     topicRouteDataMap.put(topic, topicRouteData);
+                 }
+             } catch (Exception ex) {
+                 log.error("Update topic {} error: {}", topic, ex);
+             }
+         }
+     }
+
+     /**
+      * Requests the route of {@code topic} from the Nnode (a null address is passed to the
+      * remoting client for Nnode requests).
+      *
+      * @param topic topic whose route is requested
+      * @param allowTopicNotExist when true, a {@code TOPIC_NOT_EXIST} reply is additionally
+      *     logged as a warning (except for the auto-create key topic)
+      * @return the decoded route data; never null
+      * @throws MQClientException carrying the response code and remark when the reply is not a
+      *     {@code SUCCESS} with a body
+      */
+     private TopicRouteData getTopicRouteDataByTopicFromNnode(String topic,
+         boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+         GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
+         requestHeader.setTopic(topic);
+
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
+         RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.defaultTimeoutMills);
+         log.info("getTopicRouteInfoFromNameServer response: " + response);
+         assert response != null;
+         switch (response.getCode()) {
+             case ResponseCode.TOPIC_NOT_EXIST: {
+                 if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
+                     log.warn("Topic [{}] RouteInfo is not exist value", topic);
+                 }
+                 break;
+             }
+             case ResponseCode.SUCCESS: {
+                 byte[] body = response.getBody();
+                 if (body != null) {
+                     return TopicRouteData.decode(body, TopicRouteData.class);
+                 }
+                 // SUCCESS without a body is reported like any other failure below.
+                 break;
+             }
+             default:
+                 break;
+         }
+
+         throw new MQClientException(response.getCode(), response.getRemark());
+     }
+
+     /**
+      * Returns the route of {@code topic}, answering from the local cache when possible and
+      * otherwise fetching it from the Nnode and caching the result.
+      *
+      * @param topic topic to look up; null or empty yields null
+      * @param allowTopicNotExist passed through to the Nnode fetch
+      * @return the route data, or null when {@code topic} is null or empty
+      * @throws MQClientException if the Nnode does not return a route for the topic
+      */
+     @Override
+     public TopicRouteData getTopicRouteDataByTopic(
+         String topic,
+         boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+         if (topic == null || "".equals(topic)) {
+             return null;
+         }
+
+         TopicRouteData cached = topicRouteDataMap.get(topic);
+         if (cached != null) {
+             return cached;
+         }
+         TopicRouteData fetched = getTopicRouteDataByTopicFromNnode(topic, allowTopicNotExist);
+         if (fetched != null) {
+             topicRouteDataMap.put(topic, fetched);
+         }
+         return fetched;
+     }
+
+     /**
+      * Replaces the remoting client's Nnode address list.
+      *
+      * @param addrs Nnode addresses separated by {@code ;}
+      */
+     @Override
+     public void updateNnodeAddressList(final String addrs) {
+         List<String> list = new ArrayList<>();
+         for (String addr : addrs.split(";")) {
+             list.add(addr);
+         }
+         this.snodeController.getRemotingClient().updateNameServerAddressList(list);
+     }
+
+     /**
+      * Fetches the Nnode address string through {@link TopAddressing} and, when it differs from
+      * the one remembered here, pushes it to the remoting client and remembers it.
+      *
+      * @return the remembered address string after the attempt; may be null if none was ever
+      *     fetched. Exceptions are logged, not propagated.
+      */
+     @Override
+     public String fetchNnodeAdress() {
+         try {
+             String addrs = this.topAddressing.fetchNSAddr();
+             if (addrs != null && !addrs.equals(this.nameSrvAddr)) {
+                 log.info("Nnode server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
+                 this.updateNnodeAddressList(addrs);
+                 this.nameSrvAddr = addrs;
+             }
+         } catch (Exception e) {
+             log.error("FetchNnodeServerAddr Exception", e);
+         }
+         return nameSrvAddr;
+     }
+
+     /**
+      * Requests the enode cluster information from the Nnode and, on {@code SUCCESS}, stores it
+      * as the current snapshot.
+      *
+      * @return the freshly decoded cluster info on success; otherwise the previously stored
+      *     snapshot, which may be null
+      */
+     @Override
+     public ClusterInfo updateEnodeClusterInfo() throws InterruptedException, RemotingTimeoutException,
+         RemotingSendRequestException, RemotingConnectException {
+         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
+
+         RemotingCommand response = this.snodeController.getRemotingClient().invokeSync(null, request, SnodeConstant.defaultTimeoutMills);
+         if (response.getCode() == ResponseCode.SUCCESS) {
+             ClusterInfo latest = ClusterInfo.decode(response.getBody(), ClusterInfo.class);
+             this.clusterInfo = latest;
+             return latest;
+         }
+         log.error("Update Cluster info error: {}", response);
+         return this.clusterInfo;
+     }
+
+     /**
+      * Returns the entry stored for {@code clusterName} in the cluster address table, loading
+      * the cluster information first if none has been stored yet.
+      *
+      * @param clusterName cluster to look up
+      * @return the table entry for the cluster, or null when the cluster is unknown or the
+      *     cluster information could not be loaded
+      */
+     public Set<String> getEnodeClusterInfo(String clusterName) {
+         if (this.clusterInfo == null) {
+             try {
+                 updateEnodeClusterInfo();
+             } catch (Exception ex) {
+                 log.error("Update Cluster info error:{}", ex);
+             }
+         }
+         // The refresh above may have failed, so the snapshot can still be missing here.
+         ClusterInfo snapshot = this.clusterInfo;
+         if (snapshot == null || snapshot.getClusterAddrTable() == null) {
+             return null;
+         }
+         return snapshot.getClusterAddrTable().get(clusterName);
+     }
+
+     /**
+      * Resolves the master address ({@code MixAll.MASTER_ID}) of the enode named
+      * {@code enodeName}, loading the cluster information first if none has been stored yet.
+      *
+      * @param enodeName enode to resolve
+      * @param isUseSlave currently ignored; the master address is always returned
+      * @return the master address, or null when the cluster information is unavailable or holds
+      *     no such enode
+      */
+     @Override
+     public String getAddressByEnodeName(String enodeName,
+         boolean isUseSlave) throws InterruptedException, RemotingTimeoutException,
+         RemotingSendRequestException, RemotingConnectException {
+         ClusterInfo snapshot = this.clusterInfo;
+         if (snapshot == null) {
+             snapshot = this.updateEnodeClusterInfo();
+         }
+         if (snapshot == null || snapshot.getBrokerAddrTable() == null) {
+             return null;
+         }
+         // An enode name absent from the table must yield null rather than an NPE.
+         BrokerData brokerData = snapshot.getBrokerAddrTable().get(enodeName);
+         if (brokerData == null || brokerData.getBrokerAddrs() == null) {
+             return null;
+         }
+         return brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+     }
+ }
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
index 23d8867..384a0c2 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
@@ -26,21 +26,22 @@ import org.apache.rocketmq.common.protocol.heartbeat.SnodeData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.ScheduledService;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
public class ScheduledServiceImpl implements ScheduledService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
- private SnodeOuterService snodeOuterService;
+ private SnodeController snodeController;
+
private SnodeConfig snodeConfig;
private final RemotingCommand enodeHeartbeat;
- public ScheduledServiceImpl(SnodeOuterService snodeOuterService, SnodeConfig snodeConfig) {
- this.snodeOuterService = snodeOuterService;
- this.snodeConfig = snodeConfig;
+ public ScheduledServiceImpl(SnodeController snodeController) {
+ this.snodeController = snodeController;
+ this.snodeConfig = snodeController.getSnodeConfig();
enodeHeartbeat = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
HeartbeatData heartbeatData = new HeartbeatData();
heartbeatData.setClientID(snodeConfig.getSnodeName());
@@ -64,7 +65,7 @@ public class ScheduledServiceImpl implements ScheduledService {
@Override
public void run() {
try {
- snodeOuterService.sendHearbeat(enodeHeartbeat);
+ snodeController.getEnodeService().sendHearbeat(enodeHeartbeat);
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
@@ -76,7 +77,7 @@ public class ScheduledServiceImpl implements ScheduledService {
@Override
public void run() {
try {
- snodeOuterService.fetchNameServerAddr();
+ snodeController.getNnodeService().fetchNnodeAdress();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
@@ -87,7 +88,18 @@ public class ScheduledServiceImpl implements ScheduledService {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
- snodeOuterService.registerSnode(snodeConfig);
+ snodeController.getNnodeService().registerSnode(snodeConfig);
+ }
+ }, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ snodeController.getEnodeService().updateEnodeAddr(snodeConfig.getClusterName());
+ } catch (Exception ex) {
+ log.warn("Update broker addr error:{}", ex);
+ }
}
}, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
@@ -95,13 +107,34 @@ public class ScheduledServiceImpl implements ScheduledService {
@Override
public void run() {
try {
- snodeOuterService.updateEnodeAddr(snodeConfig.getClusterName());
+ snodeController.getNnodeService().updateTopicRouteDataByTopic();
} catch (Exception ex) {
log.warn("Update broker addr error:{}", ex);
}
}
}, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ snodeController.getNnodeService().updateEnodeClusterInfo();
+ } catch (Exception ex) {
+ log.warn("Update broker addr error:{}", ex);
+ }
+ }
+ }, 1000 * 10, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ snodeController.getConsumerOffsetManager().persist();
+ } catch (Throwable e) {
+ log.error("ScheduledTask fetchNameServerAddr exception", e);
+ }
+ }
+ }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
@Override
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
deleted file mode 100644
index 1863f6b..0000000
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
+++ /dev/null
@@ -1,280 +0,0 @@
-package org.apache.rocketmq.snode.service.impl;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.concurrent.CompleteFuture;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.namesrv.TopAddressing;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
-import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.RemotingClientFactory;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.netty.ResponseFuture;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.SnodeController;
-import org.apache.rocketmq.snode.config.SnodeConfig;
-import org.apache.rocketmq.snode.constant.SnodeConstant;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
-
-public class SnodeOuterServiceImpl implements SnodeOuterService {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
- private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
- private String nameSrvAddr = null;
- private RemotingClient client;
- private SnodeController snodeController;
- private static SnodeOuterServiceImpl snodeOuterService;
- private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> enodeTable =
- new ConcurrentHashMap<>();
-
- private SnodeOuterServiceImpl() {
-
- }
-
- public static SnodeOuterServiceImpl getInstance(SnodeController snodeController) {
- if (snodeOuterService == null) {
- synchronized (SnodeOuterServiceImpl.class) {
- if (snodeOuterService == null) {
- snodeOuterService = new SnodeOuterServiceImpl(snodeController);
- return snodeOuterService;
- }
- }
- }
- return snodeOuterService;
- }
-
- private SnodeOuterServiceImpl(SnodeController snodeController) {
- this.snodeController = snodeController;
- this.client = RemotingClientFactory.createInstance().init(snodeController.getNettyClientConfig(), null);
- }
-
- @Override
- public void start() {
- this.client.start();
- }
-
- @Override
- public void shutdown() {
- this.client.shutdown();
- }
-
- @Override
- public void sendHearbeat(RemotingCommand remotingCommand) {
- for (Map.Entry<String, HashMap<Long, String>> entry : enodeTable.entrySet()) {
- String enodeAddr = entry.getValue().get(MixAll.MASTER_ID);
- if (enodeAddr != null) {
- try {
- RemotingCommand response = this.client.invokeSync(enodeAddr, remotingCommand, SnodeConstant.defaultTimeoutMills);
- } catch (Exception ex) {
- log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex);
- }
- }
- }
- }
-
- @Override
- public CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
- RemotingCommand request) {
- try {
- final PullMessageRequestHeader requestHeader =
- (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
- this.client.invokeAsync(requestHeader.getEnodeAddr(), request, SnodeConstant.defaultTimeoutMills, new InvokeCallback() {
- @Override
- public void operationComplete(ResponseFuture responseFuture) {
- RemotingCommand response = responseFuture.getResponseCommand();
- snodeController.getSnodeServer().sendResponse(context, response);
- }
- });
- return null;
- } catch (Exception ex) {
- log.error("pull message async error:", ex);
- }
- return null;
- }
-
- @Override
- public void saveSubscriptionData(RemotingCommand remotingCommand) {
-
- }
-
- @Override
- public String fetchNameServerAddr() {
- try {
- String addrs = this.topAddressing.fetchNSAddr();
- if (addrs != null) {
- if (!addrs.equals(this.nameSrvAddr)) {
- log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs);
- this.updateNameServerAddressList(addrs);
- this.nameSrvAddr = addrs;
- return nameSrvAddr;
- }
- }
- } catch (Exception e) {
- log.error("fetchNameServerAddr Exception", e);
- }
- return nameSrvAddr;
- }
-
- private ClusterInfo getBrokerClusterInfo(
- final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException, MQBrokerException {
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
- RemotingCommand response = this.client.invokeSync(null, request, timeoutMillis);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return ClusterInfo.decode(response.getBody(), ClusterInfo.class);
- }
- default:
- break;
- }
- throw new MQBrokerException(response.getCode(), response.getRemark());
- }
-
- @Override
- public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException, MQBrokerException {
- synchronized (this) {
- ClusterInfo clusterInfo = getBrokerClusterInfo(SnodeConstant.defaultTimeoutMills);
- if (clusterInfo != null) {
- HashMap<String, Set<String>> brokerAddrs = clusterInfo.getClusterAddrTable();
- for (Map.Entry<String, Set<String>> entry : brokerAddrs.entrySet()) {
- Set<String> brokerNames = entry.getValue();
- if (brokerNames != null) {
- for (String brokerName : brokerNames) {
- enodeTable.put(brokerName, clusterInfo.getBrokerAddrTable().get(brokerName).getBrokerAddrs());
- }
- }
- }
- }
- }
- }
-
- public void updateNameServerAddressList(final String addrs) {
- List<String> list = new ArrayList<String>();
- String[] addrArray = addrs.split(";");
- for (String addr : addrArray) {
- list.add(addr);
- }
- this.client.updateNameServerAddressList(list);
- }
-
- public void registerSnode(SnodeConfig snodeConfig) {
- List<String> nameServerAddressList = this.client.getNameServerAddressList();
- RemotingCommand remotingCommand = new RemotingCommand();
- RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
- requestHeader.setSnodeAddr(snodeConfig.getSnodeAddr());
- requestHeader.setSnodeName(snodeConfig.getSnodeName());
- requestHeader.setClusterName(snodeConfig.getClusterName());
- remotingCommand.setCustomHeader(requestHeader);
- remotingCommand.setCode(RequestCode.REGISTER_SNODE);
- if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
- for (String nameServer : nameServerAddressList) {
- try {
- this.client.invokeSync(nameSrvAddr, remotingCommand, SnodeConstant.heartbeatTimeout);
- } catch (Exception ex) {
- log.warn("Register Snode to Nameserver addr: {} error, ex:{} ", nameServer, ex);
- }
- }
- }
- }
-
- @Override
- public CompletableFuture<RemotingCommand> sendMessage(RemotingCommand request) {
- CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
- try {
- SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
- this.client.invokeAsync(sendMessageRequestHeaderV2.getN(), request, SnodeConstant.defaultTimeoutMills, (responseFuture) -> {
- future.complete(responseFuture.getResponseCommand());
- });
- } catch (Exception ex) {
- log.error("Send message async error:{}", ex);
- future.completeExceptionally(ex);
- }
- return future;
- }
-
- @Override
- public void notifyConsumerIdsChanged(
- final Channel channel,
- final String consumerGroup) {
- if (null == consumerGroup) {
- log.error("notifyConsumerIdsChanged consumerGroup is null");
- return;
- }
-
- NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
- requestHeader.setConsumerGroup(consumerGroup);
- RemotingCommand request =
- RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
-
- try {
- this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.oneWaytimeout);
- } catch (Exception e) {
- log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
- }
- }
-
- @Override
- public RemotingCommand creatTopic(TopicConfig topicConfig) {
-// CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
-// requestHeader.setTopic(topicConfig.getTopicName());
-// requestHeader.setDefaultTopic(defaultTopic);
-// requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
-// requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
-// requestHeader.setPerm(topicConfig.getPerm());
-// requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
-// requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
-// requestHeader.setOrder(topicConfig.isOrder());
-//
-// RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
-//
-// RemotingCommand response = this.client.invokeSync(,
-// request, defaultTimeoutMills);
-// assert response != null;
-// switch (response.getCode()) {
-// case ResponseCode.SUCCESS: {
-// return;
-// }
-// default:
-// break;
-// }
- return null;
- }
-}