You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/04/22 05:52:42 UTC

[rocketmq] branch 5.0.0-beta updated: sync message request mode from master (#4101)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta by this push:
     new 531cdb0be sync message request mode from master (#4101)
531cdb0be is described below

commit 531cdb0bed16b2f418dcf1aeaf474916f23adaec
Author: cserwen <cs...@163.com>
AuthorDate: Fri Apr 22 13:52:34 2022 +0800

    sync message request mode from master (#4101)
---
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 18 +++++++
 .../broker/processor/AdminBrokerProcessor.java     | 29 +++++++++++
 .../broker/processor/QueryAssignmentProcessor.java |  3 ++
 .../rocketmq/broker/slave/SlaveSynchronize.java    | 24 +++++++++
 .../rocketmq/common/protocol/RequestCode.java      |  1 +
 .../body/MessageRequestModeSerializeWrapper.java   | 35 +++++++++++++
 .../MessageRequestModeSerializeWrapperTest.java    | 58 ++++++++++++++++++++++
 7 files changed, 168 insertions(+)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index b637951b2..00b1cbb4d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -49,6 +49,7 @@ import org.apache.rocketmq.common.protocol.body.GetBrokerMemberGroupResponseBody
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
+import org.apache.rocketmq.common.protocol.body.MessageRequestModeSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
@@ -970,4 +971,21 @@ public class BrokerOuterAPI {
     public RpcClient getRpcClient() {
         return rpcClient;
     }
+
+    public MessageRequestModeSerializeWrapper getAllMessageRequestMode(
+        final String addr) throws RemotingSendRequestException, RemotingConnectException,
+        MQBrokerException, RemotingTimeoutException, InterruptedException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_MESSAGE_REQUEST_MODE, null);
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                return MessageRequestModeSerializeWrapper.decode(response.getBody(), MessageRequestModeSerializeWrapper.class);
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a283228f9..0917858f7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -231,6 +231,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 return this.getAllConsumerOffset(ctx, request);
             case RequestCode.GET_ALL_DELAY_OFFSET:
                 return this.getAllDelayOffset(ctx, request);
+            case RequestCode.GET_ALL_MESSAGE_REQUEST_MODE:
+                return this.getAllMessageRequestMode(ctx, request);
             case RequestCode.INVOKE_BROKER_TO_RESET_OFFSET:
                 return this.resetOffset(ctx, request);
             case RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS:
@@ -1516,6 +1518,33 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         return response;
     }
 
+    private RemotingCommand getAllMessageRequestMode(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        String content = this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager().encode();
+        if (content != null && content.length() > 0) {
+            try {
+                response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
+            } catch (UnsupportedEncodingException e) {
+                LOGGER.error("get all message request mode from master error.", e);
+
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+        } else {
+            LOGGER.error("No message request mode in this broker, client: {} ", ctx.channel().remoteAddress());
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("No message request mode in this broker");
+            return response;
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+
+        return response;
+    }
+
     public RemotingCommand resetOffset(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final ResetOffsetRequestHeader requestHeader =
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
index 6fe92108f..d548a356e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
@@ -315,4 +315,7 @@ public class QueryAssignmentProcessor implements NettyRequestProcessor {
         return response;
     }
 
+    public MessageRequestModeManager getMessageRequestModeManager() {
+        return messageRequestModeManager;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 9012eb9a8..09cfecf17 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -18,9 +18,11 @@ package org.apache.rocketmq.broker.slave;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.MessageRequestModeSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
@@ -55,6 +57,7 @@ public class SlaveSynchronize {
         this.syncConsumerOffset();
         this.syncDelayOffset();
         this.syncSubscriptionGroupConfig();
+        this.syncMessageRequestMode();
     }
 
     private void syncTopicConfig() {
@@ -156,4 +159,25 @@ public class SlaveSynchronize {
             }
         }
     }
+
+    private void syncMessageRequestMode() {
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak != null  && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
+            try {
+                MessageRequestModeSerializeWrapper messageRequestModeSerializeWrapper =
+                        this.brokerController.getBrokerOuterAPI().getAllMessageRequestMode(masterAddrBak);
+
+                MessageRequestModeManager messageRequestModeManager =
+                        this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager();
+                messageRequestModeManager.getMessageRequestModeMap().clear();
+                messageRequestModeManager.getMessageRequestModeMap().putAll(
+                        messageRequestModeSerializeWrapper.getMessageRequestModeMap()
+                );
+                messageRequestModeManager.persist();
+                LOGGER.info("Update slave Message Request Mode from master, {}", masterAddrBak);
+            } catch (Exception e) {
+                LOGGER.error("SyncMessageRequestMode Exception, {}", masterAddrBak, e);
+            }
+        }
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 541227f4a..1b2692da0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -214,6 +214,7 @@ public class RequestCode {
 
     public static final int QUERY_ASSIGNMENT = 400;
     public static final int SET_MESSAGE_REQUEST_MODE = 401;
+    public static final int GET_ALL_MESSAGE_REQUEST_MODE = 402;
 
     public static final int UPDATE_AND_CREATE_STATIC_TOPIC = 513;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/MessageRequestModeSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/MessageRequestModeSerializeWrapper.java
new file mode 100644
index 000000000..4271ebf4e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/MessageRequestModeSerializeWrapper.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class MessageRequestModeSerializeWrapper extends RemotingSerializable {
+
+    private ConcurrentHashMap<String/* Topic */, ConcurrentHashMap<String/* Group */, SetMessageRequestModeRequestBody>>
+            messageRequestModeMap = new ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>>();
+
+    public ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>> getMessageRequestModeMap() {
+        return messageRequestModeMap;
+    }
+
+    public void setMessageRequestModeMap(ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>> messageRequestModeMap) {
+        this.messageRequestModeMap = messageRequestModeMap;
+    }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/body/MessageRequestModeSerializeWrapperTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/body/MessageRequestModeSerializeWrapperTest.java
new file mode 100644
index 000000000..bea2f5fdc
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/body/MessageRequestModeSerializeWrapperTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MessageRequestModeSerializeWrapperTest {
+
+    @Test
+    public void testFromJson(){
+        MessageRequestModeSerializeWrapper  messageRequestModeSerializeWrapper = new MessageRequestModeSerializeWrapper();
+        ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>>
+                messageRequestModeMap = new ConcurrentHashMap<String, ConcurrentHashMap<String, SetMessageRequestModeRequestBody>>();
+        String topic = "TopicTest";
+        String group = "Consumer";
+        MessageRequestMode requestMode = MessageRequestMode.POP;
+        int popShareQueueNum = -1;
+        SetMessageRequestModeRequestBody requestBody = new SetMessageRequestModeRequestBody();
+        requestBody.setTopic(topic);
+        requestBody.setConsumerGroup(group);
+        requestBody.setMode(requestMode);
+        requestBody.setPopShareQueueNum(popShareQueueNum);
+        ConcurrentHashMap<String, SetMessageRequestModeRequestBody> map = new ConcurrentHashMap<>();
+        map.put(group, requestBody);
+        messageRequestModeMap.put(topic, map);
+
+        messageRequestModeSerializeWrapper.setMessageRequestModeMap(messageRequestModeMap);
+
+        String json = RemotingSerializable.toJson(messageRequestModeSerializeWrapper, true);
+        MessageRequestModeSerializeWrapper fromJson = RemotingSerializable.fromJson(json, MessageRequestModeSerializeWrapper.class);
+        assertThat(fromJson.getMessageRequestModeMap()).containsKey(topic);
+        assertThat(fromJson.getMessageRequestModeMap().get(topic)).containsKey(group);
+        assertThat(fromJson.getMessageRequestModeMap().get(topic).get(group).getTopic()).isEqualTo(topic);
+        assertThat(fromJson.getMessageRequestModeMap().get(topic).get(group).getConsumerGroup()).isEqualTo(group);
+        assertThat(fromJson.getMessageRequestModeMap().get(topic).get(group).getMode()).isEqualTo(requestMode);
+        assertThat(fromJson.getMessageRequestModeMap().get(topic).get(group).getPopShareQueueNum()).isEqualTo(popShareQueueNum);
+    }
+}