You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/29 12:22:44 UTC
[rocketmq-clients] branch master updated: Polish code (#100)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new a5fc656 Polish code (#100)
a5fc656 is described below
commit a5fc656916baa4bad9b8df135023e0ef7b0b9655
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Jul 29 20:22:39 2022 +0800
Polish code (#100)
---
.../rocketmq/client/java/example/AsyncProducerExample.java | 4 +++-
.../client/java/example/AsyncSimpleConsumerExample.java | 4 +++-
.../rocketmq/client/java/exception/InternalErrorException.java | 4 ++++
.../rocketmq/client/java/impl/producer/ProducerImpl.java | 10 +++++++---
4 files changed, 17 insertions(+), 5 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
index 8ef7eaf..1035444 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncProducerExample.java
@@ -37,7 +37,7 @@ public class AsyncProducerExample {
private AsyncProducerExample() {
}
- public static void main(String[] args) throws ClientException, IOException {
+ public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
@@ -79,6 +79,8 @@ public class AsyncProducerExample {
LOGGER.error("Failed to send message", throwable);
}
});
+ // Block the main thread to avoid the exit of background threads.
+ Thread.sleep(Long.MAX_VALUE);
// Close the producer when you don't need it anymore.
producer.close();
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 8d0875d..12bba02 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -43,7 +43,7 @@ public class AsyncSimpleConsumerExample {
private AsyncSimpleConsumerExample() {
}
- public static void main(String[] args) throws ClientException, IOException {
+ public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// Credential provider is optional for client configuration.
@@ -92,6 +92,8 @@ public class AsyncSimpleConsumerExample {
LOGGER.error("Failed to receive message from remote", t);
return null;
});
+ // Block the main thread to avoid the exit of background threads.
+ Thread.sleep(Long.MAX_VALUE);
// Close the simple consumer when you don't need it anymore.
consumer.close();
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
index baba366..975728c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
@@ -28,6 +28,10 @@ public class InternalErrorException extends ClientException {
super(cause);
}
+ public InternalErrorException(String message) {
+ super(message);
+ }
+
public InternalErrorException(int responseCode, String requestId, String message) {
super(responseCode, requestId, message);
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 04158ff..77d7664 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -59,6 +59,7 @@ import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionChecker;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
@@ -469,10 +470,13 @@ class ProducerImpl extends ClientImpl implements Producer {
// Intercept after message publishing.
final Duration duration = stopwatch.elapsed();
doAfter(MessageHookPoints.SEND, messageCommons, duration, MessageHookPointsStatus.OK);
-
+ // Should never reach here.
if (sendReceipts.size() != messages.size()) {
- LOGGER.error("[Bug] Due to an unknown reason from remote, received send receipts' quantity[{}]" +
- " is not equal to messages' quantity[{}]", sendReceipts.size(), messages.size());
+ final InternalErrorException e = new InternalErrorException("[Bug] due to an"
+ + " unknown reason from remote, received send receipt's quantity " + sendReceipts.size()
+ + " is not equal to sent message's quantity " + messages.size());
+ future.setException(e);
+ return;
}
// No need more attempts.
future.set(sendReceipts);