You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:58 UTC
[67/99] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat
all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
index e15ce1f..b3f4377 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommand.java
@@ -16,6 +16,12 @@
*/
package org.apache.rocketmq.tools.command.consumer;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.Connection;
@@ -25,20 +31,12 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.MQAdminStartup;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
public class ConsumerStatusSubCommand implements SubCommand {
public static void main(String[] args) {
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
- MQAdminStartup.main(new String[]{new ConsumerStatusSubCommand().commandName(), "-g", "benchmark_consumer"});
+ MQAdminStartup.main(new String[] {new ConsumerStatusSubCommand().commandName(), "-g", "benchmark_consumer"});
}
@Override
@@ -86,16 +84,16 @@ public class ConsumerStatusSubCommand implements SubCommand {
for (Connection conn : cc.getConnectionSet()) {
try {
ConsumerRunningInfo consumerRunningInfo =
- defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack);
+ defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack);
if (consumerRunningInfo != null) {
criTable.put(conn.getClientId(), consumerRunningInfo);
String filePath = now + "/" + conn.getClientId();
MixAll.string2FileNotSafe(consumerRunningInfo.formatString(), filePath);
System.out.printf("%03d %-40s %-20s %s%n",
- i++,
- conn.getClientId(),
- MQVersion.getVersionDesc(conn.getVersion()),
- filePath);
+ i++,
+ conn.getClientId(),
+ MQVersion.getVersionDesc(conn.getVersion()),
+ filePath);
}
} catch (Exception e) {
e.printStackTrace();
@@ -114,7 +112,7 @@ public class ConsumerStatusSubCommand implements SubCommand {
while (it.hasNext()) {
Entry<String, ConsumerRunningInfo> next = it.next();
String result =
- ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue());
+ ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue());
if (result.length() > 0) {
System.out.printf(result);
}
@@ -126,7 +124,7 @@ public class ConsumerStatusSubCommand implements SubCommand {
} else {
String clientId = commandLine.getOptionValue('i').trim();
ConsumerRunningInfo consumerRunningInfo =
- defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack);
+ defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack);
if (consumerRunningInfo != null) {
System.out.printf("%s", consumerRunningInfo.formatString());
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerSubCommand.java
index 6e7cc27..699625d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerSubCommand.java
@@ -16,6 +16,12 @@
*/
package org.apache.rocketmq.tools.command.consumer;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.Connection;
@@ -25,20 +31,12 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.MQAdminStartup;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
public class ConsumerSubCommand implements SubCommand {
public static void main(String[] args) {
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
- MQAdminStartup.main(new String[]{new ConsumerSubCommand().commandName(), "-g", "benchmark_consumer"});
+ MQAdminStartup.main(new String[] {new ConsumerSubCommand().commandName(), "-g", "benchmark_consumer"});
}
@Override
@@ -81,20 +79,20 @@ public class ConsumerSubCommand implements SubCommand {
int i = 1;
long now = System.currentTimeMillis();
final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable =
- new TreeMap<String, ConsumerRunningInfo>();
+ new TreeMap<String, ConsumerRunningInfo>();
for (Connection conn : cc.getConnectionSet()) {
try {
ConsumerRunningInfo consumerRunningInfo =
- defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack);
+ defaultMQAdminExt.getConsumerRunningInfo(group, conn.getClientId(), jstack);
if (consumerRunningInfo != null) {
criTable.put(conn.getClientId(), consumerRunningInfo);
String filePath = now + "/" + conn.getClientId();
MixAll.string2FileNotSafe(consumerRunningInfo.formatString(), filePath);
System.out.printf("%03d %-40s %-20s %s%n",
- i++,
- conn.getClientId(),
- MQVersion.getVersionDesc(conn.getVersion()),
- filePath);
+ i++,
+ conn.getClientId(),
+ MQVersion.getVersionDesc(conn.getVersion()),
+ filePath);
}
} catch (Exception e) {
e.printStackTrace();
@@ -113,7 +111,7 @@ public class ConsumerSubCommand implements SubCommand {
while (it.hasNext()) {
Entry<String, ConsumerRunningInfo> next = it.next();
String result =
- ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue());
+ ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue());
if (result.length() > 0) {
System.out.printf(result);
}
@@ -125,7 +123,7 @@ public class ConsumerSubCommand implements SubCommand {
} else {
String clientId = commandLine.getOptionValue('i').trim();
ConsumerRunningInfo consumerRunningInfo =
- defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack);
+ defaultMQAdminExt.getConsumerRunningInfo(group, clientId, jstack);
if (consumerRunningInfo != null) {
System.out.printf(consumerRunningInfo.formatString());
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
index 0cc5879..75e6b65 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
@@ -6,16 +6,20 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.consumer;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
@@ -23,12 +27,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Set;
-
public class DeleteSubscriptionGroupCommand implements SubCommand {
@Override
@@ -36,13 +34,11 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
return "deleteSubGroup";
}
-
@Override
public String commandDesc() {
return "Delete subscription group from broker.";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("b", "brokerAddr", true, "delete subscription group from which broker");
@@ -60,7 +56,6 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
return options;
}
-
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
@@ -75,7 +70,7 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
adminExt.deleteSubscriptionGroup(addr, groupName);
System.out.printf("delete subscription group [%s] from broker [%s] success.%n", groupName,
- addr);
+ addr);
return;
} else if (commandLine.hasOption('c')) {
@@ -86,15 +81,15 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
for (String master : masterSet) {
adminExt.deleteSubscriptionGroup(master, groupName);
System.out.printf(
- "delete subscription group [%s] from broker [%s] in cluster [%s] success.%n",
- groupName, master, clusterName);
+ "delete subscription group [%s] from broker [%s] in cluster [%s] success.%n",
+ groupName, master, clusterName);
}
try {
DeleteTopicSubCommand.deleteTopic(adminExt, clusterName, MixAll.RETRY_GROUP_TOPIC_PREFIX
- + groupName);
+ + groupName);
DeleteTopicSubCommand.deleteTopic(adminExt, clusterName, MixAll.DLQ_GROUP_TOPIC_PREFIX
- + groupName);
+ + groupName);
} catch (Exception e) {
}
return;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java
index 4d5315b..8bb7c0d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/StartMonitoringSubCommand.java
@@ -6,54 +6,49 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.consumer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.monitor.DefaultMonitorListener;
import org.apache.rocketmq.tools.monitor.MonitorConfig;
import org.apache.rocketmq.tools.monitor.MonitorService;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
import org.slf4j.Logger;
-
public class StartMonitoringSubCommand implements SubCommand {
private final Logger log = ClientLogger.getLog();
-
@Override
public String commandName() {
return "startMonitoring";
}
-
@Override
public String commandDesc() {
return "Start Monitoring";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
return options;
}
-
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
try {
MonitorService monitorService =
- new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook);
+ new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook);
monitorService.start();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
index 93eb8ec..4ff032e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
@@ -16,18 +16,16 @@
*/
package org.apache.rocketmq.tools.command.consumer;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Set;
-
public class UpdateSubGroupSubCommand implements SubCommand {
@@ -36,13 +34,11 @@ public class UpdateSubGroupSubCommand implements SubCommand {
return "updateSubGroup";
}
-
@Override
public String commandDesc() {
return "Update or create subscription group";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("b", "brokerAddr", true, "create subscription group to which broker");
@@ -92,7 +88,6 @@ public class UpdateSubGroupSubCommand implements SubCommand {
return options;
}
-
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
@@ -110,31 +105,31 @@ public class UpdateSubGroupSubCommand implements SubCommand {
// consumeEnable
if (commandLine.hasOption('s')) {
subscriptionGroupConfig.setConsumeEnable(Boolean.parseBoolean(commandLine.getOptionValue('s')
- .trim()));
+ .trim()));
}
// consumeFromMinEnable
if (commandLine.hasOption('m')) {
subscriptionGroupConfig.setConsumeFromMinEnable(Boolean.parseBoolean(commandLine
- .getOptionValue('m').trim()));
+ .getOptionValue('m').trim()));
}
// consumeBroadcastEnable
if (commandLine.hasOption('d')) {
subscriptionGroupConfig.setConsumeBroadcastEnable(Boolean.parseBoolean(commandLine
- .getOptionValue('d').trim()));
+ .getOptionValue('d').trim()));
}
// retryQueueNums
if (commandLine.hasOption('q')) {
subscriptionGroupConfig.setRetryQueueNums(Integer.parseInt(commandLine.getOptionValue('q')
- .trim()));
+ .trim()));
}
// retryMaxTimes
if (commandLine.hasOption('r')) {
subscriptionGroupConfig.setRetryMaxTimes(Integer.parseInt(commandLine.getOptionValue('r')
- .trim()));
+ .trim()));
}
// brokerId
@@ -145,13 +140,13 @@ public class UpdateSubGroupSubCommand implements SubCommand {
// whichBrokerWhenConsumeSlowly
if (commandLine.hasOption('w')) {
subscriptionGroupConfig.setWhichBrokerWhenConsumeSlowly(Long.parseLong(commandLine
- .getOptionValue('w').trim()));
+ .getOptionValue('w').trim()));
}
// notifyConsumerIdsChanged
if (commandLine.hasOption('a')) {
subscriptionGroupConfig.setNotifyConsumerIdsChangedEnable(Boolean.parseBoolean(commandLine
- .getOptionValue('a').trim()));
+ .getOptionValue('a').trim()));
}
if (commandLine.hasOption('b')) {
@@ -169,7 +164,7 @@ public class UpdateSubGroupSubCommand implements SubCommand {
defaultMQAdminExt.start();
Set<String> masterSet =
- CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
try {
defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subscriptionGroupConfig);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java
index fb0061e..cade0e0 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/CheckMsgSendRTCommand.java
@@ -16,6 +16,10 @@
*/
package org.apache.rocketmq.tools.command.message;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.MixAll;
@@ -23,11 +27,6 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.List;
public class CheckMsgSendRTCommand implements SubCommand {
private static String brokerName = "";
@@ -72,16 +71,16 @@ public class CheckMsgSendRTCommand implements SubCommand {
boolean sendSuccess = false;
String topic = commandLine.getOptionValue('t').trim();
long amount = !commandLine.hasOption('a') ? 100 : Long.parseLong(commandLine
- .getOptionValue('a').trim());
+ .getOptionValue('a').trim());
long msgSize = !commandLine.hasOption('s') ? 128 : Long.parseLong(commandLine
- .getOptionValue('s').trim());
+ .getOptionValue('s').trim());
Message msg = new Message(topic, getStringBySize(msgSize).getBytes(MixAll.DEFAULT_CHARSET));
System.out.printf("%-32s %-4s %-20s %s%n",
- "#Broker Name",
- "#QID",
- "#Send Result",
- "#RT"
+ "#Broker Name",
+ "#QID",
+ "#Send Result",
+ "#RT"
);
for (int i = 0; i < amount; i++) {
start = System.currentTimeMillis();
@@ -89,7 +88,7 @@ public class CheckMsgSendRTCommand implements SubCommand {
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- int queueIndex = (Integer) arg % mqs.size();
+ int queueIndex = (Integer)arg % mqs.size();
MessageQueue queue = mqs.get(queueIndex);
brokerName = queue.getBrokerName();
queueId = queue.getQueueId();
@@ -103,20 +102,19 @@ public class CheckMsgSendRTCommand implements SubCommand {
end = System.currentTimeMillis();
}
-
if (i != 0) {
timeElapsed += end - start;
}
System.out.printf("%-32s %-4s %-20s %s%n",
- brokerName,
- queueId,
- sendSuccess,
- end - start
+ brokerName,
+ queueId,
+ sendSuccess,
+ end - start
);
}
- double rt = (double) timeElapsed / (amount - 1);
+ double rt = (double)timeElapsed / (amount - 1);
System.out.printf("Avg RT: %s%n", String.format("%.2f", rt));
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java
index 88264b5..40adec9 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java
@@ -6,23 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.message;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
public class DecodeMessageIdCommond implements SubCommand {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
index 33e6804..aad1644 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
@@ -17,6 +17,17 @@
package org.apache.rocketmq.tools.command.message;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.MixAll;
@@ -25,33 +36,77 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
public class PrintMessageByQueueCommand implements SubCommand {
+ public static long timestampFormat(final String value) {
+ long timestamp = 0;
+ try {
+ timestamp = Long.parseLong(value);
+ } catch (NumberFormatException e) {
+
+ timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
+ }
+
+ return timestamp;
+ }
+
+ private static void calculateByTag(final List<MessageExt> msgs, final Map<String, AtomicLong> tagCalmap, final boolean calByTag) {
+ if (!calByTag)
+ return;
+
+ for (MessageExt msg : msgs) {
+ String tag = msg.getTags();
+ if (StringUtils.isNotBlank(tag)) {
+ AtomicLong count = tagCalmap.get(tag);
+ if (count == null) {
+ count = new AtomicLong();
+ tagCalmap.put(tag, count);
+ }
+ count.incrementAndGet();
+ }
+ }
+ }
+
+ private static void printCalculateByTag(final Map<String, AtomicLong> tagCalmap, final boolean calByTag) {
+ if (!calByTag)
+ return;
+
+ List<TagCountBean> list = new ArrayList<TagCountBean>();
+ for (Map.Entry<String, AtomicLong> entry : tagCalmap.entrySet()) {
+ TagCountBean tagBean = new TagCountBean(entry.getKey(), entry.getValue());
+ list.add(tagBean);
+ }
+ Collections.sort(list);
+
+ for (TagCountBean tagCountBean : list) {
+ System.out.printf("Tag: %-30s Count: %s%n", tagCountBean.getTag(), tagCountBean.getCount());
+ }
+ }
+
+ public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printMsg, boolean printBody) {
+ if (!printMsg)
+ return;
+
+ for (MessageExt msg : msgs) {
+ try {
+ System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(),
+ printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY");
+ } catch (UnsupportedEncodingException e) {
+ }
+ }
+ }
+
@Override
public String commandName() {
return "printMsgByQueue";
}
-
@Override
public String commandDesc() {
return "Print Message Detail";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("t", "topic", true, "topic name");
@@ -94,7 +149,6 @@ public class PrintMessageByQueueCommand implements SubCommand {
opt.setRequired(false);
options.addOption(opt);
-
return options;
}
@@ -104,15 +158,15 @@ public class PrintMessageByQueueCommand implements SubCommand {
try {
String charsetName =
- !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim();
+ !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim();
boolean printMsg =
- !commandLine.hasOption('p') ? false : Boolean.parseBoolean(commandLine.getOptionValue('p').trim());
+ !commandLine.hasOption('p') ? false : Boolean.parseBoolean(commandLine.getOptionValue('p').trim());
boolean printBody =
- !commandLine.hasOption('d') ? false : Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
+ !commandLine.hasOption('d') ? false : Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
boolean calByTag =
- !commandLine.hasOption('f') ? false : Boolean.parseBoolean(commandLine.getOptionValue('f').trim());
+ !commandLine.hasOption('f') ? false : Boolean.parseBoolean(commandLine.getOptionValue('f').trim());
String subExpression =
- !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim();
+ !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim();
String topic = commandLine.getOptionValue('t').trim();
String brokerName = commandLine.getOptionValue('a').trim();
@@ -165,70 +219,10 @@ public class PrintMessageByQueueCommand implements SubCommand {
}
}
- public static long timestampFormat(final String value) {
- long timestamp = 0;
- try {
- timestamp = Long.parseLong(value);
- } catch (NumberFormatException e) {
-
- timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
- }
-
- return timestamp;
- }
-
-
- private static void calculateByTag(final List<MessageExt> msgs, final Map<String, AtomicLong> tagCalmap, final boolean calByTag) {
- if (!calByTag)
- return;
-
- for (MessageExt msg : msgs) {
- String tag = msg.getTags();
- if (StringUtils.isNotBlank(tag)) {
- AtomicLong count = tagCalmap.get(tag);
- if (count == null) {
- count = new AtomicLong();
- tagCalmap.put(tag, count);
- }
- count.incrementAndGet();
- }
- }
- }
-
- private static void printCalculateByTag(final Map<String, AtomicLong> tagCalmap, final boolean calByTag) {
- if (!calByTag)
- return;
-
- List<TagCountBean> list = new ArrayList<TagCountBean>();
- for (Map.Entry<String, AtomicLong> entry : tagCalmap.entrySet()) {
- TagCountBean tagBean = new TagCountBean(entry.getKey(), entry.getValue());
- list.add(tagBean);
- }
- Collections.sort(list);
-
- for (TagCountBean tagCountBean : list) {
- System.out.printf("Tag: %-30s Count: %s%n", tagCountBean.getTag(), tagCountBean.getCount());
- }
- }
-
- public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printMsg, boolean printBody) {
- if (!printMsg)
- return;
-
- for (MessageExt msg : msgs) {
- try {
- System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(),
- printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY");
- } catch (UnsupportedEncodingException e) {
- }
- }
- }
-
static class TagCountBean implements Comparable<TagCountBean> {
private String tag;
private AtomicLong count;
-
public TagCountBean(final String tag, final AtomicLong count) {
this.tag = tag;
this.count = count;
@@ -250,10 +244,9 @@ public class PrintMessageByQueueCommand implements SubCommand {
this.count = count;
}
-
@Override
public int compareTo(final TagCountBean o) {
- return (int) (o.getCount().get() - this.count.get());
+ return (int)(o.getCount().get() - this.count.get());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
index e8c9368..4f87d77 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
@@ -16,6 +16,12 @@
*/
package org.apache.rocketmq.tools.command.message;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.MixAll;
@@ -24,29 +30,41 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-import java.util.Set;
+public class PrintMessageSubCommand implements SubCommand {
+
+ public static long timestampFormat(final String value) {
+ long timestamp = 0;
+ try {
+ timestamp = Long.parseLong(value);
+ } catch (NumberFormatException e) {
+ timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
+ }
+ return timestamp;
+ }
-public class PrintMessageSubCommand implements SubCommand {
+ public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printBody) {
+ for (MessageExt msg : msgs) {
+ try {
+ System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(),
+ printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY");
+ } catch (UnsupportedEncodingException e) {
+ //
+ }
+ }
+ }
@Override
public String commandName() {
return "printMsg";
}
-
@Override
public String commandDesc() {
return "Print Message Detail";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("t", "topic", true, "topic name");
@@ -62,20 +80,20 @@ public class PrintMessageSubCommand implements SubCommand {
options.addOption(opt);
opt =
- new Option("b", "beginTimestamp ", true,
- "Begin timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
+ new Option("b", "beginTimestamp ", true,
+ "Begin timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
opt.setRequired(false);
options.addOption(opt);
opt =
- new Option("e", "endTimestamp ", true,
- "End timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
+ new Option("e", "endTimestamp ", true,
+ "End timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
opt.setRequired(false);
options.addOption(opt);
opt =
- new Option("d", "printBody ", true,
- "print body");
+ new Option("d", "printBody ", true,
+ "print body");
opt.setRequired(false);
options.addOption(opt);
@@ -90,13 +108,13 @@ public class PrintMessageSubCommand implements SubCommand {
String topic = commandLine.getOptionValue('t').trim();
String charsetName = //
- !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim();
+ !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim();
String subExpression = //
- !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim();
+ !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim();
boolean printBody = //
- !commandLine.hasOption('d') ? true : Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
+ !commandLine.hasOption('d') ? true : Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
consumer.start();
@@ -149,26 +167,4 @@ public class PrintMessageSubCommand implements SubCommand {
consumer.shutdown();
}
}
-
- public static long timestampFormat(final String value) {
- long timestamp = 0;
- try {
- timestamp = Long.parseLong(value);
- } catch (NumberFormatException e) {
- timestamp = UtilAll.parseDate(value, UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
- }
-
- return timestamp;
- }
-
- public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printBody) {
- for (MessageExt msg : msgs) {
- try {
- System.out.printf("MSGID: %s %s BODY: %s%n", msg.getMsgId(), msg.toString(),
- printBody ? new String(msg.getBody(), charsetName) : "NOT PRINT BODY");
- } catch (UnsupportedEncodingException e) {
- //
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
index fded7b6..38f9a72 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
@@ -16,6 +16,15 @@
*/
package org.apache.rocketmq.tools.command.message;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -30,19 +39,140 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.List;
+public class QueryMsgByIdSubCommand implements SubCommand {
+ public static void queryById(final DefaultMQAdminExt admin, final String msgId) throws MQClientException,
+ RemotingException, MQBrokerException, InterruptedException, IOException {
+ MessageExt msg = admin.viewMessage(msgId);
+ printMsg(admin, msg);
+ }
+
+ public static void printMsg(final DefaultMQAdminExt admin, final MessageExt msg) throws IOException {
+ if (msg == null) {
+ System.out.printf("%nMessage not found!");
+ return;
+ }
+
+ String bodyTmpFilePath = createBodyFile(msg);
+ String msgId = msg.getMsgId();
+ if (msg instanceof MessageClientExt) {
+ msgId = ((MessageClientExt)msg).getOffsetMsgId();
+ }
+
+ System.out.printf("%-20s %s%n",
+ "OffsetID:",
+ msgId
+ );
+
+ System.out.printf("%-20s %s%n",
+ "OffsetID:",
+ msgId
+ );
+
+ System.out.printf("%-20s %s%n",
+ "Topic:",
+ msg.getTopic()
+ );
+
+ System.out.printf("%-20s %s%n",
+ "Tags:",
+ "[" + msg.getTags() + "]"
+ );
+
+ System.out.printf("%-20s %s%n",
+ "Keys:",
+ "[" + msg.getKeys() + "]"
+ );
+
+ System.out.printf("%-20s %d%n",
+ "Queue ID:",
+ msg.getQueueId()
+ );
+
+ System.out.printf("%-20s %d%n",
+ "Queue Offset:",
+ msg.getQueueOffset()
+ );
+
+ System.out.printf("%-20s %d%n",
+ "CommitLog Offset:",
+ msg.getCommitLogOffset()
+ );
+
+ System.out.printf("%-20s %d%n",
+ "Reconsume Times:",
+ msg.getReconsumeTimes()
+ );
+
+ System.out.printf("%-20s %s%n",
+ "Born Timestamp:",
+ UtilAll.timeMillisToHumanString2(msg.getBornTimestamp())
+ );
+
+ System.out.printf("%-20s %s%n",
+ "Store Timestamp:",
+ UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp())
+ );
+
+ System.out.printf("%-20s %s%n",
+ "Born Host:",
+ RemotingHelper.parseSocketAddressAddr(msg.getBornHost())
+ );
+
+ System.out.printf("%-20s %s%n",
+ "Store Host:",
+ RemotingHelper.parseSocketAddressAddr(msg.getStoreHost())
+ );
+
+ System.out.printf("%-20s %d%n",
+ "System Flag:",
+ msg.getSysFlag()
+ );
+
+ System.out.printf("%-20s %s%n",
+ "Properties:",
+ msg.getProperties() != null ? msg.getProperties().toString() : ""
+ );
+
+ System.out.printf("%-20s %s%n",
+ "Message Body Path:",
+ bodyTmpFilePath
+ );
+
+ try {
+ List<MessageTrack> mtdList = admin.messageTrackDetail(msg);
+ if (mtdList.isEmpty()) {
+ System.out.printf("%n%nWARN: No Consumer");
+ } else {
+ System.out.printf("%n%n");
+ for (MessageTrack mt : mtdList) {
+ System.out.printf("%s", mt);
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static String createBodyFile(MessageExt msg) throws IOException {
+ DataOutputStream dos = null;
+ try {
+ String bodyTmpFilePath = "/tmp/rocketmq/msgbodys";
+ File file = new File(bodyTmpFilePath);
+ if (!file.exists()) {
+ file.mkdirs();
+ }
+ bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId();
+ dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath));
+ dos.write(msg.getBody());
+ return bodyTmpFilePath;
+ } finally {
+ if (dos != null)
+ dos.close();
+ }
+ }
-public class QueryMsgByIdSubCommand implements SubCommand {
@Override
public String commandName() {
return "queryMsgById";
@@ -134,14 +264,15 @@ public class QueryMsgByIdSubCommand implements SubCommand {
private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final String consumerGroup, final String clientId, final String msgId) {
try {
ConsumeMessageDirectlyResult result =
- defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId);
+ defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId);
System.out.printf("%s", result);
} catch (Exception e) {
e.printStackTrace();
}
}
- private void sendMsg(final DefaultMQAdminExt defaultMQAdminExt, final DefaultMQProducer defaultMQProducer, final String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ private void sendMsg(final DefaultMQAdminExt defaultMQAdminExt, final DefaultMQProducer defaultMQProducer,
+ final String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
MessageExt msg = defaultMQAdminExt.viewMessage(msgId);
if (msg != null) {
@@ -156,136 +287,4 @@ public class QueryMsgByIdSubCommand implements SubCommand {
e.printStackTrace();
}
}
-
- public static void queryById(final DefaultMQAdminExt admin, final String msgId) throws MQClientException,
- RemotingException, MQBrokerException, InterruptedException, IOException {
- MessageExt msg = admin.viewMessage(msgId);
-
- printMsg(admin, msg);
- }
-
- public static void printMsg(final DefaultMQAdminExt admin, final MessageExt msg) throws IOException {
- if (msg == null) {
- System.out.printf("%nMessage not found!");
- return;
- }
-
- String bodyTmpFilePath = createBodyFile(msg);
- String msgId = msg.getMsgId();
- if (msg instanceof MessageClientExt) {
- msgId = ((MessageClientExt) msg).getOffsetMsgId();
- }
-
- System.out.printf("%-20s %s%n",
- "OffsetID:",
- msgId
- );
-
- System.out.printf("%-20s %s%n",
- "OffsetID:",
- msgId
- );
-
- System.out.printf("%-20s %s%n",
- "Topic:",
- msg.getTopic()
- );
-
- System.out.printf("%-20s %s%n",
- "Tags:",
- "[" + msg.getTags() + "]"
- );
-
- System.out.printf("%-20s %s%n",
- "Keys:",
- "[" + msg.getKeys() + "]"
- );
-
- System.out.printf("%-20s %d%n",
- "Queue ID:",
- msg.getQueueId()
- );
-
- System.out.printf("%-20s %d%n",
- "Queue Offset:",
- msg.getQueueOffset()
- );
-
- System.out.printf("%-20s %d%n",
- "CommitLog Offset:",
- msg.getCommitLogOffset()
- );
-
- System.out.printf("%-20s %d%n",
- "Reconsume Times:",
- msg.getReconsumeTimes()
- );
-
- System.out.printf("%-20s %s%n",
- "Born Timestamp:",
- UtilAll.timeMillisToHumanString2(msg.getBornTimestamp())
- );
-
- System.out.printf("%-20s %s%n",
- "Store Timestamp:",
- UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp())
- );
-
- System.out.printf("%-20s %s%n",
- "Born Host:",
- RemotingHelper.parseSocketAddressAddr(msg.getBornHost())
- );
-
- System.out.printf("%-20s %s%n",
- "Store Host:",
- RemotingHelper.parseSocketAddressAddr(msg.getStoreHost())
- );
-
- System.out.printf("%-20s %d%n",
- "System Flag:",
- msg.getSysFlag()
- );
-
- System.out.printf("%-20s %s%n",
- "Properties:",
- msg.getProperties() != null ? msg.getProperties().toString() : ""
- );
-
- System.out.printf("%-20s %s%n",
- "Message Body Path:",
- bodyTmpFilePath
- );
-
- try {
- List<MessageTrack> mtdList = admin.messageTrackDetail(msg);
- if (mtdList.isEmpty()) {
- System.out.printf("%n%nWARN: No Consumer");
- } else {
- System.out.printf("%n%n");
- for (MessageTrack mt : mtdList) {
- System.out.printf("%s", mt);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- private static String createBodyFile(MessageExt msg) throws IOException {
- DataOutputStream dos = null;
- try {
- String bodyTmpFilePath = "/tmp/rocketmq/msgbodys";
- File file = new File(bodyTmpFilePath);
- if (!file.exists()) {
- file.mkdirs();
- }
- bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId();
- dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath));
- dos.write(msg.getBody());
- return bodyTmpFilePath;
- } finally {
- if (dos != null)
- dos.close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
index ebfc80e..159bd6e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByKeySubCommand.java
@@ -6,26 +6,25 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.message;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
/**
*
@@ -75,14 +74,14 @@ public class QueryMsgByKeySubCommand implements SubCommand {
}
void queryByKey(final DefaultMQAdminExt admin, final String topic, final String key)
- throws MQClientException, InterruptedException {
+ throws MQClientException, InterruptedException {
admin.start();
QueryResult queryResult = admin.queryMessage(topic, key, 64, 0, Long.MAX_VALUE);
System.out.printf("%-50s %4s %40s%n",
- "#Message ID",
- "#QID",
- "#Offset");
+ "#Message ID",
+ "#QID",
+ "#Offset");
for (MessageExt msg : queryResult.getMessageList()) {
System.out.printf("%-50s %4d %40d%n", msg.getMsgId(), msg.getQueueId(), msg.getQueueOffset());
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java
index fc5fd56..2133636 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByOffsetSubCommand.java
@@ -16,6 +16,9 @@
*/
package org.apache.rocketmq.tools.command.message;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.MixAll;
@@ -23,10 +26,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
public class QueryMsgByOffsetSubCommand implements SubCommand {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
index 4e4bd61..2a6904e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
@@ -16,6 +16,14 @@
*/
package org.apache.rocketmq.tools.command.message;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
@@ -27,125 +35,83 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.List;
public class QueryMsgByUniqueKeySubCommand implements SubCommand {
- @Override
- public String commandName() {
- return "queryMsgByUniqueKey";
- }
-
- @Override
- public String commandDesc() {
- return "Query Message by Unique key";
- }
-
- @Override
- public Options buildCommandlineOptions(Options options) {
- Option opt = new Option("i", "msgId", true, "Message Id");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("g", "consumerGroup", true, "consumer group name");
- opt.setRequired(false);
- options.addOption(opt);
-
- opt = new Option("d", "clientId", true, "The consumer's client id");
- opt.setRequired(false);
- options.addOption(opt);
-
- opt = new Option("t", "topic", true, "The topic of msg");
- opt.setRequired(true);
- options.addOption(opt);
-
- return options;
- }
-
-
public static void queryById(final DefaultMQAdminExt admin, final String topic, final String msgId) throws MQClientException,
- RemotingException, MQBrokerException, InterruptedException, IOException {
+ RemotingException, MQBrokerException, InterruptedException, IOException {
MessageExt msg = admin.viewMessage(topic, msgId);
-
String bodyTmpFilePath = createBodyFile(msg);
System.out.printf("%-20s %s%n",
- "Topic:",
- msg.getTopic()
+ "Topic:",
+ msg.getTopic()
);
System.out.printf("%-20s %s%n",
- "Tags:",
- "[" + msg.getTags() + "]"
+ "Tags:",
+ "[" + msg.getTags() + "]"
);
System.out.printf("%-20s %s%n",
- "Keys:",
- "[" + msg.getKeys() + "]"
+ "Keys:",
+ "[" + msg.getKeys() + "]"
);
System.out.printf("%-20s %d%n",
- "Queue ID:",
- msg.getQueueId()
+ "Queue ID:",
+ msg.getQueueId()
);
System.out.printf("%-20s %d%n",
- "Queue Offset:",
- msg.getQueueOffset()
+ "Queue Offset:",
+ msg.getQueueOffset()
);
System.out.printf("%-20s %d%n",
- "CommitLog Offset:",
- msg.getCommitLogOffset()
+ "CommitLog Offset:",
+ msg.getCommitLogOffset()
);
System.out.printf("%-20s %d%n",
- "Reconsume Times:",
- msg.getReconsumeTimes()
+ "Reconsume Times:",
+ msg.getReconsumeTimes()
);
System.out.printf("%-20s %s%n",
- "Born Timestamp:",
- UtilAll.timeMillisToHumanString2(msg.getBornTimestamp())
+ "Born Timestamp:",
+ UtilAll.timeMillisToHumanString2(msg.getBornTimestamp())
);
System.out.printf("%-20s %s%n",
- "Store Timestamp:",
- UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp())
+ "Store Timestamp:",
+ UtilAll.timeMillisToHumanString2(msg.getStoreTimestamp())
);
System.out.printf("%-20s %s%n",
- "Born Host:",
- RemotingHelper.parseSocketAddressAddr(msg.getBornHost())
+ "Born Host:",
+ RemotingHelper.parseSocketAddressAddr(msg.getBornHost())
);
System.out.printf("%-20s %s%n",
- "Store Host:",
- RemotingHelper.parseSocketAddressAddr(msg.getStoreHost())
+ "Store Host:",
+ RemotingHelper.parseSocketAddressAddr(msg.getStoreHost())
);
System.out.printf("%-20s %d%n",
- "System Flag:",
- msg.getSysFlag()
+ "System Flag:",
+ msg.getSysFlag()
);
System.out.printf("%-20s %s%n",
- "Properties:",
- msg.getProperties() != null ? msg.getProperties().toString() : ""
+ "Properties:",
+ msg.getProperties() != null ? msg.getProperties().toString() : ""
);
System.out.printf("%-20s %s%n",
- "Message Body Path:",
- bodyTmpFilePath
+ "Message Body Path:",
+ bodyTmpFilePath
);
try {
@@ -163,6 +129,54 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
}
}
+ private static String createBodyFile(MessageExt msg) throws IOException {
+ DataOutputStream dos = null;
+ try {
+ String bodyTmpFilePath = "/tmp/rocketmq/msgbodys";
+ File file = new File(bodyTmpFilePath);
+ if (!file.exists()) {
+ file.mkdirs();
+ }
+ bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId();
+ dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath));
+ dos.write(msg.getBody());
+ return bodyTmpFilePath;
+ } finally {
+ if (dos != null)
+ dos.close();
+ }
+ }
+
+ @Override
+ public String commandName() {
+ return "queryMsgByUniqueKey";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Query Message by Unique key";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("i", "msgId", true, "Message Id");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("g", "consumerGroup", true, "consumer group name");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("d", "clientId", true, "The consumer's client id");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("t", "topic", true, "The topic of msg");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return options;
+ }
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
@@ -178,7 +192,7 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
final String consumerGroup = commandLine.getOptionValue('g').trim();
final String clientId = commandLine.getOptionValue('d').trim();
ConsumeMessageDirectlyResult result =
- defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+ defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
System.out.printf("%s", result);
} else {
queryById(defaultMQAdminExt, topic, msgId);
@@ -189,23 +203,4 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand {
defaultMQAdminExt.shutdown();
}
}
-
-
- private static String createBodyFile(MessageExt msg) throws IOException {
- DataOutputStream dos = null;
- try {
- String bodyTmpFilePath = "/tmp/rocketmq/msgbodys";
- File file = new File(bodyTmpFilePath);
- if (!file.exists()) {
- file.mkdirs();
- }
- bodyTmpFilePath = bodyTmpFilePath + "/" + msg.getMsgId();
- dos = new DataOutputStream(new FileOutputStream(bodyTmpFilePath));
- dos.write(msg.getBody());
- return bodyTmpFilePath;
- } finally {
- if (dos != null)
- dos.close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java
index ee923c6..e25c61f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/Store.java
@@ -17,19 +17,18 @@
package org.apache.rocketmq.tools.command.message;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.store.ConsumeQueue;
-import org.apache.rocketmq.store.MappedFile;
-import org.apache.rocketmq.store.MappedFileQueue;
-import org.apache.rocketmq.store.SelectMappedBufferResult;
-import org.apache.rocketmq.store.config.StorePathConfigHelper;
-
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.store.ConsumeQueue;
+import org.apache.rocketmq.store.MappedFile;
+import org.apache.rocketmq.store.MappedFileQueue;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
public class Store {
public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
@@ -49,7 +48,7 @@ public class Store {
this.lSize = lSize;
mapedFileQueue = new MappedFileQueue(cStorePath, cSize, null);
consumeQueueTable =
- new ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>();
+ new ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>();
}
public boolean load() {
@@ -75,11 +74,11 @@ public class Store {
for (File fileQueueId : fileQueueIdList) {
int queueId = Integer.parseInt(fileQueueId.getName());
ConsumeQueue logic = new ConsumeQueue(
- topic,
- queueId,
- StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),
- lSize,
- null);
+ topic,
+ queueId,
+ StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),
+ lSize,
+ null);
this.putConsumeQueue(topic, queueId, logic);
if (!logic.load()) {
return false;
@@ -92,7 +91,6 @@ public class Store {
return true;
}
-
private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) {
ConcurrentHashMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic);
if (null == map) {
@@ -181,16 +179,14 @@ public class Store {
e.printStackTrace();
}
-
Date storeTime = new Date(storeTimestamp);
-
long currentPhyOffset = startOffset + position;
if (physicOffset != currentPhyOffset) {
System.out.printf(storeTime
- + " [fetal error] physicOffset != currentPhyOffset. position=" + position
- + ", msgCount=" + msgCount + ", physicOffset=" + physicOffset
- + ", currentPhyOffset=" + currentPhyOffset);
+ + " [fetal error] physicOffset != currentPhyOffset. position=" + position
+ + ", msgCount=" + msgCount + ", physicOffset=" + physicOffset
+ + ", currentPhyOffset=" + currentPhyOffset);
errorCount++;
if (!openAll) {
success = false;
@@ -205,8 +201,8 @@ public class Store {
int sizePy = smb.getByteBuffer().getInt();
if (physicOffset != offsetPy) {
System.out.printf(storeTime + " [fetal error] physicOffset != offsetPy. position="
- + position + ", msgCount=" + msgCount + ", physicOffset=" + physicOffset
- + ", offsetPy=" + offsetPy);
+ + position + ", msgCount=" + msgCount + ", physicOffset=" + physicOffset
+ + ", offsetPy=" + offsetPy);
errorCount++;
if (!openAll) {
success = false;
@@ -215,8 +211,8 @@ public class Store {
}
if (totalSize != sizePy) {
System.out.printf(storeTime + " [fetal error] totalSize != sizePy. position="
- + position + ", msgCount=" + msgCount + ", totalSize=" + totalSize
- + ", sizePy=" + sizePy);
+ + position + ", msgCount=" + msgCount + ", totalSize=" + totalSize
+ + ", sizePy=" + sizePy);
errorCount++;
if (!openAll) {
success = false;
@@ -233,7 +229,7 @@ public class Store {
}
System.out.printf("end travel " + mapedFile.getFileName() + ", total msg=" + msgCount
- + ", error count=" + errorCount + ", cost:" + (System.currentTimeMillis() - startTime));
+ + ", error count=" + errorCount + ", cost:" + (System.currentTimeMillis() - startTime));
}
System.out.printf("travel " + (success ? "ok" : "fail"));
@@ -243,7 +239,7 @@ public class Store {
ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
if (null == map) {
ConcurrentHashMap<Integer, ConsumeQueue> newMap =
- new ConcurrentHashMap<Integer, ConsumeQueue>(128);
+ new ConcurrentHashMap<Integer, ConsumeQueue>(128);
ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;
@@ -254,11 +250,11 @@ public class Store {
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
ConsumeQueue newLogic = new ConsumeQueue(
- topic,
- queueId,
- StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),
- lSize,
- null);
+ topic,
+ queueId,
+ StorePathConfigHelper.getStorePathConsumeQueue(lStorePath),
+ lSize,
+ null);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
logic = oldLogic;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java
index 0ae8f44..b4fb7dd 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/DeleteKvConfigCommand.java
@@ -6,23 +6,22 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.namesrv;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
/**
*
@@ -34,13 +33,11 @@ public class DeleteKvConfigCommand implements SubCommand {
return "deleteKvConfig";
}
-
@Override
public String commandDesc() {
return "Delete KV config.";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("s", "namespace", true, "set the namespace");
@@ -53,7 +50,6 @@ public class DeleteKvConfigCommand implements SubCommand {
return options;
}
-
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java
index b2c95d3..f4c95d3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java
@@ -6,27 +6,26 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.namesrv;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
public class GetNamesrvConfigCommand implements SubCommand {
@@ -67,7 +66,7 @@ public class GetNamesrvConfigCommand implements SubCommand {
for (String server : nameServerConfigs.keySet()) {
System.out.printf("============%s============\n",
- server);
+ server);
for (Object key : nameServerConfigs.get(server).keySet()) {
System.out.printf("%-50s= %s\n", key, nameServerConfigs.get(server).get(key));
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java
index a2d4f43..9d5f7a9 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommand.java
@@ -6,23 +6,22 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.namesrv;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
/**
*
@@ -34,13 +33,11 @@ public class UpdateKvConfigCommand implements SubCommand {
return "updateKvConfig";
}
-
@Override
public String commandDesc() {
return "Create or update KV config.";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("s", "namespace", true, "set the namespace");
@@ -57,7 +54,6 @@ public class UpdateKvConfigCommand implements SubCommand {
return options;
}
-
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java
index c6517d6..807636c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/UpdateNamesrvConfigCommand.java
@@ -6,27 +6,26 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.namesrv;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
public class UpdateNamesrvConfigCommand implements SubCommand {
@Override
@@ -80,7 +79,7 @@ public class UpdateNamesrvConfigCommand implements SubCommand {
defaultMQAdminExt.updateNameServerConfig(properties, serverList);
System.out.printf("update name server config success!%s\n%s : %s\n",
- serverList == null ? "" : serverList, key, value);
+ serverList == null ? "" : serverList, key, value);
return;
} catch (Exception e) {
e.printStackTrace();