You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/09/17 03:01:44 UTC
[rocketmq] branch develop updated: [ISSUE apache#2152] Add
isRunning method in DefaultLitePullConsumerImpl class and test suit (#2302)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9ddcab4 [ISSUE apache#2152] Add isRunning method in DefaultLitePullConsumerImpl class and test suit (#2302)
9ddcab4 is described below
commit 9ddcab410c1016e476021031e2c61121396aafa9
Author: anotherJJz <47...@qq.com>
AuthorDate: Thu Sep 17 11:01:22 2020 +0800
[ISSUE apache#2152] Add isRunning method in DefaultLitePullConsumerImpl class and test suit (#2302)
---
.../client/consumer/DefaultLitePullConsumer.java | 5 +++++
.../rocketmq/client/consumer/LitePullConsumer.java | 7 ++++++
.../impl/consumer/DefaultLitePullConsumerImpl.java | 4 ++++
.../consumer/DefaultLitePullConsumerTest.java | 26 ++++++++++++++++++++++
4 files changed, 42 insertions(+)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 9cc7c60..393bda9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -212,6 +212,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
}
@Override
+ public boolean isRunning() {
+ return this.defaultLitePullConsumerImpl.isRunning();
+ }
+
+ @Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index ce22288..25b1104 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -36,6 +36,13 @@ public interface LitePullConsumer {
void shutdown();
/**
+ * This consumer is still running
+ *
+ * @return true if consumer is still running
+ */
+ boolean isRunning();
+
+ /**
* Subscribe some topic with subExpression
*
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br> if
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index e3d60ff..676c03c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -229,6 +229,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
}
+ public synchronized boolean isRunning() {
+ return this.serviceState == ServiceState.RUNNING;
+ }
+
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index de2f608..3726a5b 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -513,6 +513,32 @@ public class DefaultLitePullConsumerTest {
assertThat(offset).isEqualTo(100);
}
+ @Test
+ public void testConsumerAfterShutdown() throws Exception {
+ DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
+ defaultLitePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+ defaultLitePullConsumer.subscribe(topic, "*");
+ new AsyncConsumer().executeAsync(defaultLitePullConsumer);
+ Thread.sleep(10 * 1000);
+ defaultLitePullConsumer.shutdown();
+ assertThat(defaultLitePullConsumer.isRunning()).isFalse();
+ }
+
+ static class AsyncConsumer {
+ public void executeAsync(final DefaultLitePullConsumer consumer) {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (consumer.isRunning()) {
+ List<MessageExt> poll = consumer.poll(2 * 1000);
+ System.out.println("consumer is still running");
+ }
+ System.out.println("consumer shutdown");
+ }
+ }).start();
+ }
+ }
+
private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {
Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");