You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/01/09 09:53:54 UTC

[rocketmq] branch develop updated: [ISSUE #5839] Code.ILLEGAL_POLLING_TIME is not compatible with gRPC Client <=5.0.2 (#5841)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0533816d4 [ISSUE #5839] Code.ILLEGAL_POLLING_TIME is not compatible with gRPC Client <=5.0.2 (#5841)
0533816d4 is described below

commit 0533816d42961d0ba0ea012a115261ea71dd30c4
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Jan 9 17:53:41 2023 +0800

    [ISSUE #5839] Code.ILLEGAL_POLLING_TIME is not compatible with gRPC Client <=5.0.2 (#5841)
    
    * Fix #5839: Code.ILLEGAL_POLLING_TIME is not compatible with gRPC Client <=5.0.2
    
    * Merge two tests into testReceiveMessageWithIllegalPollingTime
---
 .../grpc/v2/consumer/ReceiveMessageActivity.java   |  7 +++-
 .../v2/consumer/ReceiveMessageActivityTest.java    | 42 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index 31b841132..ddbe07083 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -49,6 +49,7 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 
 public class ReceiveMessageActivity extends AbstractMessingActivity {
     protected ReceiptHandleProcessor receiptHandleProcessor;
+    private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3";
 
     public ReceiveMessageActivity(MessagingProcessor messagingProcessor, ReceiptHandleProcessor receiptHandleProcessor,
         GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) {
@@ -85,7 +86,11 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
                 if (timeRemaining >= config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
                     pollingTime = timeRemaining;
                 } else {
-                    writer.writeAndComplete(ctx, Code.ILLEGAL_POLLING_TIME, "The deadline time remaining is not enough" +
+                    final String clientVersion = ctx.getClientVersion();
+                    Code code =
+                        null == clientVersion || ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION.compareTo(clientVersion) > 0 ?
+                        Code.BAD_REQUEST : Code.ILLEGAL_POLLING_TIME;
+                    writer.writeAndComplete(ctx, code, "The deadline time remaining is not enough" +
                         " for polling, please check network condition");
                     return;
                 }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index 4c2f7bd1c..e5aeb025d 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -25,6 +25,7 @@ import apache.rocketmq.v2.ReceiveMessageRequest;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.Resource;
 import apache.rocketmq.v2.Settings;
+import com.google.protobuf.Duration;
 import com.google.protobuf.util.Durations;
 import io.grpc.stub.ServerCallStreamObserver;
 import io.grpc.stub.StreamObserver;
@@ -112,6 +113,47 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
         assertEquals(0L, pollTimeCaptor.getValue().longValue());
     }
 
+    @Test
+    public void testReceiveMessageWithIllegalPollingTime() {
+        StreamObserver<ReceiveMessageResponse> receiveStreamObserver = mock(ServerCallStreamObserver.class);
+        ArgumentCaptor<ReceiveMessageResponse> responseArgumentCaptor0 = ArgumentCaptor.forClass(ReceiveMessageResponse.class);
+        doNothing().when(receiveStreamObserver).onNext(responseArgumentCaptor0.capture());
+
+        when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType());
+
+        final ProxyContext context = createContext();
+        context.setClientVersion("5.0.2");
+        context.setRemainingMs(-1L);
+        final ReceiveMessageRequest request = ReceiveMessageRequest.newBuilder()
+            .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
+            .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
+            .setAutoRenew(false)
+            .setLongPollingTimeout(Duration.newBuilder().setSeconds(20).build())
+            .setFilterExpression(FilterExpression.newBuilder()
+                .setType(FilterType.TAG)
+                .setExpression("*")
+                .build())
+            .build();
+        this.receiveMessageActivity.receiveMessage(
+            context,
+            request,
+            receiveStreamObserver
+        );
+        assertEquals(Code.BAD_REQUEST, getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor0.getAllValues()));
+
+        ArgumentCaptor<ReceiveMessageResponse> responseArgumentCaptor1 =
+            ArgumentCaptor.forClass(ReceiveMessageResponse.class);
+        doNothing().when(receiveStreamObserver).onNext(responseArgumentCaptor1.capture());
+        context.setClientVersion("5.0.3");
+        this.receiveMessageActivity.receiveMessage(
+            context,
+            request,
+            receiveStreamObserver
+        );
+        assertEquals(Code.ILLEGAL_POLLING_TIME,
+            getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor1.getAllValues()));
+    }
+
     @Test
     public void testReceiveMessageIllegalFilter() {
         StreamObserver<ReceiveMessageResponse> receiveStreamObserver = mock(ServerCallStreamObserver.class);