You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/07/23 11:50:54 UTC
[rocketmq] branch develop updated: [ISSUE #377] FIX Admin subcommand consumeMessage should quit when catching an exception (#378)
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3f7dda3 [ISSUE #377] FIX Admin subcommand consumeMessage should quit when catching an exception (#378)
3f7dda3 is described below
commit 3f7dda3c0c5fb7de74ece7b9ce690a978c58a319
Author: XiaoZYang <Xi...@users.noreply.github.com>
AuthorDate: Mon Jul 23 19:50:51 2018 +0800
[ISSUE #377] FIX Admin subcommand consumeMessage should quit when catching an exception (#378)
---
.../command/message/ConsumeMessageCommand.java | 1 +
.../command/message/ConsumeMessageCommandTest.java | 70 ++++++++++++++++++----
2 files changed, 58 insertions(+), 13 deletions(-)
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
index 5189267..aa98ee6 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
@@ -207,6 +207,7 @@ public class ConsumeMessageCommand implements SubCommand {
pullResult = defaultMQPullConsumer.pull(mq, "*", offset, (int)(maxOffset - offset + 1));
} catch (Exception e) {
e.printStackTrace();
+ return;
}
if (pullResult != null) {
offset = pullResult.getNextBeginOffset();
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
index 9a5998e..1154395 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.tools.command.message;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
@@ -34,14 +41,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -54,14 +53,14 @@ public class ConsumeMessageCommandTest {
@BeforeClass
public static void init() throws MQClientException, RemotingException, MQBrokerException, InterruptedException,
- NoSuchFieldException, IllegalAccessException{
+ NoSuchFieldException, IllegalAccessException {
consumeMessageCommand = new ConsumeMessageCommand();
DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
MessageExt msg = new MessageExt();
- msg.setBody(new byte[]{'a'});
+ msg.setBody(new byte[] {'a'});
List<MessageExt> msgFoundList = new ArrayList<>();
msgFoundList.add(msg);
- final PullResult pullResult = new PullResult(PullStatus.FOUND,2, 0, 1, msgFoundList);
+ final PullResult pullResult = new PullResult(PullStatus.FOUND, 2, 0, 1, msgFoundList);
when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenReturn(pullResult);
when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0));
@@ -73,8 +72,9 @@ public class ConsumeMessageCommandTest {
Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
producerField.setAccessible(true);
- producerField.set(consumeMessageCommand,defaultMQPullConsumer);
+ producerField.set(consumeMessageCommand, defaultMQPullConsumer);
}
+
@AfterClass
public static void terminate() {
}
@@ -102,11 +102,55 @@ public class ConsumeMessageCommandTest {
System.setOut(new PrintStream(bos));
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[] {"-t mytopic","-b localhost","-i 0", "-n localhost:9876"};
+ String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", "-n localhost:9876"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
consumeMessageCommand.execute(commandLine, options, null);
System.setOut(out);
String s = new String(bos.toByteArray());
Assert.assertTrue(s.contains("Consume ok"));
}
+
+ @Test
+ public void testExecuteDefaultWhenPullMessageByQueueGotException() throws SubCommandException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, IllegalAccessException {
+ DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
+ when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class);
+ Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+ producerField.setAccessible(true);
+ producerField.set(consumeMessageCommand, defaultMQPullConsumer);
+
+ PrintStream out = System.out;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(bos));
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ String[] subargs = new String[] {"-t topic-not-existu", "-n localhost:9876"};
+ CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(),
+ subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+ consumeMessageCommand.execute(commandLine, options, null);
+
+ System.setOut(out);
+ String s = new String(bos.toByteArray());
+ Assert.assertTrue(!s.contains("Consume ok"));
+ }
+
+ @Test
+ public void testExecuteByConditionWhenPullMessageByQueueGotException() throws IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, SubCommandException {
+ DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
+ when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class);
+ Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+ producerField.setAccessible(true);
+ producerField.set(consumeMessageCommand, defaultMQPullConsumer);
+
+ PrintStream out = System.out;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(bos));
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+
+ String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", "-n localhost:9876"};
+ CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+ consumeMessageCommand.execute(commandLine, options, null);
+
+ System.setOut(out);
+ String s = new String(bos.toByteArray());
+ Assert.assertTrue(!s.contains("Consume ok"));
+ }
}
\ No newline at end of file