You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/09/17 03:01:44 UTC

[rocketmq] branch develop updated: [ISSUE apache#2152] Add isRunning method in DefaultLitePullConsumerImpl class and test suit (#2302)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9ddcab4  [ISSUE apache#2152] Add isRunning method in DefaultLitePullConsumerImpl class and test suit (#2302)
9ddcab4 is described below

commit 9ddcab410c1016e476021031e2c61121396aafa9
Author: anotherJJz <47...@qq.com>
AuthorDate: Thu Sep 17 11:01:22 2020 +0800

    [ISSUE apache#2152] Add isRunning method in DefaultLitePullConsumerImpl class and test suit (#2302)
---
 .../client/consumer/DefaultLitePullConsumer.java   |  5 +++++
 .../rocketmq/client/consumer/LitePullConsumer.java |  7 ++++++
 .../impl/consumer/DefaultLitePullConsumerImpl.java |  4 ++++
 .../consumer/DefaultLitePullConsumerTest.java      | 26 ++++++++++++++++++++++
 4 files changed, 42 insertions(+)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 9cc7c60..393bda9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -212,6 +212,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     }
 
     @Override
+    public boolean isRunning() {
+        return this.defaultLitePullConsumerImpl.isRunning();
+    }
+
+    @Override
     public void subscribe(String topic, String subExpression) throws MQClientException {
         this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index ce22288..25b1104 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -36,6 +36,13 @@ public interface LitePullConsumer {
     void shutdown();
 
     /**
+     * Checks whether this consumer is currently running.
+     *
+     * @return true if the consumer is running, false otherwise
+     */
+    boolean isRunning();
+
+    /**
      * Subscribe some topic with subExpression
      *
      * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index e3d60ff..676c03c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -229,6 +229,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         }
     }
 
+    public synchronized boolean isRunning() {
+        return this.serviceState == ServiceState.RUNNING;
+    }
+
     public synchronized void start() throws MQClientException {
         switch (this.serviceState) {
             case CREATE_JUST:
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index de2f608..3726a5b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -513,6 +513,32 @@ public class DefaultLitePullConsumerTest {
         assertThat(offset).isEqualTo(100);
     }
 
+    @Test
+    public void testConsumerAfterShutdown() throws Exception {
+        DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
+        defaultLitePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+        defaultLitePullConsumer.subscribe(topic, "*");
+        new AsyncConsumer().executeAsync(defaultLitePullConsumer);
+        Thread.sleep(10 * 1000);
+        defaultLitePullConsumer.shutdown();
+        assertThat(defaultLitePullConsumer.isRunning()).isFalse();
+    }
+
+    static class AsyncConsumer {
+        public void executeAsync(final DefaultLitePullConsumer consumer) {
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    while (consumer.isRunning()) {
+                        List<MessageExt> poll = consumer.poll(2 * 1000);
+                        System.out.println("consumer is still running");
+                    }
+                    System.out.println("consumer shutdown");
+                }
+            }).start();
+        }
+    }
+
     private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
 
         Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");