You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/02/23 13:33:06 UTC
[rocketmq] branch snode updated: Add lockMQ function for support
order message
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 3706778 Add lockMQ function for support order message
3706778 is described below
commit 3706778b21b4381d9b55915ef34fdff00bf063cc
Author: duhenglucky <du...@gmail.com>
AuthorDate: Sat Feb 23 21:32:41 2019 +0800
Add lockMQ function for support order message
---
.../broker/processor/AdminBrokerProcessor.java | 19 +++++----
.../org/apache/rocketmq/snode/SnodeController.java | 46 +++++++---------------
.../rocketmq/snode/constant/SnodeConstant.java | 2 +
.../snode/processor/ConsumerManageProcessor.java | 16 ++++++++
.../rocketmq/snode/service/EnodeService.java | 6 +++
.../snode/service/impl/LocalEnodeServiceImpl.java | 23 +++++++----
.../snode/service/impl/RemoteEnodeServiceImpl.java | 20 ++++++++++
.../snode/service/impl/ScheduledServiceImpl.java | 8 +++-
8 files changed, 88 insertions(+), 52 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 914bb5b..8492493 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -43,8 +43,6 @@ import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -102,14 +100,16 @@ import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsSnapshot;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.filter.util.BitsArray;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
-import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
@@ -129,7 +129,7 @@ public class AdminBrokerProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
- NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel;
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
switch (request.getCode()) {
case RequestCode.UPDATE_AND_CREATE_TOPIC:
@@ -250,7 +250,7 @@ public class AdminBrokerProcessor implements RequestProcessor {
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
- this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
+ this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
return null;
}
@@ -440,11 +440,10 @@ public class AdminBrokerProcessor implements RequestProcessor {
return response;
}
- private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
- RemotingCommand request) throws RemotingCommandException {
+ public RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
+ RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
-
Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
requestBody.getConsumerGroup(),
requestBody.getMqSet(),
@@ -459,7 +458,7 @@ public class AdminBrokerProcessor implements RequestProcessor {
return response;
}
- private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx,
+ public RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 5ca8a19..6911d9a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -313,38 +313,20 @@ public class SnodeController {
}
+ /**
+ * Binds every request code served by this snode to its processor and executor.
+ * ConsumerManageProcessor also handles LOCK_BATCH_MQ and UNLOCK_BATCH_MQ, so both codes are
+ * registered here alongside its other codes.
+ * NOTE(review): this assumes no default processor forwards unregistered codes - confirm.
+ */
public void registerProcessor() {
- this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor,
- this.sendMessageExecutor);
- this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor,
- this.sendMessageExecutor);
- this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor,
- this.heartbeatExecutor);
- this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor,
- this.heartbeatExecutor);
- this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor,
- this.heartbeatExecutor);
- this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor,
- this.pullMessageExecutor);
- this.snodeServer
- .registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor,
- this.consumerManageExecutor);
- this.snodeServer
- .registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor,
- this.consumerManageExecutor);
- this.snodeServer
- .registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor,
- this.consumerManageExecutor);
- this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor,
- this.consumerManageExecutor);
- this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor,
- this.consumerManageExecutor);
- this.snodeServer
- .registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor,
- this.consumerManageExecutor);
- this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor,
- this.consumerManageExecutor);
- this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE,
- defaultMqttMessageProcessor, handleMqttMessageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
+ this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
+ this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor);
+ this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
+ this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
+ this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor);
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
index 16deb6d..7798c0d 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
@@ -32,4 +32,6 @@ public class SnodeConstant {
public static final AttributeKey<ClientRole> NETTY_CLIENT_ROLE_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client.role");
public static final AttributeKey<Client> NETTY_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client");
+
+ public static final String ENODE_NAME = "enodeName";
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index 1953de1..feb1b3d 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -74,6 +74,10 @@ public class ConsumerManageProcessor implements RequestProcessor {
return getMinOffset(remotingChannel, request);
case RequestCode.CREATE_RETRY_TOPIC:
return createRetryTopic(remotingChannel, request);
+ case RequestCode.LOCK_BATCH_MQ:
+ return lockBatchMQ(remotingChannel, request);
+ case RequestCode.UNLOCK_BATCH_MQ:
+ return unlockBatchMQ(remotingChannel, request);
default:
break;
}
@@ -214,5 +218,17 @@ public class ConsumerManageProcessor implements RequestProcessor {
requestHeader.getEnodeName();
return this.snodeController.getEnodeService().creatRetryTopic(remotingChannel, requestHeader.getEnodeName(), request);
}
+
+ /**
+ * Handles a LOCK_BATCH_MQ request by passing it, unchanged, to the configured enode service.
+ *
+ * @param remotingChannel channel the request was received on
+ * @param request the lock request to forward
+ * @return the response produced by the enode service
+ */
+ public RemotingCommand lockBatchMQ(RemotingChannel remotingChannel, RemotingCommand request)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ final RemotingCommand lockResponse = snodeController.getEnodeService().lockBatchMQ(remotingChannel, request);
+ return lockResponse;
+ }
+
+ /**
+ * Handles an UNLOCK_BATCH_MQ request by passing it, unchanged, to the configured enode service.
+ *
+ * @param remotingChannel channel the request was received on
+ * @param request the unlock request to forward
+ * @return the response produced by the enode service
+ */
+ public RemotingCommand unlockBatchMQ(RemotingChannel remotingChannel, RemotingCommand request)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ final RemotingCommand unlockResponse = snodeController.getEnodeService().unlockBatchMQ(remotingChannel, request);
+ return unlockResponse;
+ }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
index e2db16e..cbb9c60 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
@@ -121,4 +121,10 @@ public interface EnodeService {
long timestamp,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, RemotingCommandException;
+
+ /**
+ * Submits a batch message-queue lock request to the enode.
+ *
+ * @param remotingChannel channel the client request arrived on
+ * @param remotingCommand the lock request command
+ * @return the enode's response command
+ */
+ RemotingCommand lockBatchMQ(
+ final RemotingChannel remotingChannel,
+ final RemotingCommand remotingCommand)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
+ /**
+ * Submits a batch message-queue unlock request to the enode.
+ *
+ * @param remotingChannel channel the client request arrived on
+ * @param remotingCommand the unlock request command
+ * @return the enode's response command
+ */
+ RemotingCommand unlockBatchMQ(
+ final RemotingChannel remotingChannel,
+ final RemotingCommand remotingCommand)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
index 5789104..e800777 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.snode.service.impl;
+import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -24,6 +25,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.netty.CodecHelper;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.service.EnodeService;
@@ -100,14 +102,6 @@ public class LocalEnodeServiceImpl implements EnodeService {
public void persistOffset(RemotingChannel remotingChannel, String enodeName, String groupName, String topic,
int queueId, long offset) {
try {
-// UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
-// requestHeader.setConsumerGroup(groupName);
-// requestHeader.setTopic(topic);
-// requestHeader.setQueueId(queueId);
-// requestHeader.setCommitOffset(offset);
-// RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
-// this.brokerController.getConsumerManageProcessor().processRequest(remotingChannel, request);
-
this.brokerController.getConsumerOffsetManager().commitOffset(remotingChannel.remoteAddress().toString(), groupName,
topic, queueId, offset);
} catch (Exception ex) {
@@ -135,4 +129,17 @@ public class LocalEnodeServiceImpl implements EnodeService {
String topic, int queueId, long timestamp, RemotingCommand request) {
return this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, queueId, timestamp);
}
+
+ /**
+ * Serves a batch lock request in-process through the local BrokerController's admin processor.
+ *
+ * @param remotingChannel channel the request arrived on; it is cast to NettyChannelHandlerContextImpl
+ * to obtain the Netty context the admin processor expects
+ * @param request the lock request command
+ * @return the response built by the admin processor's lockBatchMQ
+ */
+ @Override
+ public RemotingCommand lockBatchMQ(RemotingChannel remotingChannel, RemotingCommand request) {
+ final ChannelHandlerContext nettyCtx = ((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext();
+ return brokerController.getAdminProcessor().lockBatchMQ(nettyCtx, request);
+ }
+
+ /**
+ * Serves a batch unlock request in-process through the local BrokerController's admin processor.
+ *
+ * @param remotingChannel channel the request arrived on; it is cast to NettyChannelHandlerContextImpl
+ * to obtain the Netty context the admin processor expects
+ * @param request the unlock request command
+ * @return the response built by the admin processor's unlockBatchMQ
+ * @throws IllegalStateException if the admin processor reports a RemotingCommandException
+ */
+ @Override
+ public RemotingCommand unlockBatchMQ(RemotingChannel remotingChannel, RemotingCommand request) {
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
+ ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
+ try {
+ // Must be unlockBatchMQ: delegating to lockBatchMQ would try to take the locks again instead of releasing them.
+ return this.brokerController.getAdminProcessor().unlockBatchMQ(ctx, request);
+ } catch (org.apache.rocketmq.remoting.exception.RemotingCommandException ex) {
+ // EnodeService.unlockBatchMQ does not declare this checked exception, so rethrow it unchecked with its cause.
+ throw new IllegalStateException("unlockBatchMQ failed in local broker", ex);
+ }
+ }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
index 311406f..e305699 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
@@ -263,4 +263,24 @@ public class RemoteEnodeServiceImpl implements EnodeService {
return this.snodeController.getRemotingClient().invokeSync(address,
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
}
+
+ /**
+ * Forwards a batch lock request to the enode named in the request.
+ *
+ * @param remotingChannel channel the request arrived on; not used by this implementation
+ * @param request the lock request command, relayed as-is
+ * @return the response returned by the enode
+ */
+ @Override
+ public RemotingCommand lockBatchMQ(RemotingChannel remotingChannel, RemotingCommand request)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ final RemotingCommand enodeResponse = transferToEnode(request);
+ return enodeResponse;
+ }
+
+ /**
+ * Forwards a batch unlock request to the enode named in the request.
+ *
+ * @param remotingChannel channel the request arrived on; not used by this implementation
+ * @param request the unlock request command, relayed as-is
+ * @return the response returned by the enode
+ */
+ @Override
+ public RemotingCommand unlockBatchMQ(RemotingChannel remotingChannel, RemotingCommand request)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ final RemotingCommand enodeResponse = transferToEnode(request);
+ return enodeResponse;
+ }
+
+ /**
+ * Relays a request synchronously to the enode whose name is carried in the request's ext fields.
+ * The name is read from the SnodeConstant.ENODE_NAME ext field, resolved to an address through the
+ * nnode service, and the request is sent with SnodeConstant.DEFAULT_TIMEOUT_MILLS as the timeout.
+ * NOTE(review): getExtFields() is dereferenced without a null check - confirm callers always set the ext field.
+ *
+ * @param request the command to relay
+ * @return the response returned by the synchronous invocation
+ */
+ private RemotingCommand transferToEnode(final RemotingCommand request)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ final String targetEnode = request.getExtFields().get(SnodeConstant.ENODE_NAME);
+ final String enodeAddress = snodeController.getNnodeService().getAddressByEnodeName(targetEnode, false);
+ return snodeController.getRemotingClient().invokeSync(enodeAddress, request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
+ }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
index 79e1d2c..5d10cfa 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
@@ -101,7 +101,9 @@ public class ScheduledServiceImpl implements ScheduledService {
@Override
public void run() {
try {
- snodeController.getEnodeService().updateEnodeAddress(snodeConfig.getClusterName());
+ if (!snodeController.getSnodeConfig().isEmbeddedModeEnable()) {
+ snodeController.getEnodeService().updateEnodeAddress(snodeConfig.getClusterName());
+ }
} catch (Exception ex) {
log.warn("Update broker addr error:{}", ex);
}
@@ -123,7 +125,9 @@ public class ScheduledServiceImpl implements ScheduledService {
@Override
public void run() {
try {
- snodeController.getNnodeService().updateEnodeClusterInfo();
+ if (!snodeController.getSnodeConfig().isEmbeddedModeEnable()) {
+ snodeController.getNnodeService().updateEnodeClusterInfo();
+ }
} catch (Exception ex) {
log.warn("Update broker addr error:{}", ex);
}