You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/02/23 13:33:06 UTC

[rocketmq] branch snode updated: Add lockMQ function for support order message

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 3706778  Add lockMQ function for support order message
3706778 is described below

commit 3706778b21b4381d9b55915ef34fdff00bf063cc
Author: duhenglucky <du...@gmail.com>
AuthorDate: Sat Feb 23 21:32:41 2019 +0800

    Add lockMQ function for support order message
---
 .../broker/processor/AdminBrokerProcessor.java     | 19 +++++----
 .../org/apache/rocketmq/snode/SnodeController.java | 46 +++++++---------------
 .../rocketmq/snode/constant/SnodeConstant.java     |  2 +
 .../snode/processor/ConsumerManageProcessor.java   | 16 ++++++++
 .../rocketmq/snode/service/EnodeService.java       |  6 +++
 .../snode/service/impl/LocalEnodeServiceImpl.java  | 23 +++++++----
 .../snode/service/impl/RemoteEnodeServiceImpl.java | 20 ++++++++++
 .../snode/service/impl/ScheduledServiceImpl.java   |  8 +++-
 8 files changed, 88 insertions(+), 52 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 914bb5b..8492493 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -43,8 +43,6 @@ import org.apache.rocketmq.common.admin.OffsetWrapper;
 import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -102,14 +100,16 @@ import org.apache.rocketmq.common.stats.StatsItem;
 import org.apache.rocketmq.common.stats.StatsSnapshot;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.filter.util.BitsArray;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
-import org.apache.rocketmq.remoting.serialize.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
 import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.ConsumeQueueExt;
@@ -129,7 +129,7 @@ public class AdminBrokerProcessor implements RequestProcessor {
     @Override
     public RemotingCommand processRequest(RemotingChannel remotingChannel,
         RemotingCommand request) throws RemotingCommandException {
-        NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel;
+        NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
         ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
         switch (request.getCode()) {
             case RequestCode.UPDATE_AND_CREATE_TOPIC:
@@ -250,7 +250,7 @@ public class AdminBrokerProcessor implements RequestProcessor {
 
         this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
 
-        this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
+        this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
 
         return null;
     }
@@ -440,11 +440,10 @@ public class AdminBrokerProcessor implements RequestProcessor {
         return response;
     }
 
-    private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
-        RemotingCommand request) throws RemotingCommandException {
+    public RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
+        RemotingCommand request) {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
-
         Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
             requestBody.getConsumerGroup(),
             requestBody.getMqSet(),
@@ -459,7 +458,7 @@ public class AdminBrokerProcessor implements RequestProcessor {
         return response;
     }
 
-    private RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx,
+    public RemotingCommand unlockBatchMQ(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 5ca8a19..6911d9a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -313,38 +313,26 @@ public class SnodeController {
     }
 
+    /**
+     * Registers each supported request code with the processor and executor that serve it.
+     */
     public void registerProcessor() {
-        this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor,
-            this.sendMessageExecutor);
-        this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor,
-            this.sendMessageExecutor);
-        this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor,
-            this.heartbeatExecutor);
-        this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor,
-            this.heartbeatExecutor);
-        this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor,
-            this.heartbeatExecutor);
-        this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor,
-            this.pullMessageExecutor);
-        this.snodeServer
-            .registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor,
-                this.consumerManageExecutor);
-        this.snodeServer
-            .registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor,
-                this.consumerManageExecutor);
-        this.snodeServer
-            .registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor,
-                this.consumerManageExecutor);
-        this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor,
-            this.consumerManageExecutor);
-        this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor,
-            this.consumerManageExecutor);
-        this.snodeServer
-            .registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor,
-                this.consumerManageExecutor);
-        this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor,
-            this.consumerManageExecutor);
-        this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE,
-            defaultMqttMessageProcessor, handleMqttMessageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
+        this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
+        this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor);
+        this.snodeServer.registerProcessor(RequestCode.PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.GET_MIN_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.GET_MAX_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, consumerManageProcessor, this.consumerManageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, consumerManageProcessor, this.consumerManageExecutor);
+        // Queue lock/unlock for ordered consumption; without these bindings the requests never reach the processor.
+        this.snodeServer.registerProcessor(RequestCode.LOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
+        this.snodeServer.registerProcessor(RequestCode.UNLOCK_BATCH_MQ, consumerManageProcessor, this.consumerManageExecutor);
+        this.mqttRemotingServer.registerProcessor(RequestCode.MQTT_MESSAGE, defaultMqttMessageProcessor, handleMqttMessageExecutor);
 
     }
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
index 16deb6d..7798c0d 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
@@ -32,4 +32,6 @@ public class SnodeConstant {
     public static final AttributeKey<ClientRole> NETTY_CLIENT_ROLE_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client.role");
 
     public static final AttributeKey<Client> NETTY_CLIENT_ATTRIBUTE_KEY = AttributeKey.valueOf("netty.client");
+
+    public static final String ENODE_NAME = "enodeName";
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index 1953de1..feb1b3d 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -74,6 +74,10 @@ public class ConsumerManageProcessor implements RequestProcessor {
                 return getMinOffset(remotingChannel, request);
             case RequestCode.CREATE_RETRY_TOPIC:
                 return createRetryTopic(remotingChannel, request);
+            case RequestCode.LOCK_BATCH_MQ:
+                return lockBatchMQ(remotingChannel, request);
+            case RequestCode.UNLOCK_BATCH_MQ:
+                return unlockBatchMQ(remotingChannel, request);
             default:
                 break;
         }
@@ -214,5 +218,17 @@ public class ConsumerManageProcessor implements RequestProcessor {
         requestHeader.getEnodeName();
         return this.snodeController.getEnodeService().creatRetryTopic(remotingChannel, requestHeader.getEnodeName(), request);
     }
+
+    public RemotingCommand lockBatchMQ(RemotingChannel remotingChannel,
+        RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException {
+        return this.snodeController.getEnodeService().lockBatchMQ(remotingChannel, request);
+    }
+
+    public RemotingCommand unlockBatchMQ(RemotingChannel remotingChannel,
+        RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException {
+        return this.snodeController.getEnodeService().unlockBatchMQ(remotingChannel, request);
+    }
 }
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
index e2db16e..cbb9c60 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
@@ -121,4 +121,10 @@ public interface EnodeService {
         long timestamp,
         RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
         RemotingConnectException, RemotingCommandException;
+
+    RemotingCommand lockBatchMQ(final RemotingChannel remotingChannel,
+        final RemotingCommand remotingCommand) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
+
+    RemotingCommand unlockBatchMQ(final RemotingChannel remotingChannel,
+        final RemotingCommand remotingCommand) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
index 5789104..e800777 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.snode.service.impl;
 
+import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -24,6 +25,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.netty.CodecHelper;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.service.EnodeService;
 
@@ -100,14 +102,6 @@ public class LocalEnodeServiceImpl implements EnodeService {
     public void persistOffset(RemotingChannel remotingChannel, String enodeName, String groupName, String topic,
         int queueId, long offset) {
         try {
-//            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
-//            requestHeader.setConsumerGroup(groupName);
-//            requestHeader.setTopic(topic);
-//            requestHeader.setQueueId(queueId);
-//            requestHeader.setCommitOffset(offset);
-//            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
-//            this.brokerController.getConsumerManageProcessor().processRequest(remotingChannel, request);
-
             this.brokerController.getConsumerOffsetManager().commitOffset(remotingChannel.remoteAddress().toString(), groupName,
                 topic, queueId, offset);
         } catch (Exception ex) {
@@ -135,4 +129,17 @@ public class LocalEnodeServiceImpl implements EnodeService {
         String topic, int queueId, long timestamp, RemotingCommand request) {
         return this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, queueId, timestamp);
     }
+
+    @Override public RemotingCommand lockBatchMQ(RemotingChannel remotingChannel, RemotingCommand request) {
+        NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
+        ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
+        return this.brokerController.getAdminProcessor().lockBatchMQ(ctx, request);
+    }
+
+    @Override public RemotingCommand unlockBatchMQ(RemotingChannel remotingChannel, RemotingCommand request) {
+        NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
+        ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
+        log.info("un");
+        return this.brokerController.getAdminProcessor().lockBatchMQ(ctx, request);
+    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
index 311406f..e305699 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
@@ -263,4 +263,24 @@ public class RemoteEnodeServiceImpl implements EnodeService {
         return this.snodeController.getRemotingClient().invokeSync(address,
             request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
     }
+
+    @Override
+    public RemotingCommand lockBatchMQ(RemotingChannel remotingChannel,
+        RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        return transferToEnode(request);
+    }
+
+    @Override
+    public RemotingCommand unlockBatchMQ(RemotingChannel remotingChannel,
+        RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        return transferToEnode(request);
+    }
+
+    private RemotingCommand transferToEnode(
+        final RemotingCommand request) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        String enodeName = request.getExtFields().get(SnodeConstant.ENODE_NAME);
+        String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
+        return this.snodeController.getRemotingClient().invokeSync(address,
+            request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
+    }
 }
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
index 79e1d2c..5d10cfa 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
@@ -101,7 +101,9 @@ public class ScheduledServiceImpl implements ScheduledService {
             @Override
             public void run() {
                 try {
-                    snodeController.getEnodeService().updateEnodeAddress(snodeConfig.getClusterName());
+                    if (!snodeController.getSnodeConfig().isEmbeddedModeEnable()) {
+                        snodeController.getEnodeService().updateEnodeAddress(snodeConfig.getClusterName());
+                    }
                 } catch (Exception ex) {
                     log.warn("Update broker addr error:{}", ex);
                 }
@@ -123,7 +125,9 @@ public class ScheduledServiceImpl implements ScheduledService {
             @Override
             public void run() {
                 try {
-                    snodeController.getNnodeService().updateEnodeClusterInfo();
+                    if (!snodeController.getSnodeConfig().isEmbeddedModeEnable()) {
+                        snodeController.getNnodeService().updateEnodeClusterInfo();
+                    }
                 } catch (Exception ex) {
                     log.warn("Update broker addr error:{}", ex);
                 }