You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/01/09 09:53:54 UTC
[rocketmq] branch develop updated: [ISSUE #5839] Code.ILLEGAL_POLLING_TIME is not compatible with gRPC Client <=5.0.2 (#5841)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0533816d4 [ISSUE #5839] Code.ILLEGAL_POLLING_TIME is not compatible with gRPC Client <=5.0.2 (#5841)
0533816d4 is described below
commit 0533816d42961d0ba0ea012a115261ea71dd30c4
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Jan 9 17:53:41 2023 +0800
[ISSUE #5839] Code.ILLEGAL_POLLING_TIME is not compatible with gRPC Client <=5.0.2 (#5841)
* Fix #5839: Code.ILLEGAL_POLLING_TIME is not compatible with gRPC Client <=5.0.2
* Merge two tests into testReceiveMessageWithIllegalPollingTime
---
.../grpc/v2/consumer/ReceiveMessageActivity.java | 7 +++-
.../v2/consumer/ReceiveMessageActivityTest.java | 42 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 1 deletion(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index 31b841132..ddbe07083 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -49,6 +49,7 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
public class ReceiveMessageActivity extends AbstractMessingActivity {
protected ReceiptHandleProcessor receiptHandleProcessor;
+ private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3";
public ReceiveMessageActivity(MessagingProcessor messagingProcessor, ReceiptHandleProcessor receiptHandleProcessor,
GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) {
@@ -85,7 +86,11 @@ public class ReceiveMessageActivity extends AbstractMessingActivity {
if (timeRemaining >= config.getGrpcClientConsumerMinLongPollingTimeoutMillis()) {
pollingTime = timeRemaining;
} else {
- writer.writeAndComplete(ctx, Code.ILLEGAL_POLLING_TIME, "The deadline time remaining is not enough" +
+ final String clientVersion = ctx.getClientVersion();
+ Code code =
+ null == clientVersion || ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION.compareTo(clientVersion) > 0 ?
+ Code.BAD_REQUEST : Code.ILLEGAL_POLLING_TIME;
+ writer.writeAndComplete(ctx, code, "The deadline time remaining is not enough" +
" for polling, please check network condition");
return;
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index 4c2f7bd1c..e5aeb025d 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -25,6 +25,7 @@ import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.Settings;
+import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
@@ -112,6 +113,47 @@ public class ReceiveMessageActivityTest extends BaseActivityTest {
assertEquals(0L, pollTimeCaptor.getValue().longValue());
}
+ @Test
+ public void testReceiveMessageWithIllegalPollingTime() {
+ StreamObserver<ReceiveMessageResponse> receiveStreamObserver = mock(ServerCallStreamObserver.class);
+ ArgumentCaptor<ReceiveMessageResponse> responseArgumentCaptor0 = ArgumentCaptor.forClass(ReceiveMessageResponse.class);
+ doNothing().when(receiveStreamObserver).onNext(responseArgumentCaptor0.capture());
+
+ when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType());
+
+ final ProxyContext context = createContext();
+ context.setClientVersion("5.0.2");
+ context.setRemainingMs(-1L);
+ final ReceiveMessageRequest request = ReceiveMessageRequest.newBuilder()
+ .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
+ .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
+ .setAutoRenew(false)
+ .setLongPollingTimeout(Duration.newBuilder().setSeconds(20).build())
+ .setFilterExpression(FilterExpression.newBuilder()
+ .setType(FilterType.TAG)
+ .setExpression("*")
+ .build())
+ .build();
+ this.receiveMessageActivity.receiveMessage(
+ context,
+ request,
+ receiveStreamObserver
+ );
+ assertEquals(Code.BAD_REQUEST, getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor0.getAllValues()));
+
+ ArgumentCaptor<ReceiveMessageResponse> responseArgumentCaptor1 =
+ ArgumentCaptor.forClass(ReceiveMessageResponse.class);
+ doNothing().when(receiveStreamObserver).onNext(responseArgumentCaptor1.capture());
+ context.setClientVersion("5.0.3");
+ this.receiveMessageActivity.receiveMessage(
+ context,
+ request,
+ receiveStreamObserver
+ );
+ assertEquals(Code.ILLEGAL_POLLING_TIME,
+ getResponseCodeFromReceiveMessageResponseList(responseArgumentCaptor1.getAllValues()));
+ }
+
@Test
public void testReceiveMessageIllegalFilter() {
StreamObserver<ReceiveMessageResponse> receiveStreamObserver = mock(ServerCallStreamObserver.class);