You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:22 UTC
[15/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat
all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java
index 66f7159..86e7848 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommand.java
@@ -6,24 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.namesrv;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.command.SubCommand;
+import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-
-import java.util.List;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
public class WipeWritePermSubCommand implements SubCommand {
@@ -32,13 +31,11 @@ public class WipeWritePermSubCommand implements SubCommand {
return "wipeWritePerm";
}
-
@Override
public String commandDesc() {
return "Wipe write perm of broker in all name server";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("b", "brokerName", true, "broker name");
@@ -47,7 +44,6 @@ public class WipeWritePermSubCommand implements SubCommand {
return options;
}
-
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
@@ -63,14 +59,14 @@ public class WipeWritePermSubCommand implements SubCommand {
try {
int wipeTopicCount = defaultMQAdminExt.wipeWritePermOfBroker(namesrvAddr, brokerName);
System.out.printf("wipe write perm of broker[%s] in name server[%s] OK, %d%n",
- brokerName,
- namesrvAddr,
- wipeTopicCount
+ brokerName,
+ namesrvAddr,
+ wipeTopicCount
);
} catch (Exception e) {
System.out.printf("wipe write perm of broker[%s] in name server[%s] Failed%n",
- brokerName,
- namesrvAddr
+ brokerName,
+ namesrvAddr
);
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java
index 3cb7e3f..fe239aa 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/CloneGroupOffsetCommand.java
@@ -6,17 +6,21 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.offset;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.BrokerData;
@@ -24,11 +28,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Set;
public class CloneGroupOffsetCommand implements SubCommand {
@Override
@@ -94,7 +93,7 @@ public class CloneGroupOffsetCommand implements SubCommand {
}
}
System.out.printf("clone group offset success. srcGroup[%s], destGroup=[%s], topic[%s]",
- srcGroup, destGroup, topic);
+ srcGroup, destGroup, topic);
} catch (Exception e) {
e.printStackTrace();
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommand.java
index 1623f52..68b62e1 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommand.java
@@ -17,16 +17,15 @@
package org.apache.rocketmq.tools.command.offset;
+import java.util.Map;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Map;
public class GetConsumerStatusCommand implements SubCommand {
@Override
@@ -70,15 +69,15 @@ public class GetConsumerStatusCommand implements SubCommand {
defaultMQAdminExt.start();
Map<String, Map<MessageQueue, Long>> consumerStatusTable =
- defaultMQAdminExt.getConsumeStatus(topic, group, originClientId);
+ defaultMQAdminExt.getConsumeStatus(topic, group, originClientId);
System.out.printf("get consumer status from client. group=%s, topic=%s, originClientId=%s%n",
- group, topic, originClientId);
+ group, topic, originClientId);
System.out.printf("%-50s %-15s %-15s %-20s%n",
- "#clientId",
- "#brokerName",
- "#queueId",
- "#offset");
+ "#clientId",
+ "#brokerName",
+ "#queueId",
+ "#offset");
for (Map.Entry<String, Map<MessageQueue, Long>> entry : consumerStatusTable.entrySet()) {
String clientId = entry.getKey();
@@ -86,10 +85,10 @@ public class GetConsumerStatusCommand implements SubCommand {
for (Map.Entry<MessageQueue, Long> entry1 : mqTable.entrySet()) {
MessageQueue mq = entry1.getKey();
System.out.printf("%-50s %-15s %-15d %-20d%n",
- UtilAll.frontStringAtLeast(clientId, 50),
- mq.getBrokerName(),
- mq.getQueueId(),
- mqTable.get(mq));
+ UtilAll.frontStringAtLeast(clientId, 50),
+ mq.getBrokerName(),
+ mq.getQueueId(),
+ mqTable.get(mq));
}
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
index 5eb30b5..e07a7c8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
@@ -17,6 +17,12 @@
package org.apache.rocketmq.tools.command.offset;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -25,22 +31,14 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-
-import java.util.Iterator;
-import java.util.Map;
-
public class ResetOffsetByTimeCommand implements SubCommand {
public static void main(String[] args) {
ResetOffsetByTimeCommand cmd = new ResetOffsetByTimeCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[]{"-t Jodie_rest_test", "-g CID_Jodie_rest_test", "-s -1", "-f true"};
+ String[] subargs = new String[] {"-t Jodie_rest_test", "-g CID_Jodie_rest_test", "-s -1", "-f true"};
final CommandLine commandLine =
- ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+ ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
@@ -120,20 +118,20 @@ public class ResetOffsetByTimeCommand implements SubCommand {
}
System.out.printf("rollback consumer offset by specified group[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n",
- group, topic, force, timeStampStr, timestamp);
+ group, topic, force, timeStampStr, timestamp);
System.out.printf("%-40s %-40s %-40s%n",
- "#brokerName",
- "#queueId",
- "#offset");
+ "#brokerName",
+ "#queueId",
+ "#offset");
Iterator<Map.Entry<MessageQueue, Long>> iterator = offsetTable.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<MessageQueue, Long> entry = iterator.next();
System.out.printf("%-40s %-40d %-40d%n",
- UtilAll.frontStringAtLeast(entry.getKey().getBrokerName(), 32),
- entry.getKey().getQueueId(),
- entry.getValue());
+ UtilAll.frontStringAtLeast(entry.getKey().getBrokerName(), 32),
+ entry.getKey().getQueueId(),
+ entry.getValue());
}
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
index 0f15f69..9b30474 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
@@ -6,17 +6,22 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.offset;
+import java.util.Date;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
@@ -25,19 +30,40 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Date;
-import java.util.List;
-
/**
*
*
*/
public class ResetOffsetByTimeOldCommand implements SubCommand {
+ public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String consumerGroup, String topic, long timestamp, boolean force,
+ String timeStampStr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ List<RollbackStats> rollbackStatsList = defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
+ System.out.printf(
+ "rollback consumer offset by specified consumerGroup[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n",
+ consumerGroup, topic, force, timeStampStr, timestamp);
+
+ System.out.printf("%-20s %-20s %-20s %-20s %-20s %-20s%n",
+ "#brokerName",
+ "#queueId",
+ "#brokerOffset",
+ "#consumerOffset",
+ "#timestampOffset",
+ "#rollbackOffset"
+ );
+
+ for (RollbackStats rollbackStats : rollbackStatsList) {
+ System.out.printf("%-20s %-20d %-20d %-20d %-20d %-20d%n",
+ UtilAll.frontStringAtLeast(rollbackStats.getBrokerName(), 32),
+ rollbackStats.getQueueId(),
+ rollbackStats.getBrokerOffset(),
+ rollbackStats.getConsumerOffset(),
+ rollbackStats.getTimestampOffset(),
+ rollbackStats.getRollbackOffset()
+ );
+ }
+ }
+
@Override
public String commandName() {
return "resetOffsetByTimeOld";
@@ -104,32 +130,4 @@ public class ResetOffsetByTimeOldCommand implements SubCommand {
defaultMQAdminExt.shutdown();
}
}
-
- public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String consumerGroup, String topic, long timestamp, boolean force,
- String timeStampStr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- List<RollbackStats> rollbackStatsList = defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
- System.out.printf(
- "rollback consumer offset by specified consumerGroup[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n",
- consumerGroup, topic, force, timeStampStr, timestamp);
-
- System.out.printf("%-20s %-20s %-20s %-20s %-20s %-20s%n",
- "#brokerName",
- "#queueId",
- "#brokerOffset",
- "#consumerOffset",
- "#timestampOffset",
- "#rollbackOffset"
- );
-
- for (RollbackStats rollbackStats : rollbackStatsList) {
- System.out.printf("%-20s %-20d %-20d %-20d %-20d %-20d%n",
- UtilAll.frontStringAtLeast(rollbackStats.getBrokerName(), 32),
- rollbackStats.getQueueId(),
- rollbackStats.getBrokerOffset(),
- rollbackStats.getConsumerOffset(),
- rollbackStats.getTimestampOffset(),
- rollbackStats.getRollbackOffset()
- );
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java
index 90a361c..81a7f78 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/stats/StatsAllSubCommand.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.tools.command.stats;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
@@ -32,82 +35,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
public class StatsAllSubCommand implements SubCommand {
- @Override
- public String commandName() {
- return "statsAll";
- }
-
- @Override
- public String commandDesc() {
- return "Topic and Consumer tps stats";
- }
-
- @Override
- public Options buildCommandlineOptions(Options options) {
- Option opt = new Option("a", "activeTopic", false, "print active topic only");
- opt.setRequired(false);
- options.addOption(opt);
-
- opt = new Option("t", "topic", true, "print select topic only");
- opt.setRequired(false);
- options.addOption(opt);
-
- return options;
- }
-
- @Override
- public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
- DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
-
- defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
-
- try {
- defaultMQAdminExt.start();
-
- TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
-
- System.out.printf("%-32s %-32s %12s %11s %11s %14s %14s%n",
- "#Topic",
- "#Consumer Group",
- "#Accumulation",
- "#InTPS",
- "#OutTPS",
- "#InMsg24Hour",
- "#OutMsg24Hour"
- );
-
- boolean activeTopic = commandLine.hasOption('a');
- String selectTopic = commandLine.getOptionValue('t');
-
- for (String topic : topicList.getTopicList()) {
- if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
- continue;
- }
-
- if (selectTopic != null && selectTopic != "" && !topic.equals(selectTopic)) {
- continue;
- }
-
- try {
- printTopicDetail(defaultMQAdminExt, topic, activeTopic);
- } catch (Exception e) {
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- defaultMQAdminExt.shutdown();
- }
- }
-
public static void printTopicDetail(final DefaultMQAdminExt admin, final String topic, final boolean activeTopic)
- throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topic);
GroupList groupList = admin.queryTopicConsumeByWho(topic);
@@ -116,7 +47,6 @@ public class StatsAllSubCommand implements SubCommand {
long inMsgCntToday = 0;
-
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
@@ -161,16 +91,16 @@ public class StatsAllSubCommand implements SubCommand {
}
if (!activeTopic || (inMsgCntToday > 0) ||
- (outMsgCntToday > 0)) {
+ (outMsgCntToday > 0)) {
System.out.printf("%-32s %-32s %12d %11.2f %11.2f %14d %14d%n",
- UtilAll.frontStringAtLeast(topic, 32),
- UtilAll.frontStringAtLeast(group, 32),
- accumulate,
- inTPS,
- outTPS,
- inMsgCntToday,
- outMsgCntToday
+ UtilAll.frontStringAtLeast(topic, 32),
+ UtilAll.frontStringAtLeast(group, 32),
+ accumulate,
+ inTPS,
+ outTPS,
+ inMsgCntToday,
+ outMsgCntToday
);
}
}
@@ -178,13 +108,13 @@ public class StatsAllSubCommand implements SubCommand {
if (!activeTopic || (inMsgCntToday > 0)) {
System.out.printf("%-32s %-32s %12d %11.2f %11s %14d %14s%n",
- UtilAll.frontStringAtLeast(topic, 32),
- "",
- 0,
- inTPS,
- "",
- inMsgCntToday,
- "NO_CONSUMER"
+ UtilAll.frontStringAtLeast(topic, 32),
+ "",
+ 0,
+ inTPS,
+ "",
+ inMsgCntToday,
+ "NO_CONSUMER"
);
}
}
@@ -205,4 +135,72 @@ public class StatsAllSubCommand implements SubCommand {
return 0;
}
+
+ @Override
+ public String commandName() {
+ return "statsAll";
+ }
+
+ @Override
+ public String commandDesc() {
+ return "Topic and Consumer tps stats";
+ }
+
+ @Override
+ public Options buildCommandlineOptions(Options options) {
+ Option opt = new Option("a", "activeTopic", false, "print active topic only");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("t", "topic", true, "print select topic only");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+ @Override
+ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+ defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+ try {
+ defaultMQAdminExt.start();
+
+ TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
+
+ System.out.printf("%-32s %-32s %12s %11s %11s %14s %14s%n",
+ "#Topic",
+ "#Consumer Group",
+ "#Accumulation",
+ "#InTPS",
+ "#OutTPS",
+ "#InMsg24Hour",
+ "#OutMsg24Hour"
+ );
+
+ boolean activeTopic = commandLine.hasOption('a');
+ String selectTopic = commandLine.getOptionValue('t');
+
+ for (String topic : topicList.getTopicList()) {
+ if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+ continue;
+ }
+
+ if (selectTopic != null && selectTopic != "" && !topic.equals(selectTopic)) {
+ continue;
+ }
+
+ try {
+ printTopicDetail(defaultMQAdminExt, topic, activeTopic);
+ } catch (Exception e) {
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ defaultMQAdminExt.shutdown();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
index 09d8011..709aada 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.tools.command.topic;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -24,15 +31,6 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
public class AllocateMQSubCommand implements SubCommand {
@Override
@@ -40,13 +38,11 @@ public class AllocateMQSubCommand implements SubCommand {
return "allocateMQ";
}
-
@Override
public String commandDesc() {
return "Allocate MQ";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("t", "topic", true, "topic name");
@@ -60,7 +56,6 @@ public class AllocateMQSubCommand implements SubCommand {
return options;
}
-
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
@@ -81,7 +76,6 @@ public class AllocateMQSubCommand implements SubCommand {
final AllocateMessageQueueAveragely averagely = new AllocateMessageQueueAveragely();
-
RebalanceResult rr = new RebalanceResult();
for (String i : ipList) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
index 0749e36..69cbc99 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/DeleteTopicSubCommand.java
@@ -6,16 +6,22 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.RPCHook;
@@ -24,32 +30,41 @@ import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
/**
*
*
*/
public class DeleteTopicSubCommand implements SubCommand {
+ public static void deleteTopic(final DefaultMQAdminExt adminExt,
+ final String clusterName,
+ final String topic
+ ) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
+
+ Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
+ adminExt.deleteTopicInBroker(masterSet, topic);
+ System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName);
+
+ Set<String> nameServerSet = null;
+ if (adminExt.getNamesrvAddr() != null) {
+ String[] ns = adminExt.getNamesrvAddr().trim().split(";");
+ nameServerSet = new HashSet(Arrays.asList(ns));
+ }
+
+ adminExt.deleteTopicInNameServer(nameServerSet, topic);
+ System.out.printf("delete topic [%s] from NameServer success.%n", topic);
+ }
+
@Override
public String commandName() {
return "deleteTopic";
}
-
@Override
public String commandDesc() {
return "Delete topic from broker and NameServer.";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("t", "topic", true, "topic name");
@@ -63,29 +78,6 @@ public class DeleteTopicSubCommand implements SubCommand {
return options;
}
-
- public static void deleteTopic(final DefaultMQAdminExt adminExt,
- final String clusterName,
- final String topic
- ) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
-
- Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
- adminExt.deleteTopicInBroker(masterSet, topic);
- System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName);
-
-
- Set<String> nameServerSet = null;
- if (adminExt.getNamesrvAddr() != null) {
- String[] ns = adminExt.getNamesrvAddr().trim().split(";");
- nameServerSet = new HashSet(Arrays.asList(ns));
- }
-
-
- adminExt.deleteTopicInNameServer(nameServerSet, topic);
- System.out.printf("delete topic [%s] from NameServer success.%n", topic);
- }
-
-
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RebalanceResult.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RebalanceResult.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RebalanceResult.java
index 9f6c0b0..6e6e4ff 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RebalanceResult.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RebalanceResult.java
@@ -6,22 +6,21 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
-import org.apache.rocketmq.common.message.MessageQueue;
-
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.common.message.MessageQueue;
public class RebalanceResult {
private Map<String/*ip*/, List<MessageQueue>> result = new HashMap<String, List<MessageQueue>>();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java
index 2bdedd6..6a267a5 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicClusterSubCommand.java
@@ -16,15 +16,13 @@
*/
package org.apache.rocketmq.tools.command.topic;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.command.SubCommand;
+import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-
-import java.util.Set;
-
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
/**
*
@@ -37,13 +35,11 @@ public class TopicClusterSubCommand implements SubCommand {
return "topicClusterList";
}
-
@Override
public String commandDesc() {
return "get cluster info for topic";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("t", "topic", true, "topic name");
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java
index 42184fb..5e23a96 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java
@@ -16,6 +16,12 @@
*/
package org.apache.rocketmq.tools.command.topic;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
@@ -28,14 +34,6 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-
/**
*
@@ -72,15 +70,15 @@ public class TopicListSubCommand implements SubCommand {
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
System.out.printf("%-20s %-48s %-48s%n",
- "#Cluster Name",
- "#Topic",
- "#Consumer Group"
+ "#Cluster Name",
+ "#Topic",
+ "#Consumer Group"
);
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
for (String topic : topicList.getTopicList()) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)
- || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
+ || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
continue;
}
@@ -89,7 +87,7 @@ public class TopicListSubCommand implements SubCommand {
try {
clusterName =
- this.findTopicBelongToWhichCluster(topic, clusterInfo, defaultMQAdminExt);
+ this.findTopicBelongToWhichCluster(topic, clusterInfo, defaultMQAdminExt);
groupList = defaultMQAdminExt.queryTopicConsumeByWho(topic);
} catch (Exception e) {
}
@@ -101,9 +99,9 @@ public class TopicListSubCommand implements SubCommand {
for (String group : groupList.getGroupList()) {
System.out.printf("%-20s %-48s %-48s%n",
- UtilAll.frontStringAtLeast(clusterName, 20),
- UtilAll.frontStringAtLeast(topic, 48),
- UtilAll.frontStringAtLeast(group, 48)
+ UtilAll.frontStringAtLeast(clusterName, 20),
+ UtilAll.frontStringAtLeast(topic, 48),
+ UtilAll.frontStringAtLeast(group, 48)
);
}
}
@@ -121,8 +119,8 @@ public class TopicListSubCommand implements SubCommand {
}
private String findTopicBelongToWhichCluster(final String topic, final ClusterInfo clusterInfo,
- final DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQClientException,
- InterruptedException {
+ final DefaultMQAdminExt defaultMQAdminExt) throws RemotingException, MQClientException,
+ InterruptedException {
TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
BrokerData brokerData = topicRouteData.getBrokerDatas().get(0);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java
index 6f89b22..b7a180f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java
@@ -16,14 +16,13 @@
*/
package org.apache.rocketmq.tools.command.topic;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
/**
*
@@ -36,13 +35,11 @@ public class TopicRouteSubCommand implements SubCommand {
return "topicRoute";
}
-
@Override
public String commandDesc() {
return "Examine topic route info";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("t", "topic", true, "topic name");
@@ -52,7 +49,6 @@ public class TopicRouteSubCommand implements SubCommand {
return options;
}
-
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
index 73b98c9..76d9cbc 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
@@ -6,16 +6,22 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
@@ -23,14 +29,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
/**
*
@@ -43,13 +41,11 @@ public class TopicStatusSubCommand implements SubCommand {
return "topicStatus";
}
-
@Override
public String commandDesc() {
return "Examine topic Status info";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("t", "topic", true, "topic name");
@@ -58,7 +54,6 @@ public class TopicStatusSubCommand implements SubCommand {
return options;
}
-
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
@@ -75,11 +70,11 @@ public class TopicStatusSubCommand implements SubCommand {
Collections.sort(mqList);
System.out.printf("%-32s %-4s %-20s %-20s %s%n",
- "#Broker Name",
- "#QID",
- "#Min Offset",
- "#Max Offset",
- "#Last Updated"
+ "#Broker Name",
+ "#QID",
+ "#Min Offset",
+ "#Max Offset",
+ "#Last Updated"
);
for (MessageQueue mq : mqList) {
@@ -91,11 +86,11 @@ public class TopicStatusSubCommand implements SubCommand {
}
System.out.printf("%-32s %-4d %-20d %-20d %s%n",
- UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
- mq.getQueueId(),
- topicOffset.getMinOffset(),
- topicOffset.getMaxOffset(),
- humanTimestamp
+ UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
+ mq.getQueueId(),
+ topicOffset.getMinOffset(),
+ topicOffset.getMaxOffset(),
+ humanTimestamp
);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java
index 4cc88eb..f9f4f1f 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateOrderConfCommand.java
@@ -6,26 +6,25 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.topic;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
/**
*
@@ -38,13 +37,11 @@ public class UpdateOrderConfCommand implements SubCommand {
return "updateOrderConf";
}
-
@Override
public String commandDesc() {
return "Create or update or delete order conf";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("t", "topic", true, "topic name");
@@ -62,7 +59,6 @@ public class UpdateOrderConfCommand implements SubCommand {
return options;
}
-
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
@@ -76,7 +72,7 @@ public class UpdateOrderConfCommand implements SubCommand {
defaultMQAdminExt.start();
String orderConf =
- defaultMQAdminExt.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, topic);
+ defaultMQAdminExt.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, topic);
System.out.printf("get orderConf success. topic=[%s], orderConf=[%s] ", topic, orderConf);
return;
@@ -93,7 +89,7 @@ public class UpdateOrderConfCommand implements SubCommand {
defaultMQAdminExt.createOrUpdateOrderConf(topic, orderConf, true);
System.out.printf("update orderConf success. topic=[%s], orderConf=[%s]", topic,
- orderConf.toString());
+ orderConf.toString());
return;
} else if ("delete".equals(type)) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java
index cd119a0..fb7ab21 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicPermSubCommand.java
@@ -16,6 +16,11 @@
*/
package org.apache.rocketmq.tools.command.topic;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -24,13 +29,6 @@ import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.List;
-import java.util.Set;
-
public class UpdateTopicPermSubCommand implements SubCommand {
@@ -39,13 +37,11 @@ public class UpdateTopicPermSubCommand implements SubCommand {
return "updateTopicPerm";
}
-
@Override
public String commandDesc() {
return "Update topic perm";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
@@ -67,7 +63,6 @@ public class UpdateTopicPermSubCommand implements SubCommand {
return options;
}
-
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
@@ -106,7 +101,7 @@ public class UpdateTopicPermSubCommand implements SubCommand {
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
Set<String> masterSet =
- CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
System.out.printf("update topic perm from %s to %s in %s success.%n", oldPerm, perm, addr);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
index 25dd1f3..d4437b1 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateTopicSubCommand.java
@@ -16,6 +16,10 @@
*/
package org.apache.rocketmq.tools.command.topic;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.RPCHook;
@@ -23,12 +27,6 @@ import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Set;
-
public class UpdateTopicSubCommand implements SubCommand {
@@ -37,13 +35,11 @@ public class UpdateTopicSubCommand implements SubCommand {
return "updateTopic";
}
-
@Override
public String commandDesc() {
return "Update or create topic";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
@@ -85,7 +81,6 @@ public class UpdateTopicSubCommand implements SubCommand {
return options;
}
-
@Override
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
@@ -142,7 +137,7 @@ public class UpdateTopicSubCommand implements SubCommand {
String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
System.out.printf(String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
- isOrder, orderConf.toString()));
+ isOrder, orderConf.toString()));
}
System.out.printf("create topic to %s success.%n", addr);
System.out.printf("%s", topicConfig);
@@ -154,7 +149,7 @@ public class UpdateTopicSubCommand implements SubCommand {
defaultMQAdminExt.start();
Set<String> masterSet =
- CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
System.out.printf("create topic to %s success.%n", addr);
@@ -162,18 +157,18 @@ public class UpdateTopicSubCommand implements SubCommand {
if (isOrder) {
Set<String> brokerNameSet =
- CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
+ CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
StringBuilder orderConf = new StringBuilder();
String splitor = "";
for (String s : brokerNameSet) {
orderConf.append(splitor).append(s).append(":")
- .append(topicConfig.getWriteQueueNums());
+ .append(topicConfig.getWriteQueueNums());
splitor = ";";
}
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
- orderConf.toString(), true);
+ orderConf.toString(), true);
System.out.printf(String.format("set cluster orderConf. isOrder=%s, orderConf=[%s]",
- isOrder, orderConf.toString()));
+ isOrder, orderConf.toString()));
}
System.out.printf("%s", topicConfig);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListener.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListener.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListener.java
index 9bcb2df..63b81f8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListener.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/DefaultMonitorListener.java
@@ -6,60 +6,52 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.monitor;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import org.slf4j.Logger;
-
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.TreeMap;
-
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.slf4j.Logger;
public class DefaultMonitorListener implements MonitorListener {
private final static String LOG_PREFIX = "[MONITOR] ";
private final static String LOG_NOTIFY = LOG_PREFIX + " [NOTIFY] ";
private final Logger log = ClientLogger.getLog();
-
public DefaultMonitorListener() {
}
-
@Override
public void beginRound() {
log.info(LOG_PREFIX + "=========================================beginRound");
}
-
@Override
public void reportUndoneMsgs(UndoneMsgs undoneMsgs) {
log.info(String.format(LOG_PREFIX + "reportUndoneMsgs: %s", undoneMsgs));
}
-
@Override
public void reportFailedMsgs(FailedMsgs failedMsgs) {
log.info(String.format(LOG_PREFIX + "reportFailedMsgs: %s", failedMsgs));
}
-
@Override
public void reportDeleteMsgsEvent(DeleteMsgsEvent deleteMsgsEvent) {
log.info(String.format(LOG_PREFIX + "reportDeleteMsgsEvent: %s", deleteMsgsEvent));
}
-
@Override
public void reportConsumerRunningInfo(TreeMap<String, ConsumerRunningInfo> criTable) {
@@ -67,12 +59,11 @@ public class DefaultMonitorListener implements MonitorListener {
boolean result = ConsumerRunningInfo.analyzeSubscription(criTable);
if (!result) {
log.info(String.format(LOG_NOTIFY
- + "reportConsumerRunningInfo: ConsumerGroup: %s, Subscription different", criTable
- .firstEntry().getValue().getProperties().getProperty("consumerGroup")));
+ + "reportConsumerRunningInfo: ConsumerGroup: %s, Subscription different", criTable
+ .firstEntry().getValue().getProperties().getProperty("consumerGroup")));
}
}
-
{
Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator();
while (it.hasNext()) {
@@ -80,16 +71,15 @@ public class DefaultMonitorListener implements MonitorListener {
String result = ConsumerRunningInfo.analyzeProcessQueue(next.getKey(), next.getValue());
if (!result.isEmpty()) {
log.info(String.format(LOG_NOTIFY
- + "reportConsumerRunningInfo: ConsumerGroup: %s, ClientId: %s, %s",
- criTable.firstEntry().getValue().getProperties().getProperty("consumerGroup"),
- next.getKey(),
- result));
+ + "reportConsumerRunningInfo: ConsumerGroup: %s, ClientId: %s, %s",
+ criTable.firstEntry().getValue().getProperties().getProperty("consumerGroup"),
+ next.getKey(),
+ result));
}
}
}
}
-
@Override
public void endRound() {
log.info(LOG_PREFIX + "=========================================endRound");
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/DeleteMsgsEvent.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/DeleteMsgsEvent.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/DeleteMsgsEvent.java
index 5db446f..3270286 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/DeleteMsgsEvent.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/DeleteMsgsEvent.java
@@ -6,48 +6,42 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.monitor;
import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
-
public class DeleteMsgsEvent {
private OffsetMovedEvent offsetMovedEvent;
private long eventTimestamp;
-
public OffsetMovedEvent getOffsetMovedEvent() {
return offsetMovedEvent;
}
-
public void setOffsetMovedEvent(OffsetMovedEvent offsetMovedEvent) {
this.offsetMovedEvent = offsetMovedEvent;
}
-
public long getEventTimestamp() {
return eventTimestamp;
}
-
public void setEventTimestamp(long eventTimestamp) {
this.eventTimestamp = eventTimestamp;
}
-
@Override
public String toString() {
return "DeleteMsgsEvent [offsetMovedEvent=" + offsetMovedEvent + ", eventTimestamp=" + eventTimestamp
- + "]";
+ + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/FailedMsgs.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/FailedMsgs.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/FailedMsgs.java
index 4c4e91c..bf63984 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/FailedMsgs.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/FailedMsgs.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.monitor;
@@ -22,40 +22,33 @@ public class FailedMsgs {
private String topic;
private long failedMsgsTotalRecently;
-
public String getConsumerGroup() {
return consumerGroup;
}
-
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
-
public String getTopic() {
return topic;
}
-
public void setTopic(String topic) {
this.topic = topic;
}
-
public long getFailedMsgsTotalRecently() {
return failedMsgsTotalRecently;
}
-
public void setFailedMsgsTotalRecently(long failedMsgsTotalRecently) {
this.failedMsgsTotalRecently = failedMsgsTotalRecently;
}
-
@Override
public String toString() {
return "FailedMsgs [consumerGroup=" + consumerGroup + ", topic=" + topic
- + ", failedMsgsTotalRecently=" + failedMsgsTotalRecently + "]";
+ + ", failedMsgsTotalRecently=" + failedMsgsTotalRecently + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorConfig.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorConfig.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorConfig.java
index fbe6c3c..e60d317 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorConfig.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorConfig.java
@@ -6,42 +6,37 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.monitor;
import org.apache.rocketmq.common.MixAll;
-
public class MonitorConfig {
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY,
- System.getenv(MixAll.NAMESRV_ADDR_ENV));
+ System.getenv(MixAll.NAMESRV_ADDR_ENV));
private int roundInterval = 1000 * 60;
-
public String getNamesrvAddr() {
return namesrvAddr;
}
-
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
-
public int getRoundInterval() {
return roundInterval;
}
-
public void setRoundInterval(int roundInterval) {
this.roundInterval = roundInterval;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorListener.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorListener.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorListener.java
index a60a273..17f85ce 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorListener.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorListener.java
@@ -17,9 +17,8 @@
package org.apache.rocketmq.tools.monitor;
-import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
-
import java.util.TreeMap;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
public interface MonitorListener {
void beginRound();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
index 8c368fe..d9c4cf8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java
@@ -6,17 +6,26 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.monitor;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
@@ -43,17 +52,10 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
public class MonitorService {
private final Logger log = ClientLogger.getLog();
private final ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorService"));
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("MonitorService"));
private final MonitorConfig monitorConfig;
@@ -61,10 +63,9 @@ public class MonitorService {
private final DefaultMQAdminExt defaultMQAdminExt;
private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer(
- MixAll.TOOLS_CONSUMER_GROUP);
+ MixAll.TOOLS_CONSUMER_GROUP);
private final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(
- MixAll.MONITOR_CONSUMER_GROUP);
-
+ MixAll.MONITOR_CONSUMER_GROUP);
public MonitorService(MonitorConfig monitorConfig, MonitorListener monitorListener, RPCHook rpcHook) {
this.monitorConfig = monitorConfig;
@@ -87,10 +88,10 @@ public class MonitorService {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
+ ConsumeConcurrentlyContext context) {
try {
OffsetMovedEvent ome =
- OffsetMovedEvent.decode(msgs.get(0).getBody(), OffsetMovedEvent.class);
+ OffsetMovedEvent.decode(msgs.get(0).getBody(), OffsetMovedEvent.class);
DeleteMsgsEvent deleteMsgsEvent = new DeleteMsgsEvent();
deleteMsgsEvent.setOffsetMovedEvent(ome);
@@ -107,27 +108,18 @@ public class MonitorService {
}
}
-
- private String instanceName() {
- String name =
- System.currentTimeMillis() + new Random().nextInt() + this.monitorConfig.getNamesrvAddr();
-
- return "MonitorService_" + name.hashCode();
- }
-
public static void main(String[] args) throws MQClientException {
main0(args, null);
}
public static void main0(String[] args, RPCHook rpcHook) throws MQClientException {
final MonitorService monitorService =
- new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook);
+ new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook);
monitorService.start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
-
@Override
public void run() {
synchronized (this) {
@@ -140,6 +132,13 @@ public class MonitorService {
}, "ShutdownHook"));
}
+ private String instanceName() {
+ String name =
+ System.currentTimeMillis() + new Random().nextInt() + this.monitorConfig.getNamesrvAddr();
+
+ return "MonitorService_" + name.hashCode();
+ }
+
public void start() throws MQClientException {
this.defaultMQPullConsumer.start();
this.defaultMQAdminExt.start();
@@ -181,7 +180,6 @@ public class MonitorService {
// log.error("reportUndoneMsgs Exception", e);
}
-
try {
this.reportConsumerRunningInfo(consumerGroup);
} catch (Exception e) {
@@ -228,7 +226,6 @@ public class MonitorService {
}
}
-
{
Iterator<Entry<String, ConsumeStats>> it = csByTopic.entrySet().iterator();
while (it.hasNext()) {
@@ -245,7 +242,7 @@ public class MonitorService {
}
public void reportConsumerRunningInfo(final String consumerGroup) throws InterruptedException,
- MQBrokerException, RemotingException, MQClientException {
+ MQBrokerException, RemotingException, MQClientException {
ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
TreeMap<String, ConsumerRunningInfo> infoMap = new TreeMap<String, ConsumerRunningInfo>();
for (Connection c : cc.getConnectionSet()) {
@@ -257,7 +254,7 @@ public class MonitorService {
try {
ConsumerRunningInfo info =
- defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false);
+ defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false);
infoMap.put(clientId, info);
} catch (Exception e) {
}
@@ -296,7 +293,7 @@ public class MonitorService {
switch (pull.getPullStatus()) {
case FOUND:
long delay =
- pull.getMsgFoundList().get(0).getStoreTimestamp() - ow.getLastTimestamp();
+ pull.getMsgFoundList().get(0).getStoreTimestamp() - ow.getLastTimestamp();
if (delay > delayMax) {
delayMax = delay;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/monitor/UndoneMsgs.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/UndoneMsgs.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/UndoneMsgs.java
index ac549af..abc0cb9 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/UndoneMsgs.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/UndoneMsgs.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.monitor;
@@ -27,61 +27,50 @@ public class UndoneMsgs {
private long undoneMsgsDelayTimeMills;
-
public String getConsumerGroup() {
return consumerGroup;
}
-
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
-
public String getTopic() {
return topic;
}
-
public void setTopic(String topic) {
this.topic = topic;
}
-
public long getUndoneMsgsTotal() {
return undoneMsgsTotal;
}
-
public void setUndoneMsgsTotal(long undoneMsgsTotal) {
this.undoneMsgsTotal = undoneMsgsTotal;
}
-
public long getUndoneMsgsSingleMQ() {
return undoneMsgsSingleMQ;
}
-
public void setUndoneMsgsSingleMQ(long undoneMsgsSingleMQ) {
this.undoneMsgsSingleMQ = undoneMsgsSingleMQ;
}
-
public long getUndoneMsgsDelayTimeMills() {
return undoneMsgsDelayTimeMills;
}
-
public void setUndoneMsgsDelayTimeMills(long undoneMsgsDelayTimeMills) {
this.undoneMsgsDelayTimeMills = undoneMsgsDelayTimeMills;
}
-
@Override
public String toString() {
return "UndoneMsgs [consumerGroup=" + consumerGroup + ", topic=" + topic + ", undoneMsgsTotal="
- + undoneMsgsTotal + ", undoneMsgsSingleMQ=" + undoneMsgsSingleMQ
- + ", undoneMsgsDelayTimeMills=" + undoneMsgsDelayTimeMills + "]";
+ + undoneMsgsTotal + ", undoneMsgsSingleMQ=" + undoneMsgsSingleMQ
+ + ", undoneMsgsDelayTimeMills=" + undoneMsgsDelayTimeMills + "]";
}
}