You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/07/14 01:56:06 UTC

[rocketmq] branch develop updated: [ROCKETMQ-353] Add sendMessageCommand and consumeMessageCommand (#332)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b75af6b  [ROCKETMQ-353] Add sendMessageCommand and consumeMessageCommand (#332)
b75af6b is described below

commit b75af6b0c253f7b9c39e812fd2e7938a99b1495d
Author: what-a-good-jungle <35...@qq.com>
AuthorDate: Sat Jul 14 09:55:59 2018 +0800

    [ROCKETMQ-353] Add sendMessageCommand and consumeMessageCommand (#332)
---
 .../rocketmq/tools/command/MQAdminStartup.java     |   4 +
 .../command/message/ConsumeMessageCommand.java     | 292 +++++++++++++++++++++
 .../tools/command/message/SendMessageCommand.java  | 156 +++++++++++
 .../command/message/ConsumeMessageCommandTest.java | 112 ++++++++
 .../command/message/SendMessageCommandTest.java    |  90 +++++++
 5 files changed, 654 insertions(+)

diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index d3342e8..c189e86 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -48,12 +48,14 @@ import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand
 import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
 import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
 import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
+import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
 import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
 import org.apache.rocketmq.tools.command.message.PrintMessageSubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
+import org.apache.rocketmq.tools.command.message.SendMessageCommand;
 import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
 import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
 import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand;
@@ -193,6 +195,8 @@ public class MQAdminStartup {
         initCommand(new GetBrokerConfigCommand());
 
         initCommand(new QueryConsumeQueueCommand());
+        initCommand(new SendMessageCommand());
+        initCommand(new ConsumeMessageCommand());
     }
 
     private static void initLogback() throws JoranException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
new file mode 100644
index 0000000..5189267
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.util.Set;
+
+public class ConsumeMessageCommand implements SubCommand {
+
+    private String topic = null;
+    private long messageCount = 128;
+    private DefaultMQPullConsumer defaultMQPullConsumer;
+
+
+    public enum ConsumeType {
+        /**
+         * Topic only
+         */
+        DEFAULT,
+        /**
+         * Topic brokerName queueId set
+         */
+        BYQUEUE,
+        /**
+         * Topic brokerName queueId offset set
+         */
+        BYOFFSET
+    }
+
+    private static long timestampFormat(final String value) {
+        long timestamp;
+        try {
+            timestamp = Long.parseLong(value);
+        } catch (NumberFormatException e) {
+            timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
+        }
+
+        return timestamp;
+    }
+    @Override
+    public String commandName() {
+        return "consumeMessage";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Consume message";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("t", "topic", true, "Topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("b", "brokerName", true, "Broker name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("i", "queueId", true, "Queue Id");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("o", "offset", true, "Queue offset");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "consumerGroup", true, "Consumer group name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("s", "beginTimestamp ", true,
+                "Begin timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("e", "endTimestamp ", true,
+                "End timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "MessageNumber", true, "Number of message to be consumed");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+
+        return options;
+
+    }
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
+        if (defaultMQPullConsumer == null) {
+            defaultMQPullConsumer = new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
+        }
+        defaultMQPullConsumer.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        long offset = 0;
+        long timeValueEnd = 0;
+        long timeValueBegin = 0;
+        String queueId = null;
+        String brokerName = null;
+        ConsumeType consumeType = ConsumeType.DEFAULT;
+
+        try {
+            /* Group name must be set before consumer start */
+            if (commandLine.hasOption('g')) {
+                String consumerGroup = commandLine.getOptionValue('b').trim();
+                defaultMQPullConsumer.setConsumerGroup(consumerGroup);
+            }
+
+            defaultMQPullConsumer.start();
+
+            topic = commandLine.getOptionValue('t').trim();
+
+            if (commandLine.hasOption('c')) {
+                messageCount = Long.parseLong(commandLine.getOptionValue('c').trim());
+                if (messageCount <= 0) {
+                    System.out.print("please input a positive messageNumber!");
+                    return;
+                }
+            }
+            if (commandLine.hasOption('b')) {
+                brokerName = commandLine.getOptionValue('b').trim();
+
+            }
+            if (commandLine.hasOption('i')) {
+                if (!commandLine.hasOption('b')) {
+                    System.out.print("Please set the brokerName before queueId!");
+                    return;
+                }
+                queueId = commandLine.getOptionValue('i').trim();
+
+                consumeType = ConsumeType.BYQUEUE;
+            }
+            if (commandLine.hasOption('o')) {
+                if (consumeType != ConsumeType.BYQUEUE) {
+                    System.out.print("please set queueId before offset!");
+                    return;
+                }
+                offset = Long.parseLong(commandLine.getOptionValue('o').trim());
+                consumeType = ConsumeType.BYOFFSET;
+            }
+
+            if (commandLine.hasOption('s')) {
+                String timestampStr = commandLine.getOptionValue('s').trim();
+                timeValueBegin = timestampFormat(timestampStr);
+            }
+            if (commandLine.hasOption('e')) {
+                String timestampStr = commandLine.getOptionValue('e').trim();
+                timeValueEnd = timestampFormat(timestampStr);
+            }
+
+            switch (consumeType) {
+                case DEFAULT:
+                    executeDefault(timeValueBegin, timeValueEnd);
+                    break;
+                case BYOFFSET:
+                    executeByCondition(brokerName, queueId, offset, timeValueBegin, timeValueEnd);
+                    break;
+                case BYQUEUE:
+                    executeByCondition(brokerName, queueId, 0, timeValueBegin, timeValueEnd);
+                    break;
+                default:
+                    System.out.print("Unknown type of consume!");
+                    break;
+            }
+
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            defaultMQPullConsumer.shutdown();
+        }
+    }
+
+    private void pullMessageByQueue(MessageQueue mq, long minOffset, long maxOffset) {
+        READQ:
+        for (long offset = minOffset; offset <= maxOffset; ) {
+            PullResult pullResult = null;
+            try {
+                pullResult = defaultMQPullConsumer.pull(mq, "*", offset, (int)(maxOffset - offset + 1));
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            if (pullResult != null) {
+                offset = pullResult.getNextBeginOffset();
+                switch (pullResult.getPullStatus()) {
+                    case FOUND:
+                        System.out.print("Consume ok\n");
+                        PrintMessageByQueueCommand.printMessage(pullResult.getMsgFoundList(), "UTF-8",
+                            true, true);
+                        break;
+                    case NO_MATCHED_MSG:
+                        System.out.printf("%s no matched msg. status=%s, offset=%s\n", mq, pullResult.getPullStatus(),
+                            offset);
+                        break;
+                    case NO_NEW_MSG:
+                    case OFFSET_ILLEGAL:
+                        System.out.printf("%s print msg finished. status=%s, offset=%s\n", mq,
+                            pullResult.getPullStatus(), offset);
+                        break READQ;
+                    default:
+                        break;
+                }
+            }
+        }
+    }
+
+    private void executeDefault(long timeValueBegin, long timeValueEnd) {
+        try {
+            Set<MessageQueue> mqs = defaultMQPullConsumer.fetchSubscribeMessageQueues(topic);
+            long countLeft = messageCount;
+            for (MessageQueue mq : mqs) {
+                if (countLeft == 0) {
+                    return;
+                }
+                long minOffset = defaultMQPullConsumer.minOffset(mq);
+                long maxOffset = defaultMQPullConsumer.maxOffset(mq);
+                if (timeValueBegin > 0) {
+                    minOffset = defaultMQPullConsumer.searchOffset(mq, timeValueBegin);
+                }
+                if (timeValueEnd > 0) {
+                    maxOffset = defaultMQPullConsumer.searchOffset(mq, timeValueEnd);
+                }
+                if (maxOffset - minOffset > countLeft) {
+                    System.out.printf("The older %d message of the %d queue will be provided\n", countLeft, mq.getQueueId());
+                    maxOffset = minOffset + countLeft - 1;
+                    countLeft = 0;
+                } else {
+                    countLeft = countLeft - (maxOffset - minOffset) - 1;
+                }
+
+                pullMessageByQueue(mq, minOffset, maxOffset);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void executeByCondition(String brokerName, String queueId, long offset, long timeValueBegin, long timeValueEnd) {
+        MessageQueue mq = new MessageQueue(topic, brokerName, Integer.parseInt(queueId));
+        try {
+            long minOffset = defaultMQPullConsumer.minOffset(mq);
+            long maxOffset = defaultMQPullConsumer.maxOffset(mq);
+            if (timeValueBegin > 0) {
+                minOffset = defaultMQPullConsumer.searchOffset(mq, timeValueBegin);
+            }
+            if (timeValueEnd > 0) {
+                maxOffset = defaultMQPullConsumer.searchOffset(mq, timeValueEnd);
+            }
+            if (offset > maxOffset) {
+                System.out.printf("%s no matched msg, offset=%s\n", mq, offset);
+                return;
+            }
+            minOffset = minOffset > offset ? minOffset : offset;
+            if (maxOffset - minOffset > messageCount) {
+                System.out.printf("The oldler %d message will be provided\n", messageCount);
+                maxOffset = minOffset + messageCount - 1;
+            }
+
+            pullMessageByQueue(mq, minOffset, maxOffset);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
\ No newline at end of file
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java
new file mode 100644
index 0000000..e4921c6
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+public class SendMessageCommand implements SubCommand {
+
+    private DefaultMQProducer producer;
+
+    @Override
+    public String commandName() {
+        return "sendMessage";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Send a message";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("t", "topic", true, "Topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("p", "body", true, "UTF-8 string format of the message body");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("k", "key", true, "Message keys");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "tags", true, "Message tags");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("b", "broker", true, "Send message to target broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("i", "qid", true, "Send message to target queue");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    private DefaultMQProducer createProducer(RPCHook rpcHook) {
+        if (this.producer != null) {
+            return producer;
+        } else {
+            producer = new DefaultMQProducer(rpcHook);
+            producer.setProducerGroup(Long.toString(System.currentTimeMillis()));
+            return producer;
+        }
+    }
+
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+        Message msg = null;
+        String topic = commandLine.getOptionValue('t').trim();
+        String body = commandLine.getOptionValue('p').trim();
+        String tag = null;
+        String keys = null;
+        String brokerName = null;
+        int queueId = -1;
+        try {
+            if (commandLine.hasOption('k')) {
+                keys = commandLine.getOptionValue('k').trim();
+            }
+            if (commandLine.hasOption('c')) {
+                tag = commandLine.getOptionValue('c').trim();
+            }
+            if (commandLine.hasOption('b')) {
+                brokerName = commandLine.getOptionValue('b').trim();
+            }
+            if (commandLine.hasOption('i')) {
+                if (!commandLine.hasOption('b')) {
+                    System.out.print("Broker name must be set if the queue is chosen!");
+                    return;
+                } else {
+                    queueId = Integer.parseInt(commandLine.getOptionValue('i').trim());
+                }
+            }
+            msg = new Message(topic, tag, keys, body.getBytes("utf-8"));
+        } catch (Exception e) {
+            throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e);
+        }
+
+        DefaultMQProducer producer = this.createProducer(rpcHook);
+        SendResult result;
+        try {
+            producer.start();
+            if (brokerName != null && queueId > -1) {
+                MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
+                result = producer.send(msg, messageQueue);
+            } else {
+                result = producer.send(msg);
+            }
+
+        } catch (Exception e) {
+            throw new RuntimeException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            producer.shutdown();
+        }
+
+        System.out.printf("%-32s  %-4s  %-20s    %s%n",
+            "#Broker Name",
+            "#QID",
+            "#Send Result",
+            "#MsgId"
+        );
+
+        if (result != null) {
+            System.out.printf("%-32s  %-4s  %-20s    %s%n",
+                result.getMessageQueue().getBrokerName(),
+                result.getMessageQueue().getQueueId(),
+                result.getSendStatus(),
+                result.getMsgId()
+            );
+        } else {
+            System.out.printf("%-32s  %-4s  %-20s    %s%n",
+                "Unknown",
+                "Unknown",
+                "Failed",
+                "None"
+            );
+        }
+    }
+}
\ No newline at end of file
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
new file mode 100644
index 0000000..9a5998e
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConsumeMessageCommandTest {
+    private static ConsumeMessageCommand consumeMessageCommand;
+
+    @BeforeClass
+    public static void init() throws MQClientException, RemotingException, MQBrokerException, InterruptedException,
+        NoSuchFieldException, IllegalAccessException{
+        consumeMessageCommand = new ConsumeMessageCommand();
+        DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
+        MessageExt msg = new MessageExt();
+        msg.setBody(new byte[]{'a'});
+        List<MessageExt> msgFoundList = new ArrayList<>();
+        msgFoundList.add(msg);
+        final PullResult pullResult = new PullResult(PullStatus.FOUND,2, 0, 1, msgFoundList);
+
+        when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenReturn(pullResult);
+        when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0));
+        when(defaultMQPullConsumer.maxOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(1));
+
+        final Set<MessageQueue> mqList = new HashSet<>();
+        mqList.add(new MessageQueue());
+        when(defaultMQPullConsumer.fetchSubscribeMessageQueues(anyString())).thenReturn(mqList);
+
+        Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+        producerField.setAccessible(true);
+        producerField.set(consumeMessageCommand,defaultMQPullConsumer);
+    }
+    @AfterClass
+    public static void terminate() {
+    }
+
+    @Test
+    public void testExecuteDefault() throws SubCommandException {
+        PrintStream out = System.out;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(bos));
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-t mytopic", "-n localhost:9876"};
+        CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(),
+            subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+        consumeMessageCommand.execute(commandLine, options, null);
+
+        System.setOut(out);
+        String s = new String(bos.toByteArray());
+        Assert.assertTrue(s.contains("Consume ok"));
+    }
+
+    @Test
+    public void testExecuteByCondition() throws SubCommandException {
+        PrintStream out = System.out;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(bos));
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+
+        String[] subargs = new String[] {"-t mytopic","-b localhost","-i 0", "-n localhost:9876"};
+        CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+        consumeMessageCommand.execute(commandLine, options, null);
+        System.setOut(out);
+        String s = new String(bos.toByteArray());
+        Assert.assertTrue(s.contains("Consume ok"));
+    }
+}
\ No newline at end of file
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/SendMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/SendMessageCommandTest.java
new file mode 100644
index 0000000..e4c6673
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/SendMessageCommandTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SendMessageCommandTest {
+
+    private static SendMessageCommand sendMessageCommand = new SendMessageCommand();
+
+    @BeforeClass
+    public static void init() throws MQClientException, RemotingException, InterruptedException, MQBrokerException, NoSuchFieldException, IllegalAccessException {
+
+        DefaultMQProducer defaultMQProducer = mock(DefaultMQProducer.class);
+        SendResult sendResult = new SendResult();
+        sendResult.setMessageQueue(new MessageQueue());
+        sendResult.getMessageQueue().setBrokerName("broker1");
+        sendResult.getMessageQueue().setQueueId(1);
+        sendResult.setSendStatus(SendStatus.SEND_OK);
+        sendResult.setMsgId("fgwejigherughwueyutyu4t4343t43");
+
+        when(defaultMQProducer.send(any(Message.class))).thenReturn(sendResult);
+        when(defaultMQProducer.send(any(Message.class), any(MessageQueue.class))).thenReturn(sendResult);
+
+        Field producerField = SendMessageCommand.class.getDeclaredField("producer");
+        producerField.setAccessible(true);
+        producerField.set(sendMessageCommand, defaultMQProducer);
+    }
+
+    @AfterClass
+    public static void terminate() {
+    }
+
+    @Test
+    public void testExecute() throws SubCommandException {
+        PrintStream out = System.out;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(bos));
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-t mytopic","-p 'send message test'","-c tagA","-k order-16546745756"};
+        CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + sendMessageCommand.commandName(), subargs, sendMessageCommand.buildCommandlineOptions(options), new PosixParser());
+        sendMessageCommand.execute(commandLine, options, null);
+
+        subargs = new String[] {"-t mytopic","-p 'send message test'","-c tagA","-k order-16546745756","-b brokera","-i 1"};
+        commandLine = ServerUtil.parseCmdLine("mqadmin " + sendMessageCommand.commandName(), subargs, sendMessageCommand.buildCommandlineOptions(options), new PosixParser());
+        sendMessageCommand.execute(commandLine, options, null);
+        System.setOut(out);
+        String s = new String(bos.toByteArray());
+        Assert.assertTrue(s.contains("SEND_OK"));
+    }
+}