You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/29 12:22:44 UTC

[rocketmq-clients] branch master updated: Polish code (#100)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new a5fc656  Polish code (#100)
a5fc656 is described below

commit a5fc656916baa4bad9b8df135023e0ef7b0b9655
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Jul 29 20:22:39 2022 +0800

    Polish code (#100)
---
 .../rocketmq/client/java/example/AsyncProducerExample.java     |  4 +++-
 .../client/java/example/AsyncSimpleConsumerExample.java        |  4 +++-
 .../rocketmq/client/java/exception/InternalErrorException.java |  4 ++++
 .../rocketmq/client/java/impl/producer/ProducerImpl.java       | 10 +++++++---
 4 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
index 8ef7eaf..1035444 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
@@ -37,7 +37,7 @@ public class AsyncProducerExample {
     private AsyncProducerExample() {
     }
 
-    public static void main(String[] args) throws ClientException, IOException {
+    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
         final ClientServiceProvider provider = ClientServiceProvider.loadService();
 
         // Credential provider is optional for client configuration.
@@ -79,6 +79,8 @@ public class AsyncProducerExample {
                 LOGGER.error("Failed to send message", throwable);
             }
         });
+        // Block to avoid exist of background threads.
+        Thread.sleep(Long.MAX_VALUE);
         // Close the producer when you don't need it anymore.
         producer.close();
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 8d0875d..12bba02 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -43,7 +43,7 @@ public class AsyncSimpleConsumerExample {
     private AsyncSimpleConsumerExample() {
     }
 
-    public static void main(String[] args) throws ClientException, IOException {
+    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
         final ClientServiceProvider provider = ClientServiceProvider.loadService();
 
         // Credential provider is optional for client configuration.
@@ -92,6 +92,8 @@ public class AsyncSimpleConsumerExample {
             LOGGER.error("Failed to receive message from remote", t);
             return null;
         });
+        // Block to avoid exist of background threads.
+        Thread.sleep(Long.MAX_VALUE);
         // Close the simple consumer when you don't need it anymore.
         consumer.close();
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
index baba366..975728c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
@@ -28,6 +28,10 @@ public class InternalErrorException extends ClientException {
         super(cause);
     }
 
+    public InternalErrorException(String message) {
+        super(message);
+    }
+
     public InternalErrorException(int responseCode, String requestId, String message) {
         super(responseCode, requestId, message);
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 04158ff..77d7664 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -59,6 +59,7 @@ import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.client.apis.producer.Transaction;
 import org.apache.rocketmq.client.apis.producer.TransactionChecker;
 import org.apache.rocketmq.client.apis.producer.TransactionResolution;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
 import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
@@ -469,10 +470,13 @@ class ProducerImpl extends ClientImpl implements Producer {
                 // Intercept after message publishing.
                 final Duration duration = stopwatch.elapsed();
                 doAfter(MessageHookPoints.SEND, messageCommons, duration, MessageHookPointsStatus.OK);
-
+                // Should never reach here.
                 if (sendReceipts.size() != messages.size()) {
-                    LOGGER.error("[Bug] Due to an unknown reason from remote, received send receipts' quantity[{}]" +
-                        " is not equal to messages' quantity[{}]", sendReceipts.size(), messages.size());
+                    final InternalErrorException e = new InternalErrorException("[Bug] due to an"
+                        + " unknown reason from remote, received send receipt's quantity " + sendReceipts.size()
+                        + " is not equal to sent message's quantity " + messages.size());
+                    future.setException(e);
+                    return;
                 }
                 // No need more attempts.
                 future.set(sendReceipts);