You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/07/26 08:30:05 UTC

[rocketmq] branch develop updated (8034690d5 -> 326f51e3c)

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


    from 8034690d5 Merge pull request #4651 from Oliverwqcwrw/develop-sendmsgstatus-fix-32
     new 1e410977a [ISSUE #3949] return acceptMessageTypes for querying topic route.
     new 326f51e3c [ISSUE #3949] update for UT.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../rocketmq/common/message/MessageConst.java      |  1 +
 .../proxy/grpc/v2/route/RouteActivity.java         | 30 ++++++++++++++++++----
 .../proxy/grpc/v2/route/RouteActivityTest.java     | 23 +++++++++++++----
 3 files changed, 44 insertions(+), 10 deletions(-)


[rocketmq] 01/02: [ISSUE #3949] return acceptMessageTypes for querying topic route.

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 1e410977a3225625b94b10389b2104085267de37
Author: Jixiang.jjx <ji...@alibaba-inc.com>
AuthorDate: Mon Jul 25 20:51:19 2022 +0800

    [ISSUE #3949] return acceptMessageTypes for querying topic route.
---
 .../rocketmq/common/message/MessageConst.java      |  1 +
 .../proxy/grpc/v2/route/RouteActivity.java         | 30 ++++++++++++++++++----
 .../proxy/grpc/v2/route/RouteActivityTest.java     | 23 +++++++++++++----
 3 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index c5311cec5..43f47efc4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -146,5 +146,6 @@ public class MessageConst {
         STRING_HASH_SET.add(PROPERTY_TIMER_DEL_UNIQKEY);
         STRING_HASH_SET.add(PROPERTY_TIMER_DELAY_LEVEL);
         STRING_HASH_SET.add(PROPERTY_BORN_HOST);
+        STRING_HASH_SET.add(PROPERTY_BORN_TIMESTAMP);
     }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index c5cf8a3e0..312423778 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -23,6 +23,7 @@ import apache.rocketmq.v2.Broker;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.Endpoints;
 import apache.rocketmq.v2.MessageQueue;
+import apache.rocketmq.v2.MessageType;
 import apache.rocketmq.v2.Permission;
 import apache.rocketmq.v2.QueryAssignmentRequest;
 import apache.rocketmq.v2.QueryAssignmentResponse;
@@ -36,6 +37,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.proxy.common.ProxyContext;
@@ -61,14 +63,14 @@ public class RouteActivity extends AbstractMessingActivity {
             validateTopic(request.getTopic());
             List<org.apache.rocketmq.proxy.common.Address> addressList = this.convertToAddressList(request.getEndpoints());
 
+            String topicName = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic());
             ProxyTopicRouteData proxyTopicRouteData = this.messagingProcessor.getTopicRouteDataForProxy(
-                ctx,
-                addressList,
-                GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()));
+                ctx, addressList, topicName);
 
             List<MessageQueue> messageQueueList = new ArrayList<>();
             Map<String, Map<Long, Broker>> brokerMap = buildBrokerMap(proxyTopicRouteData.getBrokerDatas());
 
+            TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(topicName);
             for (QueueData queueData : proxyTopicRouteData.getQueueDatas()) {
                 String brokerName = queueData.getBrokerName();
                 Map<Long, Broker> brokerIdMap = brokerMap.get(brokerName);
@@ -76,7 +78,7 @@ public class RouteActivity extends AbstractMessingActivity {
                     break;
                 }
                 for (Broker broker : brokerIdMap.values()) {
-                    messageQueueList.addAll(this.genMessageQueueFromQueueData(queueData, request.getTopic(), broker));
+                    messageQueueList.addAll(this.genMessageQueueFromQueueData(queueData, request.getTopic(), topicMessageType, broker));
                 }
             }
 
@@ -205,7 +207,7 @@ public class RouteActivity extends AbstractMessingActivity {
         return brokerMap;
     }
 
-    protected List<MessageQueue> genMessageQueueFromQueueData(QueueData queueData, Resource topic, Broker broker) {
+    protected List<MessageQueue> genMessageQueueFromQueueData(QueueData queueData, Resource topic, TopicMessageType topicMessageType, Broker broker) {
         List<MessageQueue> messageQueueList = new ArrayList<>();
 
         int r = 0;
@@ -227,6 +229,7 @@ public class RouteActivity extends AbstractMessingActivity {
             MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
                 .setId(queueIdIndex++)
                 .setPermission(Permission.READ)
+                .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
                 .build();
             messageQueueList.add(messageQueue);
         }
@@ -235,6 +238,7 @@ public class RouteActivity extends AbstractMessingActivity {
             MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
                 .setId(queueIdIndex++)
                 .setPermission(Permission.WRITE)
+                .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
                 .build();
             messageQueueList.add(messageQueue);
         }
@@ -243,10 +247,26 @@ public class RouteActivity extends AbstractMessingActivity {
             MessageQueue messageQueue = MessageQueue.newBuilder().setBroker(broker).setTopic(topic)
                 .setId(queueIdIndex++)
                 .setPermission(Permission.READ_WRITE)
+                .addAcceptMessageTypes(parseTopicMessageType(topicMessageType))
                 .build();
             messageQueueList.add(messageQueue);
         }
 
         return messageQueueList;
     }
+
+    private MessageType parseTopicMessageType(TopicMessageType topicMessageType) {
+        switch (topicMessageType) {
+            case NORMAL:
+                return MessageType.NORMAL;
+            case FIFO:
+                return MessageType.FIFO;
+            case TRANSACTION:
+                return MessageType.TRANSACTION;
+            case DELAY:
+                return MessageType.DELAY;
+            default:
+                return MessageType.MESSAGE_TYPE_UNSPECIFIED;
+        }
+    }
 }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
index a8c79442b..52572455f 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
@@ -22,6 +22,7 @@ import apache.rocketmq.v2.AddressScheme;
 import apache.rocketmq.v2.Broker;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.Endpoints;
+import apache.rocketmq.v2.MessageType;
 import apache.rocketmq.v2.MessageQueue;
 import apache.rocketmq.v2.Permission;
 import apache.rocketmq.v2.QueryAssignmentRequest;
@@ -33,16 +34,20 @@ import com.google.common.net.HostAndPort;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
 import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
+import org.apache.rocketmq.proxy.service.metadata.LocalMetadataService;
+import org.apache.rocketmq.proxy.service.metadata.MetadataService;
 import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -93,6 +98,9 @@ public class RouteActivityTest extends BaseActivityTest {
         ArgumentCaptor<List<org.apache.rocketmq.proxy.common.Address>> addressListCaptor = ArgumentCaptor.forClass(List.class);
         when(this.messagingProcessor.getTopicRouteDataForProxy(any(), addressListCaptor.capture(), anyString()))
             .thenReturn(createProxyTopicRouteData(2, 2, 6));
+        MetadataService metadataService = Mockito.mock(LocalMetadataService.class);
+        when(this.messagingProcessor.getMetadataService()).thenReturn(metadataService);
+        when(metadataService.getTopicMessageType(anyString())).thenReturn(TopicMessageType.NORMAL);
 
         QueryRouteResponse response = this.routeActivity.queryRoute(
             createContext(),
@@ -199,40 +207,45 @@ public class RouteActivityTest extends BaseActivityTest {
     public void testGenPartitionFromQueueData() throws Exception {
         // test queueData with 8 read queues, 8 write queues, and rw permission, expect 8 rw queues.
         QueueData queueDataWith8R8WPermRW = createQueueData(8, 8, PermName.PERM_READ | PermName.PERM_WRITE);
-        List<MessageQueue> partitionWith8R8WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R8WPermRW, GRPC_TOPIC, GRPC_BROKER);
+        List<MessageQueue> partitionWith8R8WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R8WPermRW, GRPC_TOPIC, TopicMessageType.NORMAL, GRPC_BROKER);
         assertEquals(8, partitionWith8R8WPermRW.size());
+        assertEquals(8, partitionWith8R8WPermRW.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.NORMAL.getNumber()).count());
         assertEquals(8, partitionWith8R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count());
         assertEquals(0, partitionWith8R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count());
         assertEquals(0, partitionWith8R8WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count());
 
         // test queueData with 8 read queues, 8 write queues, and read only permission, expect 8 read only queues.
         QueueData queueDataWith8R8WPermR = createQueueData(8, 8, PermName.PERM_READ);
-        List<MessageQueue> partitionWith8R8WPermR = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R8WPermR, GRPC_TOPIC, GRPC_BROKER);
+        List<MessageQueue> partitionWith8R8WPermR = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R8WPermR, GRPC_TOPIC, TopicMessageType.FIFO, GRPC_BROKER);
         assertEquals(8, partitionWith8R8WPermR.size());
+        assertEquals(8, partitionWith8R8WPermR.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.FIFO.getNumber()).count());
         assertEquals(8, partitionWith8R8WPermR.stream().filter(a -> a.getPermission() == Permission.READ).count());
         assertEquals(0, partitionWith8R8WPermR.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count());
         assertEquals(0, partitionWith8R8WPermR.stream().filter(a -> a.getPermission() == Permission.WRITE).count());
 
         // test queueData with 8 read queues, 8 write queues, and write only permission, expect 8 write only queues.
         QueueData queueDataWith8R8WPermW = createQueueData(8, 8, PermName.PERM_WRITE);
-        List<MessageQueue> partitionWith8R8WPermW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R8WPermW, GRPC_TOPIC, GRPC_BROKER);
+        List<MessageQueue> partitionWith8R8WPermW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R8WPermW, GRPC_TOPIC, TopicMessageType.TRANSACTION, GRPC_BROKER);
         assertEquals(8, partitionWith8R8WPermW.size());
+        assertEquals(8, partitionWith8R8WPermW.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.TRANSACTION.getNumber()).count());
         assertEquals(8, partitionWith8R8WPermW.stream().filter(a -> a.getPermission() == Permission.WRITE).count());
         assertEquals(0, partitionWith8R8WPermW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count());
         assertEquals(0, partitionWith8R8WPermW.stream().filter(a -> a.getPermission() == Permission.READ).count());
 
         // test queueData with 8 read queues, 0 write queues, and rw permission, expect 8 read only queues.
         QueueData queueDataWith8R0WPermRW = createQueueData(8, 0, PermName.PERM_READ | PermName.PERM_WRITE);
-        List<MessageQueue> partitionWith8R0WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R0WPermRW, GRPC_TOPIC, GRPC_BROKER);
+        List<MessageQueue> partitionWith8R0WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith8R0WPermRW, GRPC_TOPIC, TopicMessageType.DELAY, GRPC_BROKER);
         assertEquals(8, partitionWith8R0WPermRW.size());
+        assertEquals(8, partitionWith8R0WPermRW.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.DELAY.getNumber()).count());
         assertEquals(8, partitionWith8R0WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count());
         assertEquals(0, partitionWith8R0WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count());
         assertEquals(0, partitionWith8R0WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count());
 
         // test queueData with 4 read queues, 8 write queues, and rw permission, expect 4 rw queues and  4 write only queues.
         QueueData queueDataWith4R8WPermRW = createQueueData(4, 8, PermName.PERM_READ | PermName.PERM_WRITE);
-        List<MessageQueue> partitionWith4R8WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith4R8WPermRW, GRPC_TOPIC, GRPC_BROKER);
+        List<MessageQueue> partitionWith4R8WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith4R8WPermRW, GRPC_TOPIC, TopicMessageType.NORMAL, GRPC_BROKER);
         assertEquals(8, partitionWith4R8WPermRW.size());
+        assertEquals(8, partitionWith4R8WPermRW.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.NORMAL.getNumber()).count());
         assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count());
         assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count());
         assertEquals(0, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count());


[rocketmq] 02/02: [ISSUE #3949] update for UT.

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 326f51e3cea20c1bdd9c1857f6855a4b74aee67a
Author: Jixiang.jjx <ji...@alibaba-inc.com>
AuthorDate: Tue Jul 26 10:18:34 2022 +0800

    [ISSUE #3949] update for UT.
---
 .../org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
index 52572455f..cab5510c7 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
@@ -243,9 +243,9 @@ public class RouteActivityTest extends BaseActivityTest {
 
         // test queueData with 4 read queues, 8 write queues, and rw permission, expect 4 rw queues and  4 write only queues.
         QueueData queueDataWith4R8WPermRW = createQueueData(4, 8, PermName.PERM_READ | PermName.PERM_WRITE);
-        List<MessageQueue> partitionWith4R8WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith4R8WPermRW, GRPC_TOPIC, TopicMessageType.NORMAL, GRPC_BROKER);
+        List<MessageQueue> partitionWith4R8WPermRW = this.routeActivity.genMessageQueueFromQueueData(queueDataWith4R8WPermRW, GRPC_TOPIC, TopicMessageType.UNSPECIFIED, GRPC_BROKER);
         assertEquals(8, partitionWith4R8WPermRW.size());
-        assertEquals(8, partitionWith4R8WPermRW.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.NORMAL.getNumber()).count());
+        assertEquals(8, partitionWith4R8WPermRW.stream().filter(a -> a.getAcceptMessageTypesValue(0) == MessageType.MESSAGE_TYPE_UNSPECIFIED.getNumber()).count());
         assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.WRITE).count());
         assertEquals(4, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ_WRITE).count());
         assertEquals(0, partitionWith4R8WPermRW.stream().filter(a -> a.getPermission() == Permission.READ).count());