You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/01/17 05:03:09 UTC
[rocketmq-clients] branch master updated: Polish java example (#347)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 96729a83 Polish java example (#347)
96729a83 is described below
commit 96729a83cd62f46788c5a737161b9f1c346d063f
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Tue Jan 17 13:03:04 2023 +0800
Polish java example (#347)
---
.../client/java/example/AsyncProducerExample.java | 15 ++++++----
.../java/example/AsyncSimpleConsumerExample.java | 32 +++++++++++++++-------
2 files changed, 32 insertions(+), 15 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
index 50632c88..7bf80986 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
@@ -20,6 +20,8 @@ package org.apache.rocketmq.client.java.example;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
@@ -72,14 +74,17 @@ public class AsyncProducerExample {
.setKeys("yourMessageKey-0e094a5f9d85")
.setBody(body)
.build();
+ // Set individual thread pool for send callback.
final CompletableFuture<SendReceipt> future = producer.sendAsync(message);
- future.whenComplete((sendReceipt, throwable) -> {
- if (null == throwable) {
- log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
- } else {
+ ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
+ future.whenCompleteAsync((sendReceipt, throwable) -> {
+ if (null != throwable) {
log.error("Failed to send message", throwable);
+ // Return early.
+ return;
}
- });
+ log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+ }, sendCallbackExecutor);
// Block to avoid exit of background threads.
Thread.sleep(Long.MAX_VALUE);
// Close the producer when you don't need it anymore.
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 594f6367..7cd462e6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -23,6 +23,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
@@ -75,8 +77,17 @@ public class AsyncSimpleConsumerExample {
int maxMessageNum = 16;
// Set message invisible duration after it is received.
Duration invisibleDuration = Duration.ofSeconds(15);
+ // Set individual thread pool for receive callback.
+ ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool();
+ // Set individual thread pool for ack callback.
+ ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
final CompletableFuture<List<MessageView>> future0 = consumer.receiveAsync(maxMessageNum, invisibleDuration);
- future0.thenAccept(messages -> {
+ future0.whenCompleteAsync(((messages, throwable) -> {
+ if (null != throwable) {
+ log.error("Failed to receive message from remote", throwable);
+ // Return early.
+ return;
+ }
log.info("Received {} message(s)", messages.size());
// Using messageView as key rather than message id because message id may be duplicated.
final Map<MessageView, CompletableFuture<Void>> map =
@@ -84,16 +95,17 @@ public class AsyncSimpleConsumerExample {
for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) {
final MessageId messageId = entry.getKey().getMessageId();
final CompletableFuture<Void> future = entry.getValue();
- future.thenAccept(v -> log.info("Message is acknowledged successfully, messageId={}", messageId))
- .exceptionally(throwable -> {
- log.error("Message is failed to be acknowledged, messageId={}", messageId);
- return null;
- });
+ future.whenCompleteAsync((v, t) -> {
+ if (null != t) {
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+ // Return early.
+ return;
+ }
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
+ }, ackCallbackExecutor);
}
- }).exceptionally(t -> {
- log.error("Failed to receive message from remote", t);
- return null;
- });
+
+ }), receiveCallbackExecutor);
// Block to avoid exit of background threads.
Thread.sleep(Long.MAX_VALUE);
// Close the simple consumer when you don't need it anymore.