You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/06/23 08:29:42 UTC
[rocketmq-clients] branch master updated: Java: try to shutdown it if failed to start client
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 747fed5 Java: try to shutdown it if failed to start client
747fed5 is described below
commit 747fed570570617578ac78ef4790a30422ca7551
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Thu Jun 23 16:29:33 2022 +0800
Java: try to shutdown it if failed to start client
---
.../java/impl/consumer/PushConsumerImpl.java | 34 +++++++++++++---------
.../java/impl/consumer/SimpleConsumerImpl.java | 12 ++++++--
.../client/java/impl/producer/ProducerImpl.java | 12 ++++++--
3 files changed, 38 insertions(+), 20 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index b14c047..b8cd546 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -149,20 +149,26 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
@Override
protected void startUp() throws Exception {
- LOGGER.info("Begin to start the rocketmq push consumer, clientId={}", clientId);
- super.startUp();
- messageMeter.setMessageCacheObserver(this);
- final ScheduledExecutorService scheduler = clientManager.getScheduler();
- this.consumeService = createConsumeService();
- // Scan assignments periodically.
- scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {
- try {
- scanAssignments();
- } catch (Throwable t) {
- LOGGER.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);
- }
- }, 1, 5, TimeUnit.SECONDS);
- LOGGER.info("The rocketmq push consumer starts successfully, clientId={}", clientId);
+ try {
+ LOGGER.info("Begin to start the rocketmq push consumer, clientId={}", clientId);
+ super.startUp();
+ messageMeter.setMessageCacheObserver(this);
+ final ScheduledExecutorService scheduler = clientManager.getScheduler();
+ this.consumeService = createConsumeService();
+ // Scan assignments periodically.
+ scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {
+ try {
+ scanAssignments();
+ } catch (Throwable t) {
+ LOGGER.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);
+ }
+ }, 1, 5, TimeUnit.SECONDS);
+ LOGGER.info("The rocketmq push consumer starts successfully, clientId={}", clientId);
+ } catch (Throwable t) {
+ LOGGER.error("Exception raised while starting the rocketmq push consumer, clientId={}", clientId, t);
+ shutDown();
+ throw t;
+ }
}
@Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index db48b87..a7fe537 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -85,9 +85,15 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
@Override
protected void startUp() throws Exception {
- LOGGER.info("Begin to start the rocketmq simple consumer, clientId={}", clientId);
- super.startUp();
- LOGGER.info("The rocketmq simple consumer starts successfully, clientId={}", clientId);
+ try {
+ LOGGER.info("Begin to start the rocketmq simple consumer, clientId={}", clientId);
+ super.startUp();
+ LOGGER.info("The rocketmq simple consumer starts successfully, clientId={}", clientId);
+ } catch (Throwable t) {
+ LOGGER.error("Failed to start the rocketmq simple consumer, try to shutdown it, clientId={}", clientId, t);
+ shutDown();
+ throw t;
+ }
}
@Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 665a076..94f9eca 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -105,9 +105,15 @@ class ProducerImpl extends ClientImpl implements Producer {
@Override
protected void startUp() throws Exception {
- LOGGER.info("Begin to start the rocketmq producer, clientId={}", clientId);
- super.startUp();
- LOGGER.info("The rocketmq producer starts successfully, clientId={}", clientId);
+ try {
+ LOGGER.info("Begin to start the rocketmq producer, clientId={}", clientId);
+ super.startUp();
+ LOGGER.info("The rocketmq producer starts successfully, clientId={}", clientId);
+ } catch (Throwable t) {
+ LOGGER.error("Failed to start the rocketmq producer, try to shutdown it, clientId={}", clientId, t);
+ shutDown();
+ throw t;
+ }
}
@Override