You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/07/23 11:50:54 UTC

[rocketmq] branch develop updated: [ISSUE #377] FIX Admin subcommand consumeMessage should quit when catching an exception (#378)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3f7dda3  [ISSUE #377] FIX Admin subcommand consumeMessage should quit when catching an exception (#378)
3f7dda3 is described below

commit 3f7dda3c0c5fb7de74ece7b9ce690a978c58a319
Author: XiaoZYang <Xi...@users.noreply.github.com>
AuthorDate: Mon Jul 23 19:50:51 2018 +0800

    [ISSUE #377] FIX Admin subcommand consumeMessage should quit when catching an exception (#378)
---
 .../command/message/ConsumeMessageCommand.java     |  1 +
 .../command/message/ConsumeMessageCommandTest.java | 70 ++++++++++++++++++----
 2 files changed, 58 insertions(+), 13 deletions(-)

diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
index 5189267..aa98ee6 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
@@ -207,6 +207,7 @@ public class ConsumeMessageCommand implements SubCommand {
                 pullResult = defaultMQPullConsumer.pull(mq, "*", offset, (int)(maxOffset - offset + 1));
             } catch (Exception e) {
                 e.printStackTrace();
+                return;
             }
             if (pullResult != null) {
                 offset = pullResult.getNextBeginOffset();
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
index 9a5998e..1154395 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
@@ -16,6 +16,13 @@
  */
 package org.apache.rocketmq.tools.command.message;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
@@ -34,14 +41,6 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -54,14 +53,14 @@ public class ConsumeMessageCommandTest {
 
     @BeforeClass
     public static void init() throws MQClientException, RemotingException, MQBrokerException, InterruptedException,
-        NoSuchFieldException, IllegalAccessException{
+        NoSuchFieldException, IllegalAccessException {
         consumeMessageCommand = new ConsumeMessageCommand();
         DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
         MessageExt msg = new MessageExt();
-        msg.setBody(new byte[]{'a'});
+        msg.setBody(new byte[] {'a'});
         List<MessageExt> msgFoundList = new ArrayList<>();
         msgFoundList.add(msg);
-        final PullResult pullResult = new PullResult(PullStatus.FOUND,2, 0, 1, msgFoundList);
+        final PullResult pullResult = new PullResult(PullStatus.FOUND, 2, 0, 1, msgFoundList);
 
         when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenReturn(pullResult);
         when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0));
@@ -73,8 +72,9 @@ public class ConsumeMessageCommandTest {
 
         Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
         producerField.setAccessible(true);
-        producerField.set(consumeMessageCommand,defaultMQPullConsumer);
+        producerField.set(consumeMessageCommand, defaultMQPullConsumer);
     }
+
     @AfterClass
     public static void terminate() {
     }
@@ -102,11 +102,55 @@ public class ConsumeMessageCommandTest {
         System.setOut(new PrintStream(bos));
         Options options = ServerUtil.buildCommandlineOptions(new Options());
 
-        String[] subargs = new String[] {"-t mytopic","-b localhost","-i 0", "-n localhost:9876"};
+        String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", "-n localhost:9876"};
         CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
         consumeMessageCommand.execute(commandLine, options, null);
         System.setOut(out);
         String s = new String(bos.toByteArray());
         Assert.assertTrue(s.contains("Consume ok"));
     }
+
+    @Test
+    public void testExecuteDefaultWhenPullMessageByQueueGotException() throws SubCommandException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, IllegalAccessException {
+        DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
+        when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class);
+        Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+        producerField.setAccessible(true);
+        producerField.set(consumeMessageCommand, defaultMQPullConsumer);
+
+        PrintStream out = System.out;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(bos));
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-t topic-not-existu", "-n localhost:9876"};
+        CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(),
+            subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+        consumeMessageCommand.execute(commandLine, options, null);
+
+        System.setOut(out);
+        String s = new String(bos.toByteArray());
+        Assert.assertTrue(!s.contains("Consume ok"));
+    }
+
+    @Test
+    public void testExecuteByConditionWhenPullMessageByQueueGotException() throws IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, SubCommandException {
+        DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
+        when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class);
+        Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+        producerField.setAccessible(true);
+        producerField.set(consumeMessageCommand, defaultMQPullConsumer);
+
+        PrintStream out = System.out;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(bos));
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+
+        String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", "-n localhost:9876"};
+        CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+        consumeMessageCommand.execute(commandLine, options, null);
+
+        System.setOut(out);
+        String s = new String(bos.toByteArray());
+        Assert.assertTrue(!s.contains("Consume ok"));
+    }
 }
\ No newline at end of file