You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/06/23 08:29:42 UTC

[rocketmq-clients] branch master updated: Java: try to shutdown it if failed to start client

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 747fed5  Java: try to shutdown it if failed to start client
747fed5 is described below

commit 747fed570570617578ac78ef4790a30422ca7551
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Thu Jun 23 16:29:33 2022 +0800

    Java: try to shutdown it if failed to start client
---
 .../java/impl/consumer/PushConsumerImpl.java       | 34 +++++++++++++---------
 .../java/impl/consumer/SimpleConsumerImpl.java     | 12 ++++++--
 .../client/java/impl/producer/ProducerImpl.java    | 12 ++++++--
 3 files changed, 38 insertions(+), 20 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index b14c047..b8cd546 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -149,20 +149,26 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
 
     @Override
     protected void startUp() throws Exception {
-        LOGGER.info("Begin to start the rocketmq push consumer, clientId={}", clientId);
-        super.startUp();
-        messageMeter.setMessageCacheObserver(this);
-        final ScheduledExecutorService scheduler = clientManager.getScheduler();
-        this.consumeService = createConsumeService();
-        // Scan assignments periodically.
-        scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {
-            try {
-                scanAssignments();
-            } catch (Throwable t) {
-                LOGGER.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);
-            }
-        }, 1, 5, TimeUnit.SECONDS);
-        LOGGER.info("The rocketmq push consumer starts successfully, clientId={}", clientId);
+        try {
+            LOGGER.info("Begin to start the rocketmq push consumer, clientId={}", clientId);
+            super.startUp();
+            messageMeter.setMessageCacheObserver(this);
+            final ScheduledExecutorService scheduler = clientManager.getScheduler();
+            this.consumeService = createConsumeService();
+            // Scan assignments periodically.
+            scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {
+                try {
+                    scanAssignments();
+                } catch (Throwable t) {
+                    LOGGER.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);
+                }
+            }, 1, 5, TimeUnit.SECONDS);
+            LOGGER.info("The rocketmq push consumer starts successfully, clientId={}", clientId);
+        } catch (Throwable t) {
+            LOGGER.error("Exception raised while starting the rocketmq push consumer, clientId={}", clientId, t);
+            shutDown();
+            throw t;
+        }
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index db48b87..a7fe537 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -85,9 +85,15 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
 
     @Override
     protected void startUp() throws Exception {
-        LOGGER.info("Begin to start the rocketmq simple consumer, clientId={}", clientId);
-        super.startUp();
-        LOGGER.info("The rocketmq simple consumer starts successfully, clientId={}", clientId);
+        try {
+            LOGGER.info("Begin to start the rocketmq simple consumer, clientId={}", clientId);
+            super.startUp();
+            LOGGER.info("The rocketmq simple consumer starts successfully, clientId={}", clientId);
+        } catch (Throwable t) {
+            LOGGER.error("Failed to start the rocketmq simple consumer, try to shutdown it, clientId={}", clientId, t);
+            shutDown();
+            throw t;
+        }
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 665a076..94f9eca 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -105,9 +105,15 @@ class ProducerImpl extends ClientImpl implements Producer {
 
     @Override
     protected void startUp() throws Exception {
-        LOGGER.info("Begin to start the rocketmq producer, clientId={}", clientId);
-        super.startUp();
-        LOGGER.info("The rocketmq producer starts successfully, clientId={}", clientId);
+        try {
+            LOGGER.info("Begin to start the rocketmq producer, clientId={}", clientId);
+            super.startUp();
+            LOGGER.info("The rocketmq producer starts successfully, clientId={}", clientId);
+        } catch (Throwable t) {
+            LOGGER.error("Failed to start the rocketmq producer, try to shutdown it, clientId={}", clientId, t);
+            shutDown();
+            throw t;
+        }
     }
 
     @Override