You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/27 07:44:24 UTC

[rocketmq-clients] branch master updated: 1. Enable message type validate (#75)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new fbac59a  1. Enable message type validate (#75)
fbac59a is described below

commit fbac59afaa0b1c2738a6769bdf5e9351819c9ee3
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Jul 27 15:44:20 2022 +0800

    1. Enable message type validate (#75)
    
    2. Remove ReceiveMessageResult#ok
---
 .../rocketmq/client/java/impl/ClientSettings.java  |  2 +-
 .../client/java/impl/consumer/ConsumerImpl.java    |  6 ++-
 .../java/impl/consumer/ProcessQueueImpl.java       | 14 ++-----
 .../java/impl/consumer/ReceiveMessageResult.java   | 49 +++++++---------------
 .../java/impl/consumer/SimpleConsumerImpl.java     |  2 +-
 .../client/java/impl/producer/ProducerImpl.java    |  6 ++-
 .../java/impl/producer/ProducerSettings.java       |  8 +++-
 .../java/retry/CustomizedBackoffRetryPolicy.java   | 15 +++++++
 .../java/retry/ExponentialBackoffRetryPolicy.java  | 15 +++++++
 .../rocketmq/client/java/retry/RetryPolicy.java    |  8 ++++
 .../client/java/route/MessageQueueImpl.java        |  4 +-
 .../java/impl/consumer/ProcessQueueImplTest.java   |  3 +-
 12 files changed, 75 insertions(+), 57 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
index 3183597..bc50c4f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
@@ -28,7 +28,7 @@ public abstract class ClientSettings {
     protected final String clientId;
     protected final ClientType clientType;
     protected final Endpoints accessPoint;
-    protected RetryPolicy retryPolicy;
+    protected volatile RetryPolicy retryPolicy;
     protected final Duration requestTimeout;
     protected final SettableFuture<Void> arrivedFuture;
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 89687f8..d527eef 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -83,7 +83,7 @@ abstract class ConsumerImpl extends ClientImpl {
             final ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
                 clientManager.receiveMessage(endpoints,
                     metadata, request, timeout);
-            return Futures.transform(future, context -> {
+            return Futures.transformAsync(future, context -> {
                 final Iterator<ReceiveMessageResponse> it = context.getResponse();
                 Status status = Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
                     .setMessage("status was not set by server")
@@ -111,7 +111,9 @@ abstract class ConsumerImpl extends ClientImpl {
                     final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
                     messages.add(view);
                 }
-                return new ReceiveMessageResult(endpoints, context.getContext().getRequestId(), status, messages);
+                final ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(endpoints,
+                    context.getContext().getRequestId(), status, messages);
+                return Futures.immediateFuture(receiveMessageResult);
             }, MoreExecutors.directExecutor());
         } catch (Throwable t) {
             return Futures.immediateFailedFuture(t);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index ccd0899..d4271bf 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -238,13 +238,9 @@ class ProcessQueueImpl implements ProcessQueue {
                 public void onSuccess(ReceiveMessageResult result) {
                     // Intercept after message reception.
                     final Duration duration = stopwatch.elapsed();
-                    final List<MessageCommon> commons = result.getMessages().stream()
+                    final List<MessageCommon> commons = result.getMessageViewImpls().stream()
                         .map(MessageViewImpl::getMessageCommon).collect(Collectors.toList());
-                    if (result.ok()) {
-                        consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.OK);
-                    } else {
-                        consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.ERROR);
-                    }
+                    consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.OK);
 
                     try {
                         onReceiveMessageResult(result);
@@ -322,11 +318,7 @@ class ProcessQueueImpl implements ProcessQueue {
     }
 
     private void onReceiveMessageResult(ReceiveMessageResult result) {
-        final List<MessageViewImpl> messages = result.getMessages();
-        if (!result.ok()) {
-            receiveMessageLater();
-            return;
-        }
+        final List<MessageViewImpl> messages = result.getMessageViewImpls();
         if (!messages.isEmpty()) {
             cacheMessages(messages);
             consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index 9f4fa2e..cad65b2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -36,17 +36,17 @@ import org.apache.rocketmq.client.java.route.Endpoints;
 
 public class ReceiveMessageResult {
     private final Endpoints endpoints;
-    private final ClientException exception;
-
     private final List<MessageViewImpl> messages;
 
-    public ReceiveMessageResult(Endpoints endpoints, String requestId, Status status, List<MessageViewImpl> messages) {
+    public ReceiveMessageResult(Endpoints endpoints, String requestId, Status status, List<MessageViewImpl> messages)
+        throws ClientException {
         this.endpoints = endpoints;
         final Code code = status.getCode();
+        final int codeNumber = code.getNumber();
+        final String statusMessage = status.getMessage();
         switch (code) {
             case OK:
             case MESSAGE_NOT_FOUND:
-                this.exception = null;
                 break;
             case BAD_REQUEST:
             case ILLEGAL_TOPIC:
@@ -54,50 +54,29 @@ public class ReceiveMessageResult {
             case ILLEGAL_FILTER_EXPRESSION:
             case ILLEGAL_INVISIBLE_TIME:
             case CLIENT_ID_REQUIRED:
-                this.exception = new BadRequestException(code.getNumber(), requestId, status.getMessage());
-                break;
+                throw new BadRequestException(codeNumber, requestId, statusMessage);
             case UNAUTHORIZED:
-                this.exception = new UnauthorizedException(code.getNumber(), requestId, status.getMessage());
-                break;
+                throw new UnauthorizedException(codeNumber, requestId, statusMessage);
             case FORBIDDEN:
-                this.exception = new ForbiddenException(code.getNumber(), requestId, status.getMessage());
-                break;
+                throw new ForbiddenException(codeNumber, requestId, statusMessage);
             case NOT_FOUND:
             case TOPIC_NOT_FOUND:
             case CONSUMER_GROUP_NOT_FOUND:
-                this.exception = new NotFoundException(code.getNumber(), requestId, status.getMessage());
-                break;
+                throw new NotFoundException(codeNumber, requestId, statusMessage);
             case TOO_MANY_REQUESTS:
-                this.exception = new TooManyRequestsException(code.getNumber(), requestId, status.getMessage());
-                break;
+                throw new TooManyRequestsException(codeNumber, requestId, statusMessage);
             case INTERNAL_ERROR:
             case INTERNAL_SERVER_ERROR:
-                this.exception = new InternalErrorException(code.getNumber(), requestId, status.getMessage());
-                break;
+                throw new InternalErrorException(codeNumber, requestId, statusMessage);
             case PROXY_TIMEOUT:
-                this.exception = new ProxyTimeoutException(code.getNumber(), requestId, status.getMessage());
-                break;
+                throw new ProxyTimeoutException(codeNumber, requestId, statusMessage);
             default:
-                this.exception = new UnsupportedException(code.getNumber(), requestId, status.getMessage());
+                throw new UnsupportedException(codeNumber, requestId, statusMessage);
         }
         this.messages = messages;
     }
 
-    /**
-     * Indicates that the result is ok or not.
-     *
-     * <p>The result is ok if the status code is {@link Code#OK} or {@link Code#MESSAGE_NOT_FOUND}.
-     *
-     * @return true if the result is ok, false otherwise.
-     */
-    public boolean ok() {
-        return null == exception;
-    }
-
-    public List<MessageView> checkAndGetMessages() throws ClientException {
-        if (null != exception) {
-            throw exception;
-        }
+    public List<MessageView> getMessageViews() {
         return new ArrayList<>(messages);
     }
 
@@ -105,7 +84,7 @@ public class ReceiveMessageResult {
         return endpoints;
     }
 
-    public List<MessageViewImpl> getMessages() {
+    public List<MessageViewImpl> getMessageViewImpls() {
         return messages;
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 6eb780e..d90edf8 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -205,7 +205,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
                 invisibleDuration);
             return receiveMessage(request, mq, awaitDuration);
         }, MoreExecutors.directExecutor());
-        return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.checkAndGetMessages()),
+        return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.getMessageViews()),
             clientCallbackExecutor);
     }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index daf98df..bb4ae49 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -436,9 +436,11 @@ class ProducerImpl extends ClientImpl implements Producer {
         }
         // Calculate the current message queue.
         final MessageQueueImpl messageQueue = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
-        if (producerSettings.isValidateMessageType() && !messageQueue.matchMessageType(messageType)) {
+        final List<MessageType> acceptMessageTypes = messageQueue.getAcceptMessageTypes();
+        if (producerSettings.isValidateMessageType() && !acceptMessageTypes.contains(messageType)) {
             final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with "
-                + "topic accept message types");
+                + "topic accept message types, topic=" + topic + ", actualMessageType=" + messageType + ", "
+                + "acceptMessageTypes={}" + acceptMessageTypes);
             future.setException(e);
             return;
         }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index f1eef34..687fb96 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.client.java.impl.ClientSettings;
 import org.apache.rocketmq.client.java.impl.ClientType;
 import org.apache.rocketmq.client.java.impl.UserAgent;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
+import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,11 +63,10 @@ public class ProducerSettings extends ClientSettings {
     public Settings toProtobuf() {
         final Publishing publishing = Publishing.newBuilder()
             .addAllTopics(topics.stream().map(name -> Resource.newBuilder().setName(name).build())
-                .collect(Collectors.toList())).build();
+                .collect(Collectors.toList())).setValidateMessageType(validateMessageType).build();
         final Settings.Builder builder = Settings.newBuilder()
             .setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
             .setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
-        this.validateMessageType = publishing.getValidateMessageType();
         return builder.setBackoffPolicy(retryPolicy.toProtobuf()).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
     }
 
@@ -78,7 +78,11 @@ public class ProducerSettings extends ClientSettings {
                 + "clientType={}", clientId, pubSubCase, clientType);
             return;
         }
+        final apache.rocketmq.v2.RetryPolicy backoffPolicy = settings.getBackoffPolicy();
         final Publishing publishing = settings.getPublishing();
+        RetryPolicy exist = retryPolicy;
+        this.retryPolicy = exist.updateBackoff(backoffPolicy);
+        this.validateMessageType = settings.getPublishing().getValidateMessageType();
         this.maxBodySizeBytes = publishing.getMaxBodySize();
         this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
index 2157613..59b5e4c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
@@ -67,6 +67,21 @@ public class CustomizedBackoffRetryPolicy implements RetryPolicy {
         return new CustomizedBackoffRetryPolicy(durations, retryPolicy.getMaxAttempts());
     }
 
+    @Override
+    public RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy) {
+        if (!CUSTOMIZED_BACKOFF.equals(retryPolicy.getStrategyCase())) {
+            throw new IllegalArgumentException("strategy must be customized backoff");
+        }
+        return updateBackoff(retryPolicy.getCustomizedBackoff());
+    }
+
+    private RetryPolicy updateBackoff(CustomizedBackoff backoff) {
+        final List<Duration> durations = backoff.getNextList().stream()
+            .map(duration -> Duration.ofNanos(Durations.toNanos(duration)))
+            .collect(Collectors.toList());
+        return new CustomizedBackoffRetryPolicy(durations, maxAttempts);
+    }
+
     @Override
     public apache.rocketmq.v2.RetryPolicy toProtobuf() {
         CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder()
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
index c4e0f50..8b25579 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
@@ -89,6 +89,21 @@ public class ExponentialBackoffRetryPolicy implements RetryPolicy {
             exponentialBackoff.getMultiplier());
     }
 
+    @Override
+    public RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy) {
+        if (!EXPONENTIAL_BACKOFF.equals(retryPolicy.getStrategyCase())) {
+            throw new IllegalArgumentException("strategy must be exponential backoff");
+        }
+        return updateBackoff(retryPolicy.getExponentialBackoff());
+    }
+
+    private RetryPolicy updateBackoff(ExponentialBackoff backoff) {
+        return new ExponentialBackoffRetryPolicy(maxAttempts,
+            Duration.ofNanos(Durations.toNanos(backoff.getInitial())),
+            Duration.ofNanos(Durations.toNanos(backoff.getMax())),
+            backoff.getMultiplier());
+    }
+
     @Override
     public apache.rocketmq.v2.RetryPolicy toProtobuf() {
         ExponentialBackoff exponentialBackoff = ExponentialBackoff.newBuilder()
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
index 2deaed0..43d9fa2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
@@ -38,5 +38,13 @@ public interface RetryPolicy {
      */
     Duration getNextAttemptDelay(int attempt);
 
+    /**
+     * Update the retry backoff strategy and generate a new one.
+     *
+     * @param retryPolicy retry policy which contains the backoff strategy.
+     * @return the new retry policy.
+     */
+    RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy);
+
     apache.rocketmq.v2.RetryPolicy toProtobuf();
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
index a9c4635..de36c5e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
@@ -61,8 +61,8 @@ public class MessageQueueImpl {
         return this.queueId;
     }
 
-    public boolean matchMessageType(MessageType messageType) {
-        return acceptMessageTypes.contains(messageType);
+    public List<MessageType> getAcceptMessageTypes() {
+        return acceptMessageTypes;
     }
 
     public apache.rocketmq.v2.MessageQueue toProtobuf() {
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 3cd91b9..24fc847 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -41,6 +41,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
@@ -141,7 +142,7 @@ public class ProcessQueueImplTest extends TestBase {
     }
 
     @Test
-    public void testReceiveMessageImmediately() throws InterruptedException {
+    public void testReceiveMessageImmediately() throws InterruptedException, ClientException {
         final int cachedMessagesCountThresholdPerQueue = 8;
         when(pushConsumer.cacheMessageCountThresholdPerQueue()).thenReturn(cachedMessagesCountThresholdPerQueue);
         final int cachedMessageBytesThresholdPerQueue = 1024;