You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by lo...@apache.org on 2023/05/30 06:01:47 UTC
[rocketmq] branch develop updated: [ISSUE #6828] return the number of assignments equal to the number of messageQueues for order consumer (#6829)
This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 40ada807b3 [ISSUE #6828] return the number of assignments equal to the number of messageQueues for order consumer (#6829)
40ada807b3 is described below
commit 40ada807b3d649eb5db504a3f46e01a6facb1c0d
Author: lk <xd...@outlook.com>
AuthorDate: Tue May 30 14:01:25 2023 +0800
[ISSUE #6828] return the number of assignments equal to the number of messageQueues for order consumer (#6829)
---
.../proxy/grpc/v2/route/RouteActivity.java | 42 ++++++++++++++++------
.../proxy/grpc/v2/route/RouteActivityTest.java | 23 ++++++++++++
.../rocketmq/test/grpc/v2/ClusterGrpcIT.java | 11 +++---
.../apache/rocketmq/test/grpc/v2/GrpcBaseIT.java | 23 ++++++++++++
.../apache/rocketmq/test/grpc/v2/LocalGrpcIT.java | 11 +++---
5 files changed, 88 insertions(+), 22 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index 29b9034a42..eb7385f874 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -49,6 +49,7 @@ import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
public class RouteActivity extends AbstractMessingActivity {
@@ -106,6 +107,13 @@ public class RouteActivity extends AbstractMessingActivity {
addressList,
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()));
+ boolean fifo = false;
+ SubscriptionGroupConfig config = this.messagingProcessor.getSubscriptionGroupConfig(ctx,
+ GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup()));
+ if (config != null && config.isConsumeMessageOrderly()) {
+ fifo = true;
+ }
+
List<Assignment> assignments = new ArrayList<>();
Map<String, Map<Long, Broker>> brokerMap = buildBrokerMap(proxyTopicRouteData.getBrokerDatas());
for (QueueData queueData : proxyTopicRouteData.getQueueDatas()) {
@@ -113,16 +121,30 @@ public class RouteActivity extends AbstractMessingActivity {
Map<Long, Broker> brokerIdMap = brokerMap.get(queueData.getBrokerName());
if (brokerIdMap != null) {
Broker broker = brokerIdMap.get(MixAll.MASTER_ID);
- MessageQueue defaultMessageQueue = MessageQueue.newBuilder()
- .setTopic(request.getTopic())
- .setId(-1)
- .setPermission(this.convertToPermission(queueData.getPerm()))
- .setBroker(broker)
- .build();
-
- assignments.add(Assignment.newBuilder()
- .setMessageQueue(defaultMessageQueue)
- .build());
+ Permission permission = this.convertToPermission(queueData.getPerm());
+ if (fifo) {
+ for (int i = 0; i < queueData.getReadQueueNums(); i++) {
+ MessageQueue defaultMessageQueue = MessageQueue.newBuilder()
+ .setTopic(request.getTopic())
+ .setId(i)
+ .setPermission(permission)
+ .setBroker(broker)
+ .build();
+ assignments.add(Assignment.newBuilder()
+ .setMessageQueue(defaultMessageQueue)
+ .build());
+ }
+ } else {
+ MessageQueue defaultMessageQueue = MessageQueue.newBuilder()
+ .setTopic(request.getTopic())
+ .setId(-1)
+ .setPermission(permission)
+ .setBroker(broker)
+ .build();
+ assignments.add(Assignment.newBuilder()
+ .setMessageQueue(defaultMessageQueue)
+ .build());
+ }
}
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
index 30ff1c1ff7..ce98b7494d 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -191,6 +192,28 @@ public class RouteActivityTest extends BaseActivityTest {
assertEquals(grpcEndpoints, response.getAssignments(0).getMessageQueue().getBroker().getEndpoints());
}
+ @Test
+ public void testQueryFifoAssignment() throws Throwable {
+ when(this.messagingProcessor.getTopicRouteDataForProxy(any(), any(), anyString()))
+ .thenReturn(createProxyTopicRouteData(2, 2, 6));
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setConsumeMessageOrderly(true);
+ when(this.messagingProcessor.getSubscriptionGroupConfig(any(), anyString())).thenReturn(subscriptionGroupConfig);
+
+ QueryAssignmentResponse response = this.routeActivity.queryAssignment(
+ createContext(),
+ QueryAssignmentRequest.newBuilder()
+ .setEndpoints(grpcEndpoints)
+ .setTopic(GRPC_TOPIC)
+ .setGroup(GRPC_GROUP)
+ .build()
+ ).get();
+
+ assertEquals(Code.OK, response.getStatus().getCode());
+ assertEquals(2, response.getAssignmentsCount());
+ assertEquals(grpcEndpoints, response.getAssignments(0).getMessageQueue().getBroker().getEndpoints());
+ }
+
private static ProxyTopicRouteData createProxyTopicRouteData(int r, int w, int p) {
ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData();
proxyTopicRouteData.getQueueDatas().add(createQueueData(r, w, p));
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
index 6e3146fa59..33c3aa2fb8 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/ClusterGrpcIT.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.test.grpc.v2;
-import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryRouteResponse;
import java.time.Duration;
import java.util.Map;
@@ -74,12 +73,12 @@ public class ClusterGrpcIT extends GrpcBaseIT {
@Test
public void testQueryAssignment() throws Exception {
- String topic = initTopic();
- String group = "group";
-
- QueryAssignmentResponse response = blockingStub.queryAssignment(buildQueryAssignmentRequest(topic, group));
+ super.testQueryAssignment();
+ }
- assertQueryAssignment(response, BROKER_NUM);
+ @Test
+ public void testQueryFifoAssignment() throws Exception {
+ super.testQueryFifoAssignment();
}
@Test
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
index 243c72dec5..5d8aec822a 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/GrpcBaseIT.java
@@ -229,6 +229,29 @@ public class GrpcBaseIT extends BaseConf {
.build());
}
+ public void testQueryAssignment() throws Exception {
+ String topic = initTopic();
+ String group = "group";
+
+ QueryAssignmentResponse response = blockingStub.queryAssignment(buildQueryAssignmentRequest(topic, group));
+
+ assertQueryAssignment(response, BROKER_NUM);
+ }
+
+ public void testQueryFifoAssignment() throws Exception {
+ String topic = initTopic(TopicMessageType.FIFO);
+ String group = MQRandomUtils.getRandomConsumerGroup();
+ SubscriptionGroupConfig groupConfig = brokerController1.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
+ groupConfig.setConsumeMessageOrderly(true);
+ brokerController1.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
+ brokerController2.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
+ brokerController3.getSubscriptionGroupManager().updateSubscriptionGroupConfig(groupConfig);
+
+ QueryAssignmentResponse response = blockingStub.queryAssignment(buildQueryAssignmentRequest(topic, group));
+
+ assertQueryAssignment(response, BROKER_NUM * QUEUE_NUMBERS);
+ }
+
public void testTransactionCheckThenCommit() {
String topic = initTopicOnSampleTopicBroker(BROKER1_NAME, TopicMessageType.TRANSACTION);
String group = MQRandomUtils.getRandomConsumerGroup();
diff --git a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
index a5ca8d6186..7f837adebe 100644
--- a/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/grpc/v2/LocalGrpcIT.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.test.grpc.v2;
-import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryRouteResponse;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication;
@@ -62,12 +61,12 @@ public class LocalGrpcIT extends GrpcBaseIT {
@Test
public void testQueryAssignment() throws Exception {
- String topic = initTopic();
- String group = "group";
-
- QueryAssignmentResponse response = blockingStub.queryAssignment(buildQueryAssignmentRequest(topic, group));
+ super.testQueryAssignment();
+ }
- assertQueryAssignment(response, BROKER_NUM);
+ @Test
+ public void testQueryFifoAssignment() throws Exception {
+ super.testQueryFifoAssignment();
}
@Test