You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/27 07:44:24 UTC
[rocketmq-clients] branch master updated: 1. Enable message type validate (#75)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new fbac59a 1. Enable message type validate (#75)
fbac59a is described below
commit fbac59afaa0b1c2738a6769bdf5e9351819c9ee3
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Jul 27 15:44:20 2022 +0800
1. Enable message type validate (#75)
2. Remove ReceiveMessageResult#ok
---
.../rocketmq/client/java/impl/ClientSettings.java | 2 +-
.../client/java/impl/consumer/ConsumerImpl.java | 6 ++-
.../java/impl/consumer/ProcessQueueImpl.java | 14 ++-----
.../java/impl/consumer/ReceiveMessageResult.java | 49 +++++++---------------
.../java/impl/consumer/SimpleConsumerImpl.java | 2 +-
.../client/java/impl/producer/ProducerImpl.java | 6 ++-
.../java/impl/producer/ProducerSettings.java | 8 +++-
.../java/retry/CustomizedBackoffRetryPolicy.java | 15 +++++++
.../java/retry/ExponentialBackoffRetryPolicy.java | 15 +++++++
.../rocketmq/client/java/retry/RetryPolicy.java | 8 ++++
.../client/java/route/MessageQueueImpl.java | 4 +-
.../java/impl/consumer/ProcessQueueImplTest.java | 3 +-
12 files changed, 75 insertions(+), 57 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
index 3183597..bc50c4f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
@@ -28,7 +28,7 @@ public abstract class ClientSettings {
protected final String clientId;
protected final ClientType clientType;
protected final Endpoints accessPoint;
- protected RetryPolicy retryPolicy;
+ protected volatile RetryPolicy retryPolicy;
protected final Duration requestTimeout;
protected final SettableFuture<Void> arrivedFuture;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 89687f8..d527eef 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -83,7 +83,7 @@ abstract class ConsumerImpl extends ClientImpl {
final ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
clientManager.receiveMessage(endpoints,
metadata, request, timeout);
- return Futures.transform(future, context -> {
+ return Futures.transformAsync(future, context -> {
final Iterator<ReceiveMessageResponse> it = context.getResponse();
Status status = Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
.setMessage("status was not set by server")
@@ -111,7 +111,9 @@ abstract class ConsumerImpl extends ClientImpl {
final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
messages.add(view);
}
- return new ReceiveMessageResult(endpoints, context.getContext().getRequestId(), status, messages);
+ final ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(endpoints,
+ context.getContext().getRequestId(), status, messages);
+ return Futures.immediateFuture(receiveMessageResult);
}, MoreExecutors.directExecutor());
} catch (Throwable t) {
return Futures.immediateFailedFuture(t);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index ccd0899..d4271bf 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -238,13 +238,9 @@ class ProcessQueueImpl implements ProcessQueue {
public void onSuccess(ReceiveMessageResult result) {
// Intercept after message reception.
final Duration duration = stopwatch.elapsed();
- final List<MessageCommon> commons = result.getMessages().stream()
+ final List<MessageCommon> commons = result.getMessageViewImpls().stream()
.map(MessageViewImpl::getMessageCommon).collect(Collectors.toList());
- if (result.ok()) {
- consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.OK);
- } else {
- consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.ERROR);
- }
+ consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.OK);
try {
onReceiveMessageResult(result);
@@ -322,11 +318,7 @@ class ProcessQueueImpl implements ProcessQueue {
}
private void onReceiveMessageResult(ReceiveMessageResult result) {
- final List<MessageViewImpl> messages = result.getMessages();
- if (!result.ok()) {
- receiveMessageLater();
- return;
- }
+ final List<MessageViewImpl> messages = result.getMessageViewImpls();
if (!messages.isEmpty()) {
cacheMessages(messages);
consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index 9f4fa2e..cad65b2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -36,17 +36,17 @@ import org.apache.rocketmq.client.java.route.Endpoints;
public class ReceiveMessageResult {
private final Endpoints endpoints;
- private final ClientException exception;
-
private final List<MessageViewImpl> messages;
- public ReceiveMessageResult(Endpoints endpoints, String requestId, Status status, List<MessageViewImpl> messages) {
+ public ReceiveMessageResult(Endpoints endpoints, String requestId, Status status, List<MessageViewImpl> messages)
+ throws ClientException {
this.endpoints = endpoints;
final Code code = status.getCode();
+ final int codeNumber = code.getNumber();
+ final String statusMessage = status.getMessage();
switch (code) {
case OK:
case MESSAGE_NOT_FOUND:
- this.exception = null;
break;
case BAD_REQUEST:
case ILLEGAL_TOPIC:
@@ -54,50 +54,29 @@ public class ReceiveMessageResult {
case ILLEGAL_FILTER_EXPRESSION:
case ILLEGAL_INVISIBLE_TIME:
case CLIENT_ID_REQUIRED:
- this.exception = new BadRequestException(code.getNumber(), requestId, status.getMessage());
- break;
+ throw new BadRequestException(codeNumber, requestId, statusMessage);
case UNAUTHORIZED:
- this.exception = new UnauthorizedException(code.getNumber(), requestId, status.getMessage());
- break;
+ throw new UnauthorizedException(codeNumber, requestId, statusMessage);
case FORBIDDEN:
- this.exception = new ForbiddenException(code.getNumber(), requestId, status.getMessage());
- break;
+ throw new ForbiddenException(codeNumber, requestId, statusMessage);
case NOT_FOUND:
case TOPIC_NOT_FOUND:
case CONSUMER_GROUP_NOT_FOUND:
- this.exception = new NotFoundException(code.getNumber(), requestId, status.getMessage());
- break;
+ throw new NotFoundException(codeNumber, requestId, statusMessage);
case TOO_MANY_REQUESTS:
- this.exception = new TooManyRequestsException(code.getNumber(), requestId, status.getMessage());
- break;
+ throw new TooManyRequestsException(codeNumber, requestId, statusMessage);
case INTERNAL_ERROR:
case INTERNAL_SERVER_ERROR:
- this.exception = new InternalErrorException(code.getNumber(), requestId, status.getMessage());
- break;
+ throw new InternalErrorException(codeNumber, requestId, statusMessage);
case PROXY_TIMEOUT:
- this.exception = new ProxyTimeoutException(code.getNumber(), requestId, status.getMessage());
- break;
+ throw new ProxyTimeoutException(codeNumber, requestId, statusMessage);
default:
- this.exception = new UnsupportedException(code.getNumber(), requestId, status.getMessage());
+ throw new UnsupportedException(codeNumber, requestId, statusMessage);
}
this.messages = messages;
}
- /**
- * Indicates that the result is ok or not.
- *
- * <p>The result is ok if the status code is {@link Code#OK} or {@link Code#MESSAGE_NOT_FOUND}.
- *
- * @return true if the result is ok, false otherwise.
- */
- public boolean ok() {
- return null == exception;
- }
-
- public List<MessageView> checkAndGetMessages() throws ClientException {
- if (null != exception) {
- throw exception;
- }
+ public List<MessageView> getMessageViews() {
return new ArrayList<>(messages);
}
@@ -105,7 +84,7 @@ public class ReceiveMessageResult {
return endpoints;
}
- public List<MessageViewImpl> getMessages() {
+ public List<MessageViewImpl> getMessageViewImpls() {
return messages;
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index 6eb780e..d90edf8 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -205,7 +205,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
invisibleDuration);
return receiveMessage(request, mq, awaitDuration);
}, MoreExecutors.directExecutor());
- return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.checkAndGetMessages()),
+ return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.getMessageViews()),
clientCallbackExecutor);
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index daf98df..bb4ae49 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -436,9 +436,11 @@ class ProducerImpl extends ClientImpl implements Producer {
}
// Calculate the current message queue.
final MessageQueueImpl messageQueue = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
- if (producerSettings.isValidateMessageType() && !messageQueue.matchMessageType(messageType)) {
+ final List<MessageType> acceptMessageTypes = messageQueue.getAcceptMessageTypes();
+ if (producerSettings.isValidateMessageType() && !acceptMessageTypes.contains(messageType)) {
final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with "
- + "topic accept message types");
+ + "topic accept message types, topic=" + topic + ", actualMessageType=" + messageType + ", "
+ + "acceptMessageTypes=" + acceptMessageTypes);
future.setException(e);
return;
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index f1eef34..687fb96 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.client.java.impl.ClientSettings;
import org.apache.rocketmq.client.java.impl.ClientType;
import org.apache.rocketmq.client.java.impl.UserAgent;
import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
+import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,11 +63,10 @@ public class ProducerSettings extends ClientSettings {
public Settings toProtobuf() {
final Publishing publishing = Publishing.newBuilder()
.addAllTopics(topics.stream().map(name -> Resource.newBuilder().setName(name).build())
- .collect(Collectors.toList())).build();
+ .collect(Collectors.toList())).setValidateMessageType(validateMessageType).build();
final Settings.Builder builder = Settings.newBuilder()
.setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
- this.validateMessageType = publishing.getValidateMessageType();
return builder.setBackoffPolicy(retryPolicy.toProtobuf()).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
}
@@ -78,7 +78,11 @@ public class ProducerSettings extends ClientSettings {
+ "clientType={}", clientId, pubSubCase, clientType);
return;
}
+ final apache.rocketmq.v2.RetryPolicy backoffPolicy = settings.getBackoffPolicy();
final Publishing publishing = settings.getPublishing();
+ RetryPolicy exist = retryPolicy;
+ this.retryPolicy = exist.updateBackoff(backoffPolicy);
+ this.validateMessageType = settings.getPublishing().getValidateMessageType();
this.maxBodySizeBytes = publishing.getMaxBodySize();
this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
index 2157613..59b5e4c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
@@ -67,6 +67,21 @@ public class CustomizedBackoffRetryPolicy implements RetryPolicy {
return new CustomizedBackoffRetryPolicy(durations, retryPolicy.getMaxAttempts());
}
+ @Override
+ public RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy) {
+ if (!CUSTOMIZED_BACKOFF.equals(retryPolicy.getStrategyCase())) {
+ throw new IllegalArgumentException("strategy must be customized backoff");
+ }
+ return updateBackoff(retryPolicy.getCustomizedBackoff());
+ }
+
+ private RetryPolicy updateBackoff(CustomizedBackoff backoff) {
+ final List<Duration> durations = backoff.getNextList().stream()
+ .map(duration -> Duration.ofNanos(Durations.toNanos(duration)))
+ .collect(Collectors.toList());
+ return new CustomizedBackoffRetryPolicy(durations, maxAttempts);
+ }
+
@Override
public apache.rocketmq.v2.RetryPolicy toProtobuf() {
CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder()
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
index c4e0f50..8b25579 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
@@ -89,6 +89,21 @@ public class ExponentialBackoffRetryPolicy implements RetryPolicy {
exponentialBackoff.getMultiplier());
}
+ @Override
+ public RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy) {
+ if (!EXPONENTIAL_BACKOFF.equals(retryPolicy.getStrategyCase())) {
+ throw new IllegalArgumentException("strategy must be exponential backoff");
+ }
+ return updateBackoff(retryPolicy.getExponentialBackoff());
+ }
+
+ private RetryPolicy updateBackoff(ExponentialBackoff backoff) {
+ return new ExponentialBackoffRetryPolicy(maxAttempts,
+ Duration.ofNanos(Durations.toNanos(backoff.getInitial())),
+ Duration.ofNanos(Durations.toNanos(backoff.getMax())),
+ backoff.getMultiplier());
+ }
+
@Override
public apache.rocketmq.v2.RetryPolicy toProtobuf() {
ExponentialBackoff exponentialBackoff = ExponentialBackoff.newBuilder()
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
index 2deaed0..43d9fa2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
@@ -38,5 +38,13 @@ public interface RetryPolicy {
*/
Duration getNextAttemptDelay(int attempt);
+ /**
+ * Create a new retry policy whose backoff strategy is taken from the given policy.
+ *
+ * @param retryPolicy retry policy which contains the backoff strategy.
+ * @return the new retry policy.
+ */
+ RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy);
+
apache.rocketmq.v2.RetryPolicy toProtobuf();
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
index a9c4635..de36c5e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
@@ -61,8 +61,8 @@ public class MessageQueueImpl {
return this.queueId;
}
- public boolean matchMessageType(MessageType messageType) {
- return acceptMessageTypes.contains(messageType);
+ public List<MessageType> getAcceptMessageTypes() {
+ return acceptMessageTypes;
}
public apache.rocketmq.v2.MessageQueue toProtobuf() {
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 3cd91b9..24fc847 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -41,6 +41,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
@@ -141,7 +142,7 @@ public class ProcessQueueImplTest extends TestBase {
}
@Test
- public void testReceiveMessageImmediately() throws InterruptedException {
+ public void testReceiveMessageImmediately() throws InterruptedException, ClientException {
final int cachedMessagesCountThresholdPerQueue = 8;
when(pushConsumer.cacheMessageCountThresholdPerQueue()).thenReturn(cachedMessagesCountThresholdPerQueue);
final int cachedMessageBytesThresholdPerQueue = 1024;