You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by lo...@apache.org on 2023/05/30 06:01:47 UTC

[rocketmq] branch develop updated: [ISSUE #6828] return the number of assignments equal to the number of messageQueues for order consumer (#6829)

This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 40ada807b3 [ISSUE #6828] return the number of assignments equal to the number of messageQueues for order consumer (#6829)
40ada807b3 is described below

commit 40ada807b3d649eb5db504a3f46e01a6facb1c0d
Author: lk <xd...@outlook.com>
AuthorDate: Tue May 30 14:01:25 2023 +0800

    [ISSUE #6828] return the number of assignments equal to the number of messageQueues for order consumer (#6829)
---
 .../proxy/grpc/v2/route/RouteActivity.java         | 42 ++++++++++++++++------
 .../proxy/grpc/v2/route/RouteActivityTest.java     | 23 ++++++++++++
 .../rocketmq/test/grpc/v2/ClusterGrpcIT.java       | 11 +++---
 .../apache/rocketmq/test/grpc/v2/GrpcBaseIT.java   | 23 ++++++++++++
 .../apache/rocketmq/test/grpc/v2/LocalGrpcIT.java  | 11 +++---
 5 files changed, 88 insertions(+), 22 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index 29b9034a42..eb7385f874 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -49,6 +49,7 @@ import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
 import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
 import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 
 public class RouteActivity extends AbstractMessingActivity {
 
@@ -106,6 +107,13 @@ public class RouteActivity extends AbstractMessingActivity {
                 addressList,
                 GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()));
 
+            boolean fifo = false;
+            SubscriptionGroupConfig config = this.messagingProcessor.getSubscriptionGroupConfig(ctx,
+                GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()));
+            if (config != null && config.isConsumeMessageOrderly()) {
+                fifo = true;
+            }
+
             List<Assignment> assignments = new ArrayList<>();
             Map<String, Map<Long, Broker>> brokerMap = buildBrokerMap(proxyTopicRouteData.getBrokerDatas());
             for (QueueData queueData : proxyTopicRouteData.getQueueDatas()) {
@@ -113,16 +121,30 @@ public class RouteActivity extends AbstractMessingActivity {
                     Map<Long, Broker> brokerIdMap = brokerMap.get(queueData.getBrokerName());
                     if (brokerIdMap != null) {
                         Broker broker = brokerIdMap.get(MixAll.MASTER_ID);
-                        MessageQueue defaultMessageQueue = MessageQueue.newBuilder()
-                            .setTopic(request.getTopic())
-                            .setId(-1)
-                            .setPermission(this.convertToPermission(queueData.getPerm()))
-                            .setBroker(broker)
-                            .build();
-
-                        assignments.add(Assignment.newBuilder()
-                            .setMessageQueue(defaultMessageQueue)
-                            .build());
+                        Permission permission = this.convertToPermission(queueData.getPerm());
+                        if (fifo) {
+                            for (int i = 0; i < queueData.getReadQueueNums(); i++) {
+                                MessageQueue defaultMessageQueue = MessageQueue.newBuilder()
+                                    .setTopic(request.getTopic())
+                                    .setId(i)
+                                    .setPermission(permission)
+                                    .setBroker(broker)
+                                    .build();
+                                assignments.add(Assignment.newBuilder()
+                                    .setMessageQueue(defaultMessageQueue)
+                                    .build());
+                            }
+                        } else {
+                            MessageQueue defaultMessageQueue = MessageQueue.newBuilder()
+                                .setTopic(request.getTopic())
+                                .setId(-1)
+                                .setPermission(permission)
+                                .setBroker(broker)
+                                .build();
+                            assignments.add(Assignment.newBuilder()
+                                .setMessageQueue(defaultMessageQueue)
+                                .build());
+                        }
 
                     }
                 }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
index 30ff1c1ff7..ce98b7494d 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.proxy.service.metadata.MetadataService;
 import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -191,6 +192,28 @@ public class RouteActivityTest extends BaseActivityTest {
         assertEquals(grpcEndpoints, response.getAssignments(0).getMessageQueue().getBroker().getEndpoints());
     }
 
+    @Test
+    public void testQueryFifoAssignment() throws Throwable {
+        when(this.messagingProcessor.getTopicRouteDataForProxy(any(), any(), anyString()))
+            .thenReturn(createProxyTopicRouteData(2, 2, 6));
+        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+        subscriptionGroupConfig.setConsumeMessageOrderly(true);
+        when(this.messagingProcessor.getSubscriptionGroupConfig(any(), anyString())).thenReturn(subscriptionGroupConfig);
+
+        QueryAssignmentResponse response = this.routeActivity.queryAssignment(
+            createContext(),
+            QueryAssignmentRequest.newBuilder()
+                .setEndpoints(grpcEndpoints)
+                .setTopic(GRPC_TOPIC)
+                .setGroup(GRPC_GROUP)
+                .build()
+        ).get();
+
+        assertEquals(Code.OK, response.getStatus().getCode());
+        assertEquals(2, response.getAssignmentsCount());
+        assertEquals(grpcEndpoints, response.getAssignments(0).getMessageQueue().getBroker().getEndpoints());
+    }
+
     private static ProxyTopicRouteData createProxyTopicRouteData(int r, int w, int p) {
         ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData();
         proxyTopicRouteData.getQueueDatas().add(createQueueData(r, w, p));
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
index 6e3146fa59..33c3aa2fb8 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.test.grpc.v2;
 
-import apache.rocketmq.v2.QueryAssignmentResponse;
 import apache.rocketmq.v2.QueryRouteResponse;
 import java.time.Duration;
 import java.util.Map;
@@ -74,12 +73,12 @@ public class ClusterGrpcIT extends GrpcBaseIT {
 
     @Test
     public void testQueryAssignment() throws Exception {
-        String topic = initTopic();
-        String group = "group";
-
-        QueryAssignmentResponse response = blockingStub.queryAssignment(buildQueryAssignmentRequest(topic, group));
+        super.testQueryAssignment();
+    }
 
-        assertQueryAssignment(response, BROKER_NUM);
+    @Test
+    public void testQueryFifoAssignment() throws Exception {
+        super.testQueryFifoAssignment();
     }
 
     @Test
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
index 243c72dec5..5d8aec822a 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
@@ -229,6 +229,29 @@ public class GrpcBaseIT extends BaseConf {
             .build());
     }
 
+    public void testQueryAssignment() throws Exception {
+        String topic = initTopic();
+        String group = "group";
+
+        QueryAssignmentResponse response = blockingStub.queryAssignment(buildQueryAssignmentRequest(topic, group));
+
+        assertQueryAssignment(response, BROKER_NUM);
+    }
+
+    public void testQueryFifoAssignment() throws Exception {
+        String topic = initTopic(TopicMessageType.FIFO);
+        String group = MQRandomUtils.getRandomConsumerGroup();
+        SubscriptionGroupConfig groupConfig = brokerController1.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
+        groupConfig.setConsumeMessageOrderly(true);
+        brokerController1.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
+        brokerController2.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
+        brokerController3.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
+
+        QueryAssignmentResponse response = blockingStub.queryAssignment(buildQueryAssignmentRequest(topic, group));
+
+        assertQueryAssignment(response, BROKER_NUM * QUEUE_NUMBERS);
+    }
+
     public void testTransactionCheckThenCommit() {
         String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.TRANSACTION);
         String group = MQRandomUtils.getRandomConsumerGroup();
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
index a5ca8d6186..7f837adebe 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.test.grpc.v2;
 
-import apache.rocketmq.v2.QueryAssignmentResponse;
 import apache.rocketmq.v2.QueryRouteResponse;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication;
@@ -62,12 +61,12 @@ public class LocalGrpcIT extends GrpcBaseIT {
 
     @Test
     public void testQueryAssignment() throws Exception {
-        String topic = initTopic();
-        String group = "group";
-
-        QueryAssignmentResponse response = blockingStub.queryAssignment(buildQueryAssignmentRequest(topic, group));
+        super.testQueryAssignment();
+    }
 
-        assertQueryAssignment(response, BROKER_NUM);
+    @Test
+    public void testQueryFifoAssignment() throws Exception {
+        super.testQueryFifoAssignment();
     }
 
     @Test