You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2023/01/17 05:03:09 UTC

[rocketmq-clients] branch master updated: Polish java example (#347)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 96729a83 Polish java example (#347)
96729a83 is described below

commit 96729a83cd62f46788c5a737161b9f1c346d063f
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Tue Jan 17 13:03:04 2023 +0800

    Polish java example (#347)
---
 .../client/java/example/AsyncProducerExample.java  | 15 ++++++----
 .../java/example/AsyncSimpleConsumerExample.java   | 32 +++++++++++++++-------
 2 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
index 50632c88..7bf80986 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
@@ -20,6 +20,8 @@ package org.apache.rocketmq.client.java.example;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
@@ -72,14 +74,17 @@ public class AsyncProducerExample {
             .setKeys("yourMessageKey-0e094a5f9d85")
             .setBody(body)
             .build();
+        // Set individual thread pool for send callback.
         final CompletableFuture<SendReceipt> future = producer.sendAsync(message);
-        future.whenComplete((sendReceipt, throwable) -> {
-            if (null == throwable) {
-                log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
-            } else {
+        ExecutorService sendCallbackExecutor = Executors.newCachedThreadPool();
+        future.whenCompleteAsync((sendReceipt, throwable) -> {
+            if (null != throwable) {
                 log.error("Failed to send message", throwable);
+                // Return early.
+                return;
             }
-        });
+            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+        }, sendCallbackExecutor);
         // Block to avoid exit of background threads.
         Thread.sleep(Long.MAX_VALUE);
         // Close the producer when you don't need it anymore.
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 594f6367..7cd462e6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -23,6 +23,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
@@ -75,8 +77,17 @@ public class AsyncSimpleConsumerExample {
         int maxMessageNum = 16;
         // Set message invisible duration after it is received.
         Duration invisibleDuration = Duration.ofSeconds(15);
+        // Set individual thread pool for receive callback.
+        ExecutorService receiveCallbackExecutor = Executors.newCachedThreadPool();
+        // Set individual thread pool for ack callback.
+        ExecutorService ackCallbackExecutor = Executors.newCachedThreadPool();
         final CompletableFuture<List<MessageView>> future0 = consumer.receiveAsync(maxMessageNum, invisibleDuration);
-        future0.thenAccept(messages -> {
+        future0.whenCompleteAsync(((messages, throwable) -> {
+            if (null != throwable) {
+                log.error("Failed to receive message from remote", throwable);
+                // Return early.
+                return;
+            }
             log.info("Received {} message(s)", messages.size());
             // Using messageView as key rather than message id because message id may be duplicated.
             final Map<MessageView, CompletableFuture<Void>> map =
@@ -84,16 +95,17 @@ public class AsyncSimpleConsumerExample {
             for (Map.Entry<MessageView, CompletableFuture<Void>> entry : map.entrySet()) {
                 final MessageId messageId = entry.getKey().getMessageId();
                 final CompletableFuture<Void> future = entry.getValue();
-                future.thenAccept(v -> log.info("Message is acknowledged successfully, messageId={}", messageId))
-                    .exceptionally(throwable -> {
-                        log.error("Message is failed to be acknowledged, messageId={}", messageId);
-                        return null;
-                    });
+                future.whenCompleteAsync((v, t) -> {
+                    if (null != t) {
+                        log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
+                        // Return early.
+                        return;
+                    }
+                    log.info("Message is acknowledged successfully, messageId={}", messageId);
+                }, ackCallbackExecutor);
             }
-        }).exceptionally(t -> {
-            log.error("Failed to receive message from remote", t);
-            return null;
-        });
+
+        }), receiveCallbackExecutor);
         // Block to avoid exit of background threads.
         Thread.sleep(Long.MAX_VALUE);
         // Close the simple consumer when you don't need it anymore.