You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2020/09/17 05:57:22 UTC
[rocketmq] branch develop updated: [ISSUE #1770] Add a query
message trace command in mqadmin. (#2303)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a460c5c [ISSUE #1770] Add a query message trace command in mqadmin. (#2303)
a460c5c is described below
commit a460c5c10217ea15a275d43fe8e2314b7d0f592c
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Thu Sep 17 13:57:01 2020 +0800
[ISSUE #1770] Add a query message trace command in mqadmin. (#2303)
Co-authored-by: zhangjidi2016 <zh...@cmss.chinamobile.com>
---
.../rocketmq/client/trace/TraceDataEncoder.java | 10 +-
.../apache/rocketmq/client/trace/TraceView.java | 180 +++++++++++++++++++++
.../client/trace/TraceDataEncoderTest.java | 93 +++++++++++
.../rocketmq/client/trace/TraceViewTest.java | 57 +++++++
.../rocketmq/tools/command/MQAdminStartup.java | 2 +
.../message/QueryMsgTraceByIdSubCommand.java | 148 +++++++++++++++++
.../message/QueryMsgTraceByIdSubCommandTest.java | 84 ++++++++++
7 files changed, 573 insertions(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
index 9569cc0..acf0dea 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java
@@ -103,6 +103,11 @@ public class TraceDataEncoder {
// add the context type
subAfterContext.setContextCode(Integer.parseInt(line[6]));
}
+ // compatible with the old version
+ if (line.length >= 9) {
+ subAfterContext.setTimeStamp(Long.parseLong(line[7]));
+ subAfterContext.setGroupName(line[8]);
+ }
resList.add(subAfterContext);
}
}
@@ -165,7 +170,10 @@ public class TraceDataEncoder {
.append(ctx.getCostTime()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.isSuccess()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
- .append(ctx.getContextCode()).append(TraceConstants.FIELD_SPLITOR);
+ .append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR)
+ .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
+ .append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);
+
}
}
break;
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceView.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceView.java
new file mode 100644
index 0000000..14fc360
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceView.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.trace;
+
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TraceView {
+
+ private String msgId;
+ private String tags;
+ private String keys;
+ private String storeHost;
+ private String clientHost;
+ private int costTime;
+ private String msgType;
+ private String offSetMsgId;
+ private long timeStamp;
+ private long bornTime;
+ private String topic;
+ private String groupName;
+ private String status;
+
+ public static List<TraceView> decodeFromTraceTransData(String key, String messageBody) {
+ List<TraceView> messageTraceViewList = new ArrayList<TraceView>();
+ if (messageBody == null || messageBody.length() <= 0) {
+ return messageTraceViewList;
+ }
+
+ List<TraceContext> traceContextList = TraceDataEncoder.decoderFromTraceDataString(messageBody);
+
+ for (TraceContext context : traceContextList) {
+ TraceView messageTraceView = new TraceView();
+ TraceBean traceBean = context.getTraceBeans().get(0);
+ if (!traceBean.getMsgId().equals(key)) {
+ continue;
+ }
+ messageTraceView.setCostTime(context.getCostTime());
+ messageTraceView.setGroupName(context.getGroupName());
+ if (context.isSuccess()) {
+ messageTraceView.setStatus("success");
+ }
+ else {
+ messageTraceView.setStatus("failed");
+ }
+ messageTraceView.setKeys(traceBean.getKeys());
+ messageTraceView.setMsgId(traceBean.getMsgId());
+ messageTraceView.setTags(traceBean.getTags());
+ messageTraceView.setTopic(traceBean.getTopic());
+ messageTraceView.setMsgType(context.getTraceType().name());
+ messageTraceView.setOffSetMsgId(traceBean.getOffsetMsgId());
+ messageTraceView.setTimeStamp(context.getTimeStamp());
+ messageTraceView.setStoreHost(traceBean.getStoreHost());
+ messageTraceView.setClientHost(traceBean.getClientHost());
+ messageTraceViewList.add(messageTraceView);
+ }
+ return messageTraceViewList;
+ }
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+ public void setMsgId(String msgId) {
+ this.msgId = msgId;
+ }
+
+ public String getTags() {
+ return tags;
+ }
+
+ public void setTags(String tags) {
+ this.tags = tags;
+ }
+
+ public String getKeys() {
+ return keys;
+ }
+
+ public void setKeys(String keys) {
+ this.keys = keys;
+ }
+
+ public String getStoreHost() {
+ return storeHost;
+ }
+
+ public void setStoreHost(String storeHost) {
+ this.storeHost = storeHost;
+ }
+
+ public String getClientHost() {
+ return clientHost;
+ }
+
+ public void setClientHost(String clientHost) {
+ this.clientHost = clientHost;
+ }
+
+ public int getCostTime() {
+ return costTime;
+ }
+
+ public void setCostTime(int costTime) {
+ this.costTime = costTime;
+ }
+
+ public String getMsgType() {
+ return msgType;
+ }
+
+ public void setMsgType(String msgType) {
+ this.msgType = msgType;
+ }
+
+ public String getOffSetMsgId() {
+ return offSetMsgId;
+ }
+
+ public void setOffSetMsgId(String offSetMsgId) {
+ this.offSetMsgId = offSetMsgId;
+ }
+
+ public long getTimeStamp() {
+ return timeStamp;
+ }
+
+ public void setTimeStamp(long timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+
+ public long getBornTime() {
+ return bornTime;
+ }
+
+ public void setBornTime(long bornTime) {
+ this.bornTime = bornTime;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public void setGroupName(String groupName) {
+ this.groupName = groupName;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+}
\ No newline at end of file
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
new file mode 100644
index 0000000..249a0d1
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.trace;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageType;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TraceDataEncoderTest {
+
+ private String traceData;
+
+ private long time;
+
+ @Before
+ public void init() {
+ time = System.currentTimeMillis();
+ traceData = new StringBuilder()
+ .append("Pub").append(TraceConstants.CONTENT_SPLITOR)
+ .append(time).append(TraceConstants.CONTENT_SPLITOR)
+ .append("DefaultRegion").append(TraceConstants.CONTENT_SPLITOR)
+ .append("PID-test").append(TraceConstants.CONTENT_SPLITOR)
+ .append("topic-test").append(TraceConstants.CONTENT_SPLITOR)
+ .append("AC1415116D1418B4AAC217FE1B4E0000").append(TraceConstants.CONTENT_SPLITOR)
+ .append("Tags").append(TraceConstants.CONTENT_SPLITOR)
+ .append("Keys").append(TraceConstants.CONTENT_SPLITOR)
+ .append("127.0.0.1:10911").append(TraceConstants.CONTENT_SPLITOR)
+ .append(26).append(TraceConstants.CONTENT_SPLITOR)
+ .append(245).append(TraceConstants.CONTENT_SPLITOR)
+ .append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR)
+ .append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR)
+ .append(true).append(TraceConstants.CONTENT_SPLITOR)
+ .append(UtilAll.ipToIPv4Str(UtilAll.getIP())).append(TraceConstants.FIELD_SPLITOR)
+ .toString();
+ }
+
+ @Test
+ public void testDecoderFromTraceDataString() {
+ List<TraceContext> contexts = TraceDataEncoder.decoderFromTraceDataString(traceData);
+ Assert.assertEquals(contexts.size(), 1);
+ Assert.assertEquals(contexts.get(0).getTraceType(), TraceType.Pub);
+ }
+
+
+ @Test
+ public void testEncoderFromContextBean() {
+ TraceContext context = new TraceContext();
+ context.setTraceType(TraceType.Pub);
+ context.setGroupName("PID-test");
+ context.setRegionId("DefaultRegion");
+ context.setCostTime(245);
+ context.setSuccess(true);
+ context.setTimeStamp(time);
+ TraceBean traceBean = new TraceBean();
+ traceBean.setTopic("topic-test");
+ traceBean.setKeys("Keys");
+ traceBean.setTags("Tags");
+ traceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
+ traceBean.setOffsetMsgId("0A9A002600002A9F0000000000002329");
+ traceBean.setStoreHost("127.0.0.1:10911");
+ traceBean.setStoreTime(time);
+ traceBean.setMsgType(MessageType.Normal_Msg);
+ traceBean.setBodyLength(26);
+ List<TraceBean> traceBeans = new ArrayList<TraceBean>();
+ traceBeans.add(traceBean);
+ context.setTraceBeans(traceBeans);
+ TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(context);
+
+ Assert.assertEquals(traceTransferBean.getTransData(), traceData);
+ Assert.assertEquals(traceTransferBean.getTransKey().size(), 2);
+ }
+
+}
\ No newline at end of file
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TraceViewTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TraceViewTest.java
new file mode 100644
index 0000000..51a1543
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TraceViewTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.trace;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TraceViewTest {
+
+ @Test
+ public void testDecodeFromTraceTransData() {
+ String messageBody = new StringBuilder()
+ .append("Pub").append(TraceConstants.CONTENT_SPLITOR)
+ .append(System.currentTimeMillis()).append(TraceConstants.CONTENT_SPLITOR)
+ .append("DefaultRegion").append(TraceConstants.CONTENT_SPLITOR)
+ .append("PID-test").append(TraceConstants.CONTENT_SPLITOR)
+ .append("topic-test").append(TraceConstants.CONTENT_SPLITOR)
+ .append("AC1415116D1418B4AAC217FE1B4E0000").append(TraceConstants.CONTENT_SPLITOR)
+ .append("Tags").append(TraceConstants.CONTENT_SPLITOR)
+ .append("Keys").append(TraceConstants.CONTENT_SPLITOR)
+ .append("127.0.0.1:10911").append(TraceConstants.CONTENT_SPLITOR)
+ .append(26).append(TraceConstants.CONTENT_SPLITOR)
+ .append(245).append(TraceConstants.CONTENT_SPLITOR)
+ .append(MessageType.Normal_Msg.ordinal()).append(TraceConstants.CONTENT_SPLITOR)
+ .append("0A9A002600002A9F0000000000002329").append(TraceConstants.CONTENT_SPLITOR)
+ .append(true).append(TraceConstants.CONTENT_SPLITOR)
+ .append(UtilAll.ipToIPv4Str(UtilAll.getIP())).append(TraceConstants.FIELD_SPLITOR)
+ .toString();
+ String key = "AC1415116D1418B4AAC217FE1B4E0000";
+ List<TraceView> traceViews = TraceView.decodeFromTraceTransData(key, messageBody);
+ Assert.assertEquals(traceViews.size(), 1);
+ Assert.assertEquals(traceViews.get(0).getMsgId(), key);
+
+ key = "AD4233434334AAC217FEFFD0000";
+ traceViews = TraceView.decodeFromTraceTransData(key, messageBody);
+ Assert.assertEquals(traceViews.size(), 0);
+ }
+}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 28431a9..f947744 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -59,6 +59,7 @@ import org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
+import org.apache.rocketmq.tools.command.message.QueryMsgTraceByIdSubCommand;
import org.apache.rocketmq.tools.command.message.SendMessageCommand;
import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
@@ -164,6 +165,7 @@ public class MQAdminStartup {
initCommand(new QueryMsgByKeySubCommand());
initCommand(new QueryMsgByUniqueKeySubCommand());
initCommand(new QueryMsgByOffsetSubCommand());
+ initCommand(new QueryMsgTraceByIdSubCommand());
initCommand(new PrintMessageSubCommand());
initCommand(new PrintMessageByQueueCommand());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java
new file mode 100644
index 0000000..bed2763
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommand.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.codec.Charsets;
+import org.apache.commons.lang3.time.DateFormatUtils;
+import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.trace.TraceType;
+import org.apache.rocketmq.client.trace.TraceView;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+
+public class QueryMsgTraceByIdSubCommand implements SubCommand {
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("i", "msgId", true, "Message Id");
+ opt.setRequired(true);
+ options.addOption(opt);
+ return options;
+ }
+
+ @Override
+ public String commandDesc() {
+ return "query a message trace";
+ }
+
+ @Override
+ public String commandName() {
+ return "QueryMsgTraceById";
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+ try {
+ final String msgId = commandLine.getOptionValue('i').trim();
+ this.queryTraceByMsgId(defaultMQAdminExt, msgId);
+ } catch (Exception e) {
+ throw new SubCommandException(this.getClass().getSimpleName() + "command failed", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+
+ private void queryTraceByMsgId(final DefaultMQAdminExt admin, String msgId)
+ throws MQClientException, InterruptedException {
+ admin.start();
+ QueryResult queryResult = admin.queryMessage(TopicValidator.RMQ_SYS_TRACE_TOPIC, msgId, 64, 0, System.currentTimeMillis());
+ List<MessageExt> messageList = queryResult.getMessageList();
+ List<TraceView> traceViews = new ArrayList<>();
+ for (MessageExt message : messageList) {
+ List<TraceView> traceView = TraceView.decodeFromTraceTransData(msgId, new String(message.getBody(), Charsets.UTF_8));
+ traceViews.addAll(traceView);
+ }
+
+ this.printMessageTrace(traceViews);
+ }
+
+ private void printMessageTrace(List<TraceView> traceViews) {
+ Map<String, List<TraceView>> consumerTraceMap = new HashMap<>(16);
+ for (TraceView traceView : traceViews) {
+ if (traceView.getMsgType().equals(TraceType.Pub.name())) {
+ System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
+ "#Type",
+ "#ProducerGroup",
+ "#ClientHost",
+ "#SendTime",
+ "#CostTimes",
+ "#Status"
+ );
+ System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
+ "Pub",
+ traceView.getGroupName(),
+ traceView.getClientHost(),
+ DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"),
+ traceView.getCostTime() + "ms",
+ traceView.getStatus()
+ );
+ System.out.printf("\n");
+ }
+ if (traceView.getMsgType().equals(TraceType.SubAfter.name())) {
+ String groupName = traceView.getGroupName();
+ if (consumerTraceMap.containsKey(groupName)) {
+ consumerTraceMap.get(groupName).add(traceView);
+ } else {
+ ArrayList<TraceView> views = new ArrayList<>();
+ views.add(traceView);
+ consumerTraceMap.put(groupName, views);
+ }
+ }
+ }
+
+ Iterator<String> consumers = consumerTraceMap.keySet().iterator();
+ while (consumers.hasNext()) {
+ System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
+ "#Type",
+ "#ConsumerGroup",
+ "#ClientHost",
+ "#ConsumerTime",
+ "#CostTimes",
+ "#Status"
+ );
+ List<TraceView> consumerTraces = consumerTraceMap.get(consumers.next());
+ for (TraceView traceView : consumerTraces) {
+ System.out.printf("%-10s %-20s %-20s %-20s %-10s %-10s%n",
+ "Sub",
+ traceView.getGroupName(),
+ traceView.getClientHost(),
+ DateFormatUtils.format(traceView.getTimeStamp(), "yyyy-MM-dd HH:mm:ss"),
+ traceView.getCostTime() + "ms",
+ traceView.getStatus()
+ );
+ }
+ System.out.printf("\n");
+ }
+ }
+}
\ No newline at end of file
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommandTest.java
new file mode 100644
index 0000000..f61c71d
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommandTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+
+import static org.mockito.Mockito.mock;
+
+public class QueryMsgTraceByIdSubCommandTest {
+ private static DefaultMQAdminExt defaultMQAdminExt;
+ private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+ private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
+ private static MQClientAPIImpl mQClientAPIImpl;
+
+ @BeforeClass
+ public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
+ mQClientAPIImpl = mock(MQClientAPIImpl.class);
+ defaultMQAdminExt = new DefaultMQAdminExt();
+ defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+ Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+ field.setAccessible(true);
+ field.set(defaultMQAdminExtImpl, mqClientInstance);
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mqClientInstance, mQClientAPIImpl);
+ field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
+ field.setAccessible(true);
+ field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ }
+
+ @AfterClass
+ public static void terminate() {
+ defaultMQAdminExt.shutdown();
+ }
+
+ @Ignore
+ @Test
+ public void testExecute() throws SubCommandException {
+ System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
+ QueryMsgTraceByIdSubCommand cmd = new QueryMsgTraceByIdSubCommand();
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ String[] subargs = new String[] {"-i AC1FF54E81C418B4AAC24F92E1E00000"};
+ final CommandLine commandLine =
+ ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+ cmd.execute(commandLine, options, null);
+ }
+}
\ No newline at end of file