You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:59 UTC
[68/99] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index df4fe89..5056010 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -16,6 +16,11 @@
*/
package org.apache.rocketmq.tools.admin;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -25,17 +30,27 @@ import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
-import org.apache.rocketmq.remoting.exception.*;
-
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
public interface MQAdminExt extends MQAdmin {
void start() throws MQClientException;
@@ -43,53 +58,53 @@ public interface MQAdminExt extends MQAdmin {
void shutdown();
void updateBrokerConfig(final String brokerAddr, final Properties properties) throws RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException;
+ RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException;
Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException;
+ RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException;
void createAndUpdateTopicConfig(final String addr, final TopicConfig config) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException;
+ InterruptedException, MQClientException;
void createAndUpdateSubscriptionGroupConfig(final String addr, final SubscriptionGroupConfig config) throws RemotingException,
- MQBrokerException, InterruptedException, MQClientException;
+ MQBrokerException, InterruptedException, MQClientException;
SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group);
TopicConfig examineTopicConfig(final String addr, final String topic);
TopicStatsTable examineTopicStats(final String topic) throws RemotingException, MQClientException, InterruptedException,
- MQBrokerException;
+ MQBrokerException;
TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException;
TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException;
KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQBrokerException;
+ RemotingTimeoutException, InterruptedException, MQBrokerException;
ConsumeStats examineConsumeStats(final String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
- MQBrokerException;
+ MQBrokerException;
ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException,
- InterruptedException, MQBrokerException;
+ InterruptedException, MQBrokerException;
ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException;
+ RemotingSendRequestException, RemotingConnectException;
TopicRouteData examineTopicRouteInfo(final String topic) throws RemotingException, MQClientException, InterruptedException;
ConsumerConnection examineConsumerConnectionInfo(final String consumerGroup) throws RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException,
- MQClientException;
+ RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException,
+ MQClientException;
ProducerConnection examineProducerConnectionInfo(final String producerGroup, final String topic) throws RemotingException,
- MQClientException, InterruptedException, MQBrokerException;
+ MQClientException, InterruptedException, MQBrokerException;
List<String> getNameServerAddressList();
int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException,
- RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException;
+ RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException;
void putKVConfig(final String namespace, final String key, final String value);
@@ -98,91 +113,94 @@ public interface MQAdminExt extends MQAdmin {
KVTable getKVListByNamespace(final String namespace) throws RemotingException, MQClientException, InterruptedException;
void deleteTopicInBroker(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException;
+ InterruptedException, MQClientException;
void deleteTopicInNameServer(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException;
+ InterruptedException, MQClientException;
void deleteSubscriptionGroup(final String addr, String groupName) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException;
+ InterruptedException, MQClientException;
void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException;
+ InterruptedException, MQClientException;
void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException,
- MQClientException;
+ MQClientException;
List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException;
+ InterruptedException, MQClientException;
Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException,
- MQBrokerException, InterruptedException, MQClientException;
+ MQBrokerException, InterruptedException, MQClientException;
void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException;
+ InterruptedException, MQClientException;
GroupList queryTopicConsumeByWho(final String topic) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, MQClientException;
+ RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, MQClientException;
List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException,
- RemotingException, MQClientException;
+ RemotingException, MQClientException;
boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, MQClientException, InterruptedException;
+ RemotingTimeoutException, MQClientException, InterruptedException;
boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, MQClientException, InterruptedException;
+ RemotingTimeoutException, MQClientException, InterruptedException;
boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, MQClientException, InterruptedException;
-
+ RemotingTimeoutException, MQClientException, InterruptedException;
boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, MQClientException, InterruptedException;
+ RemotingTimeoutException, MQClientException, InterruptedException;
ConsumerRunningInfo getConsumerRunningInfo(final String consumerGroup, final String clientId, final boolean jstack)
- throws RemotingException, MQClientException, InterruptedException;
+ throws RemotingException, MQClientException, InterruptedException;
ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
- String clientId,
- String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
+ String clientId,
+ String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup,
- String clientId,
- String topic,
- String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
+ String clientId,
+ String topic,
+ String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
- MQBrokerException;
+ MQBrokerException;
void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException,
- MQClientException, InterruptedException, MQBrokerException;
+ MQClientException, InterruptedException, MQBrokerException;
BrokerStatsData viewBrokerStatsData(final String brokerAddr, final String statsName, final String statsKey)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
- InterruptedException;
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException,
+ InterruptedException;
Set<String> getClusterList(final String topic) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, MQClientException, InterruptedException;
+ RemotingTimeoutException, MQClientException, InterruptedException;
- ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, MQClientException, InterruptedException;
+ ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder,
+ long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException;
Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException;
- SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException, MQBrokerException;
+ SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
+ long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException;
- TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException, MQBrokerException;
+ TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
+ long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException;
- void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException;
+ void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq,
+ long offset) throws RemotingException, InterruptedException, MQBrokerException;
/**
* Update name server config.
@@ -193,7 +211,6 @@ public interface MQAdminExt extends MQAdmin {
*
* @param properties
* @param nameServers
- *
* @throws InterruptedException
* @throws RemotingConnectException
* @throws UnsupportedEncodingException
@@ -203,8 +220,8 @@ public interface MQAdminExt extends MQAdmin {
* @throws MQBrokerException
*/
void updateNameServerConfig(final Properties properties, final List<String> nameServers) throws InterruptedException, RemotingConnectException,
- UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException,
- MQClientException, MQBrokerException;
+ UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException,
+ MQClientException, MQBrokerException;
/**
* Get name server config.
@@ -213,9 +230,7 @@ public interface MQAdminExt extends MQAdmin {
* <br> If param(nameServers) is null or empty, will use name servers from ns!
*
* @param nameServers
- *
* @return The fetched name server config
- *
* @throws InterruptedException
* @throws RemotingTimeoutException
* @throws RemotingSendRequestException
@@ -224,6 +239,6 @@ public interface MQAdminExt extends MQAdmin {
* @throws UnsupportedEncodingException
*/
Map<String, Properties> getNameServerConfig(final List<String> nameServers) throws InterruptedException,
- RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
- MQClientException, UnsupportedEncodingException;
+ RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
+ MQClientException, UnsupportedEncodingException;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/admin/api/MessageTrack.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/MessageTrack.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/MessageTrack.java
index 324b661..4445482 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/MessageTrack.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/MessageTrack.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.admin.api;
@@ -22,40 +22,33 @@ public class MessageTrack {
private TrackType trackType;
private String exceptionDesc;
-
public String getConsumerGroup() {
return consumerGroup;
}
-
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
-
public TrackType getTrackType() {
return trackType;
}
-
public void setTrackType(TrackType trackType) {
this.trackType = trackType;
}
-
public String getExceptionDesc() {
return exceptionDesc;
}
-
public void setExceptionDesc(String exceptionDesc) {
this.exceptionDesc = exceptionDesc;
}
-
@Override
public String toString() {
return "MessageTrack [consumerGroup=" + consumerGroup + ", trackType=" + trackType
- + ", exceptionDesc=" + exceptionDesc + "]";
+ + ", exceptionDesc=" + exceptionDesc + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java
index 36345f9..df70523 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.admin.api;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
index c675d9a..8b86ab8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
@@ -16,6 +16,14 @@
*/
package org.apache.rocketmq.tools.command;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
@@ -25,16 +33,13 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.MQAdminExt;
-import java.util.*;
-
-
public class CommandUtil {
public static Map<String/*master addr*/, List<String>/*slave addr*/> fetchMasterAndSlaveDistinguish(
- final MQAdminExt adminExt, final String clusterName)
- throws InterruptedException, RemotingConnectException,
- RemotingTimeoutException, RemotingSendRequestException,
- MQBrokerException {
+ final MQAdminExt adminExt, final String clusterName)
+ throws InterruptedException, RemotingConnectException,
+ RemotingTimeoutException, RemotingSendRequestException,
+ MQBrokerException {
Map<String, List<String>> masterAndSlaveMap = new HashMap<String, List<String>>(4);
ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
@@ -42,7 +47,7 @@ public class CommandUtil {
if (brokerNameSet == null) {
System.out
- .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct.");
+ .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct.");
return masterAndSlaveMap;
}
@@ -58,7 +63,7 @@ public class CommandUtil {
for (Long id : brokerData.getBrokerAddrs().keySet()) {
if (brokerData.getBrokerAddrs().get(id) == null
- || id.longValue() == MixAll.MASTER_ID) {
+ || id.longValue() == MixAll.MASTER_ID) {
continue;
}
@@ -70,8 +75,8 @@ public class CommandUtil {
}
public static Set<String> fetchMasterAddrByClusterName(final MQAdminExt adminExt, final String clusterName)
- throws InterruptedException, RemotingConnectException, RemotingTimeoutException,
- RemotingSendRequestException, MQBrokerException {
+ throws InterruptedException, RemotingConnectException, RemotingTimeoutException,
+ RemotingSendRequestException, MQBrokerException {
Set<String> masterSet = new HashSet<String>();
ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
@@ -91,15 +96,15 @@ public class CommandUtil {
}
} else {
System.out
- .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct.");
+ .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct.");
}
return masterSet;
}
public static Set<String> fetchMasterAndSlaveAddrByClusterName(final MQAdminExt adminExt, final String clusterName)
- throws InterruptedException, RemotingConnectException, RemotingTimeoutException,
- RemotingSendRequestException, MQBrokerException {
+ throws InterruptedException, RemotingConnectException, RemotingTimeoutException,
+ RemotingSendRequestException, MQBrokerException {
Set<String> masterSet = new HashSet<String>();
ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
@@ -116,29 +121,27 @@ public class CommandUtil {
}
} else {
System.out
- .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct.");
+ .printf("[error] Make sure the specified clusterName exists or the nameserver which connected is correct.");
}
return masterSet;
}
-
public static Set<String> fetchBrokerNameByClusterName(final MQAdminExt adminExt, final String clusterName)
- throws Exception {
+ throws Exception {
ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
Set<String> brokerNameSet = clusterInfoSerializeWrapper.getClusterAddrTable().get(clusterName);
if (brokerNameSet.isEmpty()) {
throw new Exception(
- "Make sure the specified clusterName exists or the nameserver which connected is correct.");
+ "Make sure the specified clusterName exists or the nameserver which connected is correct.");
}
return brokerNameSet;
}
-
public static String fetchBrokerNameByAddr(final MQAdminExt adminExt, final String addr) throws Exception {
ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
HashMap<String/* brokerName */, BrokerData> brokerAddrTable =
- clusterInfoSerializeWrapper.getBrokerAddrTable();
+ clusterInfoSerializeWrapper.getBrokerAddrTable();
Iterator<Map.Entry<String, BrokerData>> it = brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, BrokerData> entry = it.next();
@@ -147,8 +150,7 @@ public class CommandUtil {
return entry.getKey();
}
throw new Exception(
- "Make sure the specified broker addr exists or the nameserver which connected is correct.");
+ "Make sure the specified broker addr exists or the nameserver which connected is correct.");
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index d11dd23..a1753c1 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -6,45 +6,71 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.command.broker.BrokerConsumeStatsSubCommad;
+import org.apache.rocketmq.tools.command.broker.BrokerStatusSubCommand;
+import org.apache.rocketmq.tools.command.broker.CleanExpiredCQSubCommand;
+import org.apache.rocketmq.tools.command.broker.CleanUnusedTopicCommand;
+import org.apache.rocketmq.tools.command.broker.GetBrokerConfigCommand;
+import org.apache.rocketmq.tools.command.broker.SendMsgStatusCommand;
+import org.apache.rocketmq.tools.command.broker.UpdateBrokerConfigSubCommand;
import org.apache.rocketmq.tools.command.cluster.CLusterSendMsgRTCommand;
import org.apache.rocketmq.tools.command.cluster.ClusterListSubCommand;
import org.apache.rocketmq.tools.command.connection.ConsumerConnectionSubCommand;
import org.apache.rocketmq.tools.command.connection.ProducerConnectionSubCommand;
-import org.apache.rocketmq.tools.command.consumer.*;
-import org.apache.rocketmq.tools.command.namesrv.*;
+import org.apache.rocketmq.tools.command.consumer.ConsumerProgressSubCommand;
+import org.apache.rocketmq.tools.command.consumer.ConsumerStatusSubCommand;
+import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand;
+import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
+import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
+import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
+import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
+import org.apache.rocketmq.tools.command.message.PrintMessageSubCommand;
+import org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand;
+import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand;
+import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
+import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
+import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
+import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
+import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand;
+import org.apache.rocketmq.tools.command.namesrv.UpdateNamesrvConfigCommand;
+import org.apache.rocketmq.tools.command.namesrv.WipeWritePermSubCommand;
import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand;
import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.tools.command.broker.*;
-import org.apache.rocketmq.tools.command.message.*;
-import org.apache.rocketmq.tools.command.topic.*;
+import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
+import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand;
+import org.apache.rocketmq.tools.command.topic.TopicClusterSubCommand;
+import org.apache.rocketmq.tools.command.topic.TopicListSubCommand;
+import org.apache.rocketmq.tools.command.topic.TopicRouteSubCommand;
+import org.apache.rocketmq.tools.command.topic.TopicStatusSubCommand;
+import org.apache.rocketmq.tools.command.topic.UpdateOrderConfCommand;
+import org.apache.rocketmq.tools.command.topic.UpdateTopicPermSubCommand;
+import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-
-
public class MQAdminStartup {
protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();
@@ -55,7 +81,6 @@ public class MQAdminStartup {
public static void main0(String[] args, RPCHook rpcHook) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
-
//PackageConflictDetect.detectFastjson();
initCommand();
@@ -86,11 +111,10 @@ public class MQAdminStartup {
if (cmd != null) {
String[] subargs = parseSubArgs(args);
-
Options options = ServerUtil.buildCommandlineOptions(new Options());
final CommandLine commandLine =
- ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
- new PosixParser());
+ ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
+ new PosixParser());
if (null == commandLine) {
System.exit(-1);
return;
@@ -124,7 +148,6 @@ public class MQAdminStartup {
initCommand(new TopicStatusSubCommand());
initCommand(new TopicClusterSubCommand());
-
initCommand(new BrokerStatusSubCommand());
initCommand(new QueryMsgByIdSubCommand());
initCommand(new QueryMsgByKeySubCommand());
@@ -136,7 +159,6 @@ public class MQAdminStartup {
initCommand(new SendMsgStatusCommand());
initCommand(new BrokerConsumeStatsSubCommad());
-
initCommand(new ProducerConnectionSubCommand());
initCommand(new ConsumerConnectionSubCommand());
initCommand(new ConsumerProgressSubCommand());
@@ -172,7 +194,7 @@ public class MQAdminStartup {
private static void initLogback() throws JoranException {
String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
- LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommand.java
index 744685e..2035276 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/SubCommand.java
@@ -6,30 +6,26 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command;
-import org.apache.rocketmq.remoting.RPCHook;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
-
+import org.apache.rocketmq.remoting.RPCHook;
public interface SubCommand {
public String commandName();
-
public String commandDesc();
-
public Options buildCommandlineOptions(final Options options);
-
public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
index 57ca907..485b58c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerConsumeStatsSubCommad.java
@@ -16,6 +16,14 @@
*/
package org.apache.rocketmq.tools.command.broker;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
@@ -24,12 +32,6 @@ import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.*;
-
public class BrokerConsumeStatsSubCommad implements SubCommand {
@@ -86,14 +88,14 @@ public class BrokerConsumeStatsSubCommad implements SubCommand {
ConsumeStatsList consumeStatsList = defaultMQAdminExt.fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
System.out.printf("%-32s %-32s %-32s %-4s %-20s %-20s %-20s %s%n",
- "#Topic",
- "#Group",
- "#Broker Name",
- "#QID",
- "#Broker Offset",
- "#Consumer Offset",
- "#Diff",
- "#LastTime");
+ "#Topic",
+ "#Group",
+ "#Broker Name",
+ "#QID",
+ "#Broker Offset",
+ "#Consumer Offset",
+ "#Diff",
+ "#LastTime");
for (Map<String, List<ConsumeStats>> map : consumeStatsList.getConsumeStatsList()) {
for (Map.Entry<String, List<ConsumeStats>> entry : map.entrySet()) {
String group = entry.getKey();
@@ -117,14 +119,14 @@ public class BrokerConsumeStatsSubCommad implements SubCommand {
}
if (offsetWrapper.getLastTimestamp() > 0)
System.out.printf("%-32s %-32s %-32s %-4d %-20d %-20d %-20d %s%n",
- UtilAll.frontStringAtLeast(mq.getTopic(), 32),
- group,
- UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
- mq.getQueueId(),
- offsetWrapper.getBrokerOffset(),
- offsetWrapper.getConsumerOffset(),
- diff,
- lastTime
+ UtilAll.frontStringAtLeast(mq.getTopic(), 32),
+ group,
+ UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
+ mq.getQueueId(),
+ offsetWrapper.getBrokerOffset(),
+ offsetWrapper.getConsumerOffset(),
+ diff,
+ lastTime
);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
index 3f5ff79..2fad2d1 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommand.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.tools.command.broker;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.remoting.RPCHook;
@@ -25,15 +32,6 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-
public class BrokerStatusSubCommand implements SubCommand {
@@ -42,13 +40,11 @@ public class BrokerStatusSubCommand implements SubCommand {
return "brokerStatus";
}
-
@Override
public String commandDesc() {
return "Fetch broker runtime status data";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("b", "brokerAddr", true, "Broker address");
@@ -77,7 +73,7 @@ public class BrokerStatusSubCommand implements SubCommand {
printBrokerRuntimeStats(defaultMQAdminExt, brokerAddr, false);
} else if (clusterName != null) {
Set<String> masterSet =
- CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName);
+ CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName);
for (String ba : masterSet) {
try {
printBrokerRuntimeStats(defaultMQAdminExt, ba, true);
@@ -87,7 +83,6 @@ public class BrokerStatusSubCommand implements SubCommand {
}
}
-
} catch (Exception e) {
e.printStackTrace();
} finally {
@@ -95,7 +90,8 @@ public class BrokerStatusSubCommand implements SubCommand {
}
}
- public void printBrokerRuntimeStats(final DefaultMQAdminExt defaultMQAdminExt, final String brokerAddr, final boolean printBroker) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ public void printBrokerRuntimeStats(final DefaultMQAdminExt defaultMQAdminExt, final String brokerAddr,
+ final boolean printBroker) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
KVTable kvTable = defaultMQAdminExt.fetchBrokerRuntimeStats(brokerAddr);
TreeMap<String, String> tmp = new TreeMap<String, String>();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java
index 71aa78b..d15ad64 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommand.java
@@ -6,24 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.broker;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
public class CleanExpiredCQSubCommand implements SubCommand {
@@ -32,13 +31,11 @@ public class CleanExpiredCQSubCommand implements SubCommand {
return "cleanExpiredCQ";
}
-
@Override
public String commandDesc() {
return "Clean expired ConsumeQueue on broker.";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("b", "brokerAddr", true, "Broker address");
@@ -52,7 +49,6 @@ public class CleanExpiredCQSubCommand implements SubCommand {
return options;
}
-
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java
index 0e4c4b4..ca5778b 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommand.java
@@ -6,24 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.broker;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
public class CleanUnusedTopicCommand implements SubCommand {
@@ -32,13 +31,11 @@ public class CleanUnusedTopicCommand implements SubCommand {
return "cleanUnusedTopic";
}
-
@Override
public String commandDesc() {
return "Clean unused topic on broker.";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("b", "brokerAddr", true, "Broker address");
@@ -52,7 +49,6 @@ public class CleanUnusedTopicCommand implements SubCommand {
return options;
}
-
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
index 2956264..d0a271e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
@@ -17,6 +17,13 @@
package org.apache.rocketmq.tools.command.broker;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -26,14 +33,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
public class GetBrokerConfigCommand implements SubCommand {
@Override
@@ -72,29 +71,29 @@ public class GetBrokerConfigCommand implements SubCommand {
defaultMQAdminExt.start();
getAndPrint(defaultMQAdminExt,
- String.format("============%s============\n", brokerAddr),
- brokerAddr);
+ String.format("============%s============\n", brokerAddr),
+ brokerAddr);
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
defaultMQAdminExt.start();
Map<String, List<String>> masterAndSlaveMap
- = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
+ = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExt, clusterName);
for (String masterAddr : masterAndSlaveMap.keySet()) {
getAndPrint(
- defaultMQAdminExt,
- String.format("============Master: %s============\n", masterAddr),
- masterAddr
+ defaultMQAdminExt,
+ String.format("============Master: %s============\n", masterAddr),
+ masterAddr
);
for (String slaveAddr : masterAndSlaveMap.get(masterAddr)) {
getAndPrint(
- defaultMQAdminExt,
- String.format("============My Master: %s=====Slave: %s============\n", masterAddr, slaveAddr),
- slaveAddr
+ defaultMQAdminExt,
+ String.format("============My Master: %s=====Slave: %s============\n", masterAddr, slaveAddr),
+ slaveAddr
);
}
}
@@ -108,9 +107,9 @@ public class GetBrokerConfigCommand implements SubCommand {
}
protected void getAndPrint(final MQAdminExt defaultMQAdminExt, final String printPrefix, final String addr)
- throws InterruptedException, RemotingConnectException,
- UnsupportedEncodingException, RemotingTimeoutException,
- MQBrokerException, RemotingSendRequestException {
+ throws InterruptedException, RemotingConnectException,
+ UnsupportedEncodingException, RemotingTimeoutException,
+ MQBrokerException, RemotingSendRequestException {
System.out.print(printPrefix);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java
index d40ba21..d770d12 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java
@@ -16,33 +16,41 @@
*/
package org.apache.rocketmq.tools.command.broker;
+import java.io.UnsupportedEncodingException;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import java.io.UnsupportedEncodingException;
+public class SendMsgStatusCommand implements SubCommand {
+ private static Message buildMessage(final String topic, final int messageSize) throws UnsupportedEncodingException {
+ Message msg = new Message();
+ msg.setTopic(topic);
-public class SendMsgStatusCommand implements SubCommand {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < messageSize; i += 11) {
+ sb.append("hello jodie");
+ }
+ msg.setBody(sb.toString().getBytes(MixAll.DEFAULT_CHARSET));
+ return msg;
+ }
@Override
public String commandName() {
return "sendMsgStatus";
}
-
@Override
public String commandDesc() {
return "send msg to broker.";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("b", "brokerName", true, "Broker Name");
@@ -60,7 +68,6 @@ public class SendMsgStatusCommand implements SubCommand {
return options;
}
-
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
final DefaultMQProducer producer = new DefaultMQProducer("PID_SMSC", rpcHook);
@@ -85,17 +92,4 @@ public class SendMsgStatusCommand implements SubCommand {
producer.shutdown();
}
}
-
-
- private static Message buildMessage(final String topic, final int messageSize) throws UnsupportedEncodingException {
- Message msg = new Message();
- msg.setTopic(topic);
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < messageSize; i += 11) {
- sb.append("hello jodie");
- }
- msg.setBody(sb.toString().getBytes(MixAll.DEFAULT_CHARSET));
- return msg;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
index 1de9457..8718c9e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommand.java
@@ -6,28 +6,26 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.tools.command.broker;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Properties;
-import java.util.Set;
-
public class UpdateBrokerConfigSubCommand implements SubCommand {
@@ -36,13 +34,11 @@ public class UpdateBrokerConfigSubCommand implements SubCommand {
return "updateBrokerConfig";
}
-
@Override
public String commandDesc() {
return "Update broker's config";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("b", "brokerAddr", true, "update which broker");
@@ -64,7 +60,6 @@ public class UpdateBrokerConfigSubCommand implements SubCommand {
return options;
}
-
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
@@ -92,7 +87,7 @@ public class UpdateBrokerConfigSubCommand implements SubCommand {
defaultMQAdminExt.start();
Set<String> masterSet =
- CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String brokerAddr : masterSet) {
try {
defaultMQAdminExt.updateBrokerConfig(brokerAddr, properties);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
index a8bd3a8..1ae6d52 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/CLusterSendMsgRTCommand.java
@@ -17,6 +17,16 @@
package org.apache.rocketmq.tools.command.cluster;
+import java.math.BigDecimal;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.TreeSet;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
@@ -24,13 +34,6 @@ import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.math.BigDecimal;
-import java.text.SimpleDateFormat;
-import java.util.*;
public class CLusterSendMsgRTCommand implements SubCommand {
@@ -90,24 +93,24 @@ public class CLusterSendMsgRTCommand implements SubCommand {
ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddr = clusterInfoSerializeWrapper
- .getClusterAddrTable();
+ .getClusterAddrTable();
Set<String> clusterNames = null;
long amount = !commandLine.hasOption('a') ? 50 : Long.parseLong(commandLine
- .getOptionValue('a').trim());
+ .getOptionValue('a').trim());
long size = !commandLine.hasOption('s') ? 128 : Long.parseLong(commandLine
- .getOptionValue('s').trim());
+ .getOptionValue('s').trim());
long interval = !commandLine.hasOption('i') ? 10 : Long.parseLong(commandLine
- .getOptionValue('i').trim());
+ .getOptionValue('i').trim());
boolean printAsTlog = !commandLine.hasOption('p') ? false : Boolean
- .parseBoolean(commandLine.getOptionValue('p').trim());
+ .parseBoolean(commandLine.getOptionValue('p').trim());
String machineRoom = !commandLine.hasOption('m') ? "noname" : commandLine
- .getOptionValue('m').trim();
+ .getOptionValue('m').trim();
if (commandLine.hasOption('c')) {
clusterNames = new TreeSet<String>();
@@ -118,11 +121,11 @@ public class CLusterSendMsgRTCommand implements SubCommand {
if (!printAsTlog) {
System.out.printf("%-24s %-24s %-4s %-8s %-8s%n",
- "#Cluster Name",
- "#Broker Name",
- "#RT",
- "#successCount",
- "#failCount"
+ "#Cluster Name",
+ "#Broker Name",
+ "#RT",
+ "#successCount",
+ "#failCount"
);
}
@@ -158,19 +161,19 @@ public class CLusterSendMsgRTCommand implements SubCommand {
}
}
- double rt = (double) elapsed / (amount - 1);
+ double rt = (double)elapsed / (amount - 1);
if (!printAsTlog) {
System.out.printf("%-24s %-24s %-8s %-16s %-16s%n",
- clusterName,
- brokerName,
- String.format("%.2f", rt),
- successCount,
- failCount
+ clusterName,
+ brokerName,
+ String.format("%.2f", rt),
+ successCount,
+ failCount
);
} else {
System.out.printf(String.format("%s|%s|%s|%s|%s%n", getCurTime(),
- machineRoom, clusterName, brokerName,
- new BigDecimal(rt).setScale(0, BigDecimal.ROUND_HALF_UP)));
+ machineRoom, clusterName, brokerName,
+ new BigDecimal(rt).setScale(0, BigDecimal.ROUND_HALF_UP)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
index b649af1..eb250cf 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.tools.command.cluster;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.KVTable;
@@ -26,15 +33,6 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
public class ClusterListSubCommand implements SubCommand {
@@ -43,13 +41,11 @@ public class ClusterListSubCommand implements SubCommand {
return "clusterList";
}
-
@Override
public String commandDesc() {
return "List all of clusters";
}
-
@Override
public Options buildCommandlineOptions(Options options) {
Option opt = new Option("m", "moreStats", false, "Print more stats");
@@ -89,7 +85,8 @@ public class ClusterListSubCommand implements SubCommand {
} else {
this.printClusterBaseInfo(defaultMQAdminExt);
}
- } while (enableInterval);
+ }
+ while (enableInterval);
} catch (Exception e) {
e.printStackTrace();
} finally {
@@ -98,17 +95,17 @@ public class ClusterListSubCommand implements SubCommand {
}
private void printClusterMoreStats(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException,
- RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException {
+ RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException {
ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
System.out.printf("%-16s %-32s %14s %14s %14s %14s%n",
- "#Cluster Name",
- "#Broker Name",
- "#InTotalYest",
- "#OutTotalYest",
- "#InTotalToday",
- "#OutTotalToday"
+ "#Cluster Name",
+ "#Broker Name",
+ "#InTotalYest",
+ "#OutTotalYest",
+ "#InTotalToday",
+ "#OutTotalToday"
);
Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator();
@@ -149,12 +146,12 @@ public class ClusterListSubCommand implements SubCommand {
}
System.out.printf("%-16s %-32s %14d %14d %14d %14d%n",
- clusterName,
- brokerName,
- inTotalYest,
- outTotalYest,
- inTotalToday,
- outTotalToday
+ clusterName,
+ brokerName,
+ inTotalYest,
+ outTotalYest,
+ inTotalToday,
+ outTotalToday
);
}
}
@@ -167,21 +164,21 @@ public class ClusterListSubCommand implements SubCommand {
}
private void printClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException,
- RemotingSendRequestException, InterruptedException, MQBrokerException {
+ RemotingSendRequestException, InterruptedException, MQBrokerException {
ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n",
- "#Cluster Name",
- "#Broker Name",
- "#BID",
- "#Addr",
- "#Version",
- "#InTPS(LOAD)",
- "#OutTPS(LOAD)",
- "#PCWait(ms)",
- "#Hour",
- "#SPACE"
+ "#Cluster Name",
+ "#Broker Name",
+ "#BID",
+ "#Addr",
+ "#Version",
+ "#InTPS(LOAD)",
+ "#OutTPS(LOAD)",
+ "#PCWait(ms)",
+ "#Hour",
+ "#SPACE"
);
Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator();
@@ -254,16 +251,16 @@ public class ClusterListSubCommand implements SubCommand {
}
System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n",
- clusterName,
- brokerName,
- next1.getKey().longValue(),
- next1.getValue(),
- version,
- String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills),
- String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills),
- pageCacheLockTimeMills,
- String.format("%2.2f", hour),
- String.format("%.4f", space)
+ clusterName,
+ brokerName,
+ next1.getKey().longValue(),
+ next1.getValue(),
+ version,
+ String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills),
+ String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills),
+ pageCacheLockTimeMills,
+ String.format("%2.2f", hour),
+ String.format("%.4f", space)
);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
index 355e894..7f7f88d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommand.java
@@ -16,6 +16,11 @@
*/
package org.apache.rocketmq.tools.command.connection;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
@@ -23,13 +28,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-
public class ConsumerConnectionSubCommand implements SubCommand {
@@ -65,15 +63,14 @@ public class ConsumerConnectionSubCommand implements SubCommand {
ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(group);
-
int i = 1;
for (Connection conn : cc.getConnectionSet()) {
System.out.printf("%03d %-32s %-22s %-8s %s%n",
- i++,
- conn.getClientId(),
- conn.getClientAddr(),
- conn.getLanguage(),
- MQVersion.getVersionDesc(conn.getVersion())
+ i++,
+ conn.getClientId(),
+ conn.getClientAddr(),
+ conn.getLanguage(),
+ MQVersion.getVersionDesc(conn.getVersion())
);
}
@@ -84,9 +81,9 @@ public class ConsumerConnectionSubCommand implements SubCommand {
Entry<String, SubscriptionData> entry = it.next();
SubscriptionData sd = entry.getValue();
System.out.printf("%03d Topic: %-40s SubExpression: %s%n",
- i++,
- sd.getTopic(),
- sd.getSubString()
+ i++,
+ sd.getTopic(),
+ sd.getSubString()
);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
index 0b5b0ab..387c9c8 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommand.java
@@ -16,16 +16,15 @@
*/
package org.apache.rocketmq.tools.command.connection;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-
public class ProducerConnectionSubCommand implements SubCommand {
@@ -69,11 +68,11 @@ public class ProducerConnectionSubCommand implements SubCommand {
int i = 1;
for (Connection conn : pc.getConnectionSet()) {
System.out.printf("%04d %-32s %-22s %-8s %s%n",
- i++,
- conn.getClientId(),
- conn.getClientAddr(),
- conn.getLanguage(),
- MQVersion.getVersionDesc(conn.getVersion())
+ i++,
+ conn.getClientId(),
+ conn.getClientAddr(),
+ conn.getLanguage(),
+ MQVersion.getVersionDesc(conn.getVersion())
);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
index 56e0853..3e70614 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.tools.command.consumer;
+import java.util.Collections;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
@@ -30,17 +37,8 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
import org.slf4j.Logger;
-import java.util.Collections;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-
-
public class ConsumerProgressSubCommand implements SubCommand {
private final Logger log = ClientLogger.getLog();
@@ -78,13 +76,13 @@ public class ConsumerProgressSubCommand implements SubCommand {
Collections.sort(mqList);
System.out.printf("%-32s %-32s %-4s %-20s %-20s %-20s %s%n",
- "#Topic",
- "#Broker Name",
- "#QID",
- "#Broker Offset",
- "#Consumer Offset",
- "#Diff",
- "#LastTime");
+ "#Topic",
+ "#Broker Name",
+ "#QID",
+ "#Broker Offset",
+ "#Consumer Offset",
+ "#Diff",
+ "#LastTime");
long diffTotal = 0L;
for (MessageQueue mq : mqList) {
@@ -97,13 +95,13 @@ public class ConsumerProgressSubCommand implements SubCommand {
} catch (Exception e) {
}
System.out.printf("%-32s %-32s %-4d %-20d %-20d %-20d %s%n",
- UtilAll.frontStringAtLeast(mq.getTopic(), 32),
- UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
- mq.getQueueId(),
- offsetWrapper.getBrokerOffset(),
- offsetWrapper.getConsumerOffset(),
- diff,
- lastTime
+ UtilAll.frontStringAtLeast(mq.getTopic(), 32),
+ UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
+ mq.getQueueId(),
+ offsetWrapper.getBrokerOffset(),
+ offsetWrapper.getConsumerOffset(),
+ diff,
+ lastTime
);
}
@@ -112,13 +110,13 @@ public class ConsumerProgressSubCommand implements SubCommand {
System.out.printf("Diff Total: %d%n", diffTotal);
} else {
System.out.printf("%-32s %-6s %-24s %-5s %-14s %-7s %s%n",
- "#Group",
- "#Count",
- "#Version",
- "#Type",
- "#Model",
- "#TPS",
- "#Diff Total"
+ "#Group",
+ "#Count",
+ "#Version",
+ "#Type",
+ "#Model",
+ "#TPS",
+ "#Diff Total"
);
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
for (String topic : topicList.getTopicList()) {
@@ -143,7 +141,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
groupConsumeInfo.setGroup(consumerGroup);
if (consumeStats != null) {
- groupConsumeInfo.setConsumeTps((int) consumeStats.getConsumeTps());
+ groupConsumeInfo.setConsumeTps((int)consumeStats.getConsumeTps());
groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
}
@@ -155,13 +153,13 @@ public class ConsumerProgressSubCommand implements SubCommand {
}
System.out.printf("%-32s %-6d %-24s %-5s %-14s %-7d %d%n",
- UtilAll.frontStringAtLeast(groupConsumeInfo.getGroup(), 32),
- groupConsumeInfo.getCount(),
- groupConsumeInfo.getCount() > 0 ? groupConsumeInfo.versionDesc() : "OFFLINE",
- groupConsumeInfo.consumeTypeDesc(),
- groupConsumeInfo.messageModelDesc(),
- groupConsumeInfo.getConsumeTps(),
- groupConsumeInfo.getDiffTotal()
+ UtilAll.frontStringAtLeast(groupConsumeInfo.getGroup(), 32),
+ groupConsumeInfo.getCount(),
+ groupConsumeInfo.getCount() > 0 ? groupConsumeInfo.versionDesc() : "OFFLINE",
+ groupConsumeInfo.consumeTypeDesc(),
+ groupConsumeInfo.messageModelDesc(),
+ groupConsumeInfo.getConsumeTps(),
+ groupConsumeInfo.getDiffTotal()
);
} catch (Exception e) {
log.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " + consumerGroup, e);
@@ -177,7 +175,6 @@ public class ConsumerProgressSubCommand implements SubCommand {
}
}
-
class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
private String group;
private int version;
@@ -187,7 +184,6 @@ class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
private int consumeTps;
private long diffTotal;
-
public String getGroup() {
return group;
}
@@ -245,37 +241,31 @@ class GroupConsumeInfo implements Comparable<GroupConsumeInfo> {
return diffTotal;
}
-
public void setDiffTotal(long diffTotal) {
this.diffTotal = diffTotal;
}
-
@Override
public int compareTo(GroupConsumeInfo o) {
if (this.count != o.count) {
return o.count - this.count;
}
- return (int) (o.diffTotal - diffTotal);
+ return (int)(o.diffTotal - diffTotal);
}
-
public int getConsumeTps() {
return consumeTps;
}
-
public void setConsumeTps(int consumeTps) {
this.consumeTps = consumeTps;
}
-
public int getVersion() {
return version;
}
-
public void setVersion(int version) {
this.version = version;
}