You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/29 12:30:38 UTC
[10/28] incubator-rocketmq git commit: Reformat code globally second time
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 6fc7335..51a8a27 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -85,7 +85,6 @@ public class HAService {
return result;
}
-
public void notifyTransferSome(final long offset) {
for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
@@ -180,7 +179,9 @@ public class HAService {
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
- /** {@inheritDoc} */
+ /**
+ * {@inheritDoc}
+ */
@Override
public void shutdown(final boolean interrupt) {
super.shutdown(interrupt);
@@ -192,7 +193,9 @@ public class HAService {
}
}
- /** {@inheritDoc} */
+ /**
+ * {@inheritDoc}
+ */
@Override
public void run() {
log.info(this.getServiceName() + " service started");
@@ -235,7 +238,9 @@ public class HAService {
log.info(this.getServiceName() + " service end");
}
- /** {@inheritDoc} */
+ /**
+ * {@inheritDoc}
+ */
@Override
public String getServiceName() {
return AcceptSocketService.class.getSimpleName();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
index 862e620..edc2476 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java
@@ -209,7 +209,7 @@ public class IndexFile {
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
- for (int nextIndexToRead = slotValue;;) {
+ for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java
index 3195448..44021cd 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java
@@ -20,7 +20,6 @@ import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
public class IndexHeader {
public static final int INDEX_HEADER_SIZE = 40;
private static int beginTimestampIndex = 0;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
index c434df5..e562c2a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java
@@ -35,7 +35,9 @@ import org.slf4j.LoggerFactory;
public class IndexService {
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- /** Maximum times to attempt index file creation. */
+ /**
+ * Maximum times to attempt index file creation.
+ */
private static final int MAX_TRY_IDX_CREATE = 3;
private final DefaultMessageStore defaultMessageStore;
private final int hashSlotNum;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
index 64b4097..a3240a4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
@@ -38,7 +38,6 @@ public class BrokerStats {
this.defaultMessageStore = defaultMessageStore;
}
-
public void record() {
this.msgPutTotalYesterdayMorning = this.msgPutTotalTodayMorning;
this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 1515eb4..64f76ca 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -123,9 +123,11 @@ public class BrokerStatsManager {
public void incTopicPutNums(final String topic) {
this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
}
+
public void incTopicPutNums(final String topic, int num, int times) {
this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, num, times);
}
+
public void incTopicPutSize(final String topic, final int size) {
this.statsTable.get(TOPIC_PUT_SIZE).addValue(topic, size, 1);
}
@@ -156,9 +158,11 @@ public class BrokerStatsManager {
public void incBrokerPutNums() {
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet();
}
+
public void incBrokerPutNums(final int incValue) {
this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
}
+
public void incBrokerGetNums(final int incValue) {
this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
}
@@ -173,12 +177,14 @@ public class BrokerStatsManager {
return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps();
}
- public void recordDiskFallBehindTime(final String group, final String topic, final int queueId, final long fallBehind) {
+ public void recordDiskFallBehindTime(final String group, final String topic, final int queueId,
+ final long fallBehind) {
final String statsKey = String.format("%d@%s@%s", queueId, topic, group);
this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
}
- public void recordDiskFallBehindSize(final String group, final String topic, final int queueId, final long fallBehind) {
+ public void recordDiskFallBehindSize(final String group, final String topic, final int queueId,
+ final long fallBehind) {
final String statsKey = String.format("%d@%s@%s", queueId, topic, group);
this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
index eaa18d5..7f88d36 100644
--- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java
@@ -6,18 +6,17 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.store;
-
import java.io.File;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -39,7 +38,6 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-
public class AppendCallbackTest {
AppendMessageCallback callback;
@@ -47,7 +45,7 @@ public class AppendCallbackTest {
CommitLog.MessageExtBatchEncoder batchEncoder = new CommitLog.MessageExtBatchEncoder(10 * 1024 * 1024);
@Before
- public void init() throws Exception{
+ public void init() throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
@@ -62,16 +60,15 @@ public class AppendCallbackTest {
}
@After
- public void destroy(){
+ public void destroy() {
UtilAll.deleteFile(new File(System.getProperty("user.home") + File.separator + "unitteststore"));
}
-
@Test
- public void testAppendMessageBatchEndOfFile() throws Exception{
- List<Message> messages = new ArrayList<>();
+ public void testAppendMessageBatchEndOfFile() throws Exception {
+ List<Message> messages = new ArrayList<>();
String topic = "test-topic";
- int queue= 0;
+ int queue = 0;
for (int i = 0; i < 10; i++) {
Message msg = new Message();
msg.setBody("body".getBytes());
@@ -83,8 +80,8 @@ public class AppendCallbackTest {
messageExtBatch.setTopic(topic);
messageExtBatch.setQueueId(queue);
messageExtBatch.setBornTimestamp(System.currentTimeMillis());
- messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1",123));
- messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1",124));
+ messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 123));
+ messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
@@ -99,11 +96,12 @@ public class AppendCallbackTest {
assertTrue(result.getMsgId().length() > 0); //should have already constructed some message ids
}
+
@Test
public void testAppendMessageBatchSucc() throws Exception {
- List<Message> messages = new ArrayList<>();
+ List<Message> messages = new ArrayList<>();
String topic = "test-topic";
- int queue= 0;
+ int queue = 0;
for (int i = 0; i < 10; i++) {
Message msg = new Message();
msg.setBody("body".getBytes());
@@ -115,8 +113,8 @@ public class AppendCallbackTest {
messageExtBatch.setTopic(topic);
messageExtBatch.setQueueId(queue);
messageExtBatch.setBornTimestamp(System.currentTimeMillis());
- messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1",123));
- messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1",124));
+ messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 123));
+ messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124));
messageExtBatch.setBody(MessageDecoder.encodeMessages(messages));
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
@@ -131,7 +129,7 @@ public class AppendCallbackTest {
assertEquals(messages.size(), allresult.getMsgNum());
Set<String> msgIds = new HashSet<>();
- for (String msgId: allresult.getMsgId().split(",")) {
+ for (String msgId : allresult.getMsgId().split(",")) {
assertEquals(32, msgId.length());
msgIds.add(msgId);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java
index 6c2f5ad..e213a02 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java
@@ -36,7 +36,6 @@ public class ConsumeQueueExtTest {
private static final int cqExtFileSize = 10 * unitSizeWithBitMap;
private static final int unitCount = 20;
-
protected ConsumeQueueExt genExt() {
return new ConsumeQueueExt(
topic, queueId, storePath, cqExtFileSize, bitMapLength
@@ -65,7 +64,7 @@ public class ConsumeQueueExtTest {
}
protected void putSth(ConsumeQueueExt consumeQueueExt, boolean getAfterPut,
- boolean unitSameSize, int unitCount) {
+ boolean unitSameSize, int unitCount) {
for (int i = 0; i < unitCount; i++) {
ConsumeQueueExt.CqExtUnit putUnit =
unitSameSize ? genUnit(true) : genUnit(i % 2 == 0);
@@ -236,7 +235,7 @@ public class ConsumeQueueExtTest {
}
@After
- public void destroy(){
+ public void destroy() {
UtilAll.deleteFile(new File(storePath));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index d07a768..b03f2fc 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -83,9 +83,8 @@ public class ConsumeQueueTest {
return msg;
}
-
public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
- boolean enableCqExt, int cqExtFileSize) {
+ boolean enableCqExt, int cqExtFileSize) {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
@@ -112,7 +111,7 @@ public class ConsumeQueueTest {
new MessageArrivingListener() {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
- long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
+ long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
}
}
, brokerConfig);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index a81f328..28d7478 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -56,7 +56,7 @@ public class DefaultMessageStoreTest {
File file = new File(messageStoreConfig.getStorePathRootDir());
UtilAll.deleteFile(file);
}
-
+
public MessageStore buildMessageStore() throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
@@ -149,7 +149,6 @@ public class DefaultMessageStoreTest {
GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null);
assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32);
-
GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null);
assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
index cb0210f..3c03ee7 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.test.listener.AbstractListener;
public class RMQSqlConsumer extends RMQNormalConsumer {
private static Logger logger = Logger.getLogger(RMQSqlConsumer.class);
private MessageSelector selector;
+
public RMQSqlConsumer(String nsAddr, String topic, MessageSelector selector,
String consumerGroup, AbstractListener listener) {
super(nsAddr, topic, "*", consumerGroup, listener);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 64911fb..e1b8c91 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -55,7 +55,8 @@ public class IntegrationTestBase {
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override public void run() {
+ @Override
+ public void run() {
try {
for (BrokerController brokerController : BROKER_CONTROLLERS) {
if (brokerController != null) {
@@ -78,7 +79,7 @@ public class IntegrationTestBase {
for (File file : TMPE_FILES) {
UtilAll.deleteFile(file);
}
- } catch (Exception e){
+ } catch (Exception e) {
logger.error("Shutdown error", e);
}
}
@@ -149,7 +150,7 @@ public class IntegrationTestBase {
return brokerController;
}
- public static boolean initTopic(String topic, String nsAddr, String clusterName,int queueNumbers){
+ public static boolean initTopic(String topic, String nsAddr, String clusterName, int queueNumbers) {
long startTime = System.currentTimeMillis();
boolean createResult;
@@ -159,7 +160,7 @@ public class IntegrationTestBase {
break;
} else if (System.currentTimeMillis() - startTime > topicCreateTime) {
Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic,
- System.currentTimeMillis() - startTime));
+ System.currentTimeMillis() - startTime));
break;
} else {
TestUtils.waitForMoment(500);
@@ -171,7 +172,7 @@ public class IntegrationTestBase {
}
public static boolean initTopic(String topic, String nsAddr, String clusterName) {
- return initTopic(topic, nsAddr, clusterName,8);
+ return initTopic(topic, nsAddr, clusterName, 8);
}
public static void deleteFile(File file) {
@@ -188,5 +189,5 @@ public class IntegrationTestBase {
file.delete();
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
index 47cde74..9d8aeb3 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
@@ -62,7 +62,6 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT {
consumer1.getConsumerGroup(), topic, "*", new RMQOrderListener());
TestUtils.waitForSeconds(waitTime);
-
List<MessageQueue> mqs = producer.getMessageQueue();
MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize);
producer.send(mqMsgs.getMsgsWithMQ());
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
index 115595d..15d91a1 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
@@ -67,7 +67,7 @@ public class SqlFilterIT extends BaseConf {
consumer.getListener().waitForMessageConsume(msgSize * 2, consumeTime);
assertThat(producer.getAllMsgBody())
.containsAllIn(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
- consumer.getListener().getAllMsgBody()));
+ consumer.getListener().getAllMsgBody()));
assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
index e372a1b..6fb34af 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java
@@ -77,7 +77,6 @@ public class BatchSendIT extends BaseConf {
}
}
-
@Test
public void testBatchSend_CheckProperties() throws Exception {
List<Message> messageList = new ArrayList<>();
@@ -91,7 +90,6 @@ public class BatchSendIT extends BaseConf {
message.setBody("body".getBytes());
messageList.add(message);
-
DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
SendResult sendResult = producer.send(messageList);
Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus());
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 409ea33..eb45de2 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -119,12 +119,14 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ public MessageExt viewMessage(
+ String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return defaultMQAdminExtImpl.viewMessage(offsetMsgId);
}
@Override
- public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException,
+ public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
+ long end) throws MQClientException,
InterruptedException {
return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end);
}
@@ -140,7 +142,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public void updateBrokerConfig(String brokerAddr, Properties properties) throws RemotingConnectException, RemotingSendRequestException,
+ public void updateBrokerConfig(String brokerAddr,
+ Properties properties) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
defaultMQAdminExtImpl.updateBrokerConfig(brokerAddr, properties);
}
@@ -158,7 +161,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException,
+ public void createAndUpdateSubscriptionGroupConfig(String addr,
+ SubscriptionGroupConfig config) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
defaultMQAdminExtImpl.createAndUpdateSubscriptionGroupConfig(addr, config);
}
@@ -174,7 +178,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public TopicStatsTable examineTopicStats(String topic) throws RemotingException, MQClientException, InterruptedException,
+ public TopicStatsTable examineTopicStats(
+ String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
return defaultMQAdminExtImpl.examineTopicStats(topic);
}
@@ -185,24 +190,28 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException {
+ public TopicList fetchTopicsByCLuster(
+ String clusterName) throws RemotingException, MQClientException, InterruptedException {
return this.defaultMQAdminExtImpl.fetchTopicsByCLuster(clusterName);
}
@Override
- public KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
+ public KVTable fetchBrokerRuntimeStats(
+ final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException {
return this.defaultMQAdminExtImpl.fetchBrokerRuntimeStats(brokerAddr);
}
@Override
- public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
+ public ConsumeStats examineConsumeStats(
+ String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
return examineConsumeStats(consumerGroup, null);
}
@Override
- public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException,
+ public ConsumeStats examineConsumeStats(String consumerGroup,
+ String topic) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic);
}
@@ -214,18 +223,21 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public TopicRouteData examineTopicRouteInfo(String topic) throws RemotingException, MQClientException, InterruptedException {
+ public TopicRouteData examineTopicRouteInfo(
+ String topic) throws RemotingException, MQClientException, InterruptedException {
return defaultMQAdminExtImpl.examineTopicRouteInfo(topic);
}
@Override
- public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) throws InterruptedException, MQBrokerException,
+ public ConsumerConnection examineConsumerConnectionInfo(
+ String consumerGroup) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException {
return defaultMQAdminExtImpl.examineConsumerConnectionInfo(consumerGroup);
}
@Override
- public ProducerConnection examineProducerConnectionInfo(String producerGroup, final String topic) throws RemotingException,
+ public ProducerConnection examineProducerConnectionInfo(String producerGroup,
+ final String topic) throws RemotingException,
MQClientException, InterruptedException, MQBrokerException {
return defaultMQAdminExtImpl.examineProducerConnectionInfo(producerGroup, topic);
}
@@ -247,46 +259,54 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public String getKVConfig(String namespace, String key) throws RemotingException, MQClientException, InterruptedException {
+ public String getKVConfig(String namespace,
+ String key) throws RemotingException, MQClientException, InterruptedException {
return defaultMQAdminExtImpl.getKVConfig(namespace, key);
}
@Override
- public KVTable getKVListByNamespace(String namespace) throws RemotingException, MQClientException, InterruptedException {
+ public KVTable getKVListByNamespace(
+ String namespace) throws RemotingException, MQClientException, InterruptedException {
return defaultMQAdminExtImpl.getKVListByNamespace(namespace);
}
@Override
- public void deleteTopicInBroker(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException,
+ public void deleteTopicInBroker(Set<String> addrs,
+ String topic) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
defaultMQAdminExtImpl.deleteTopicInBroker(addrs, topic);
}
@Override
- public void deleteTopicInNameServer(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException,
+ public void deleteTopicInNameServer(Set<String> addrs,
+ String topic) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
defaultMQAdminExtImpl.deleteTopicInNameServer(addrs, topic);
}
@Override
- public void deleteSubscriptionGroup(String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException,
+ public void deleteSubscriptionGroup(String addr,
+ String groupName) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName);
}
@Override
- public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException,
+ public void createAndUpdateKvConfig(String namespace, String key,
+ String value) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
defaultMQAdminExtImpl.createAndUpdateKvConfig(namespace, key, value);
}
@Override
- public void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException,
+ public void deleteKvConfig(String namespace,
+ String key) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
defaultMQAdminExtImpl.deleteKvConfig(namespace, key);
}
- public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force)
+ public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
+ boolean force)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return defaultMQAdminExtImpl.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
}
@@ -297,49 +317,57 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return resetOffsetByTimestamp(topic, group, timestamp, isForce, false);
}
- public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC)
+ public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
+ boolean isC)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return defaultMQAdminExtImpl.resetOffsetByTimestamp(topic, group, timestamp, isForce, isC);
}
@Override
- public void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException,
+ public void resetOffsetNew(String consumerGroup, String topic,
+ long timestamp) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
this.defaultMQAdminExtImpl.resetOffsetNew(consumerGroup, topic, timestamp);
}
@Override
- public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException,
+ public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
+ String clientAddr) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
return defaultMQAdminExtImpl.getConsumeStatus(topic, group, clientAddr);
}
@Override
- public void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException,
+ public void createOrUpdateOrderConf(String key, String value,
+ boolean isCluster) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
defaultMQAdminExtImpl.createOrUpdateOrderConf(key, value, isCluster);
}
@Override
- public GroupList queryTopicConsumeByWho(String topic) throws InterruptedException, MQBrokerException, RemotingException,
+ public GroupList queryTopicConsumeByWho(
+ String topic) throws InterruptedException, MQBrokerException, RemotingException,
MQClientException {
return this.defaultMQAdminExtImpl.queryTopicConsumeByWho(topic);
}
@Override
- public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException,
+ public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic,
+ final String group) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException {
return this.defaultMQAdminExtImpl.queryConsumeTimeSpan(topic, group);
}
@Override
- public boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+ public boolean cleanExpiredConsumerQueue(
+ String cluster) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException {
return defaultMQAdminExtImpl.cleanExpiredConsumerQueue(cluster);
}
@Override
- public boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+ public boolean cleanExpiredConsumerQueueByAddr(
+ String addr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException {
return defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr(addr);
}
@@ -357,7 +385,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) throws RemotingException,
+ public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId,
+ boolean jstack) throws RemotingException,
MQClientException, InterruptedException {
return defaultMQAdminExtImpl.getConsumerRunningInfo(consumerGroup, clientId, jstack);
}
@@ -369,25 +398,29 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, final String topic,
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId,
+ final String topic,
final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
}
@Override
- public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
+ public List<MessageTrack> messageTrackDetail(
+ MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
return this.defaultMQAdminExtImpl.messageTrackDetail(msg);
}
@Override
- public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException,
+ public void cloneGroupOffset(String srcGroup, String destGroup, String topic,
+ boolean isOffline) throws RemotingException,
MQClientException, InterruptedException, MQBrokerException {
this.defaultMQAdminExtImpl.cloneGroupOffset(srcGroup, destGroup, topic, isOffline);
}
@Override
- public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey) throws RemotingConnectException,
+ public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
+ String statsKey) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
return this.defaultMQAdminExtImpl.viewBrokerStatsData(brokerAddr, statsName, statsKey);
}
@@ -406,7 +439,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
+ public Set<String> getTopicClusterList(
+ final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
return this.defaultMQAdminExtImpl.getTopicClusterList(topic);
}
@@ -472,7 +506,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
}
@Override
- public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index, int count, String consumerGroup)
+ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index,
+ int count, String consumerGroup)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
return this.defaultMQAdminExtImpl.queryConsumeQueue(
brokerAddr, topic, queueId, index, count, consumerGroup
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 12aea8a..c93c400 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -160,7 +160,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public void updateBrokerConfig(String brokerAddr, Properties properties) throws RemotingConnectException, RemotingSendRequestException,
+ public void updateBrokerConfig(String brokerAddr,
+ Properties properties) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
this.mqClientInstance.getMQClientAPIImpl().updateBrokerConfig(brokerAddr, properties, timeoutMillis);
}
@@ -178,7 +179,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException,
+ public void createAndUpdateSubscriptionGroupConfig(String addr,
+ SubscriptionGroupConfig config) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().createSubscriptionGroup(addr, config, timeoutMillis);
}
@@ -194,7 +196,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public TopicStatsTable examineTopicStats(String topic) throws RemotingException, MQClientException, InterruptedException,
+ public TopicStatsTable examineTopicStats(
+ String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
TopicStatsTable topicStatsTable = new TopicStatsTable();
@@ -220,24 +223,28 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException {
+ public TopicList fetchTopicsByCLuster(
+ String clusterName) throws RemotingException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicsByCluster(clusterName, timeoutMillis);
}
@Override
- public KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
+ public KVTable fetchBrokerRuntimeStats(
+ final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getBrokerRuntimeInfo(brokerAddr, timeoutMillis);
}
@Override
- public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
+ public ConsumeStats examineConsumeStats(
+ String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
return examineConsumeStats(consumerGroup, null);
}
@Override
- public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException,
+ public ConsumeStats examineConsumeStats(String consumerGroup,
+ String topic) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException {
String retryTopic = MixAll.getRetryTopic(consumerGroup);
TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic);
@@ -269,12 +276,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public TopicRouteData examineTopicRouteInfo(String topic) throws RemotingException, MQClientException, InterruptedException {
+ public TopicRouteData examineTopicRouteInfo(
+ String topic) throws RemotingException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
}
@Override
- public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ public MessageExt viewMessage(String topic,
+ String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
MessageDecoder.decodeMessageId(msgId);
return this.viewMessage(msgId);
@@ -285,7 +294,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) throws InterruptedException, MQBrokerException,
+ public ConsumerConnection examineConsumerConnectionInfo(
+ String consumerGroup) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException {
ConsumerConnection result = new ConsumerConnection();
String topic = MixAll.getRetryTopic(consumerGroup);
@@ -308,7 +318,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public ProducerConnection examineProducerConnectionInfo(String producerGroup, final String topic) throws RemotingException,
+ public ProducerConnection examineProducerConnectionInfo(String producerGroup,
+ final String topic) throws RemotingException,
MQClientException, InterruptedException, MQBrokerException {
ProducerConnection result = new ProducerConnection();
List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas();
@@ -345,17 +356,20 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public String getKVConfig(String namespace, String key) throws RemotingException, MQClientException, InterruptedException {
+ public String getKVConfig(String namespace,
+ String key) throws RemotingException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(namespace, key, timeoutMillis);
}
@Override
- public KVTable getKVListByNamespace(String namespace) throws RemotingException, MQClientException, InterruptedException {
+ public KVTable getKVListByNamespace(
+ String namespace) throws RemotingException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().getKVListByNamespace(namespace, timeoutMillis);
}
@Override
- public void deleteTopicInBroker(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException,
+ public void deleteTopicInBroker(Set<String> addrs,
+ String topic) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
for (String addr : addrs) {
this.mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis);
@@ -363,7 +377,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public void deleteTopicInNameServer(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException,
+ public void deleteTopicInNameServer(Set<String> addrs,
+ String topic) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
if (addrs == null) {
String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr();
@@ -375,25 +390,29 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public void deleteSubscriptionGroup(String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException,
+ public void deleteSubscriptionGroup(String addr,
+ String groupName) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, timeoutMillis);
}
@Override
- public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException,
+ public void createAndUpdateKvConfig(String namespace, String key,
+ String value) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(namespace, key, value, timeoutMillis);
}
@Override
- public void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException,
+ public void deleteKvConfig(String namespace,
+ String key) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
this.mqClientInstance.getMQClientAPIImpl().deleteKVConfigValue(namespace, key, timeoutMillis);
}
@Override
- public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force)
+ public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp,
+ boolean force)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<RollbackStats> rollbackStatsList = new ArrayList<RollbackStats>();
@@ -444,7 +463,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException,
+ public void resetOffsetNew(String consumerGroup, String topic,
+ long timestamp) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
try {
this.resetOffsetByTimestamp(topic, consumerGroup, timestamp, true);
@@ -457,7 +477,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
}
- public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC)
+ public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
+ boolean isC)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
@@ -478,7 +499,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return allOffsetTable;
}
- private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue queue, OffsetWrapper offsetWrapper,
+ private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue queue,
+ OffsetWrapper offsetWrapper,
long timestamp, boolean force) throws RemotingException, InterruptedException, MQBrokerException {
long resetOffset;
if (timestamp == -1) {
@@ -511,7 +533,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException,
+ public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
+ String clientAddr) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
@@ -525,7 +548,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return Collections.EMPTY_MAP;
}
- public void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException,
+ public void createOrUpdateOrderConf(String key, String value,
+ boolean isCluster) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
if (isCluster) {
@@ -564,7 +588,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public GroupList queryTopicConsumeByWho(String topic) throws InterruptedException, MQBrokerException, RemotingException,
+ public GroupList queryTopicConsumeByWho(
+ String topic) throws InterruptedException, MQBrokerException, RemotingException,
MQClientException {
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
@@ -581,7 +606,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException,
+ public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic,
+ final String group) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException {
List<QueueTimeSpan> spanSet = new ArrayList<QueueTimeSpan>();
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
@@ -595,7 +621,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+ public boolean cleanExpiredConsumerQueue(
+ String cluster) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException {
boolean result = false;
try {
@@ -614,7 +641,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return result;
}
- public boolean cleanExpiredConsumerQueueByCluster(ClusterInfo clusterInfo, String cluster) throws RemotingConnectException,
+ public boolean cleanExpiredConsumerQueueByCluster(ClusterInfo clusterInfo,
+ String cluster) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
boolean result = false;
String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster);
@@ -625,7 +653,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+ public boolean cleanExpiredConsumerQueueByAddr(
+ String addr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException {
boolean result = mqClientInstance.getMQClientAPIImpl().cleanExpiredConsumeQueue(addr, timeoutMillis);
log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result);
@@ -671,7 +700,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) throws RemotingException,
+ public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId,
+ boolean jstack) throws RemotingException,
MQClientException, InterruptedException {
String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
@@ -698,7 +728,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, final String topic,
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId,
+ final String topic,
final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
MessageExt msg = this.viewMessage(topic, msgId);
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
@@ -712,7 +743,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
+ public List<MessageTrack> messageTrackDetail(
+ MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
List<MessageTrack> result = new ArrayList<MessageTrack>();
@@ -794,7 +826,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return result;
}
- public boolean consumed(final MessageExt msg, final String group) throws RemotingException, MQClientException, InterruptedException,
+ public boolean consumed(final MessageExt msg,
+ final String group) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException {
ConsumeStats cstats = this.examineConsumeStats(group);
@@ -822,7 +855,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException,
+ public void cloneGroupOffset(String srcGroup, String destGroup, String topic,
+ boolean isOffline) throws RemotingException,
MQClientException, InterruptedException, MQBrokerException {
String retryTopic = MixAll.getRetryTopic(srcGroup);
TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic);
@@ -836,7 +870,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey) throws RemotingConnectException,
+ public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName,
+ String statsKey) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().viewBrokerStatsData(brokerAddr, statsName, statsKey, timeoutMillis);
}
@@ -855,7 +890,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException,
+ public Set<String> getTopicClusterList(
+ final String topic) throws InterruptedException, MQBrokerException, MQClientException,
RemotingException {
Set<String> clusterSet = new HashSet<String>();
ClusterInfo clusterInfo = examineBrokerClusterInfo();
@@ -873,13 +909,15 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException,
+ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
+ long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, timeoutMillis);
}
@Override
- public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException,
+ public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr,
+ long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis);
}
@@ -915,12 +953,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ public MessageExt viewMessage(
+ String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.mqClientInstance.getMQAdminImpl().viewMessage(msgId);
}
@Override
- public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException,
+ public QueryResult queryMessage(String topic, String key, int maxNum, long begin,
+ long end) throws MQClientException,
InterruptedException {
return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
}
@@ -953,7 +993,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
}
@Override
- public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index, int count, String consumerGroup)
+ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index,
+ int count, String consumerGroup)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
return this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(
brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 82add92..16b4427 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -64,42 +64,51 @@ public interface MQAdminExt extends MQAdmin {
Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException;
- void createAndUpdateTopicConfig(final String addr, final TopicConfig config) throws RemotingException, MQBrokerException,
+ void createAndUpdateTopicConfig(final String addr,
+ final TopicConfig config) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
- void createAndUpdateSubscriptionGroupConfig(final String addr, final SubscriptionGroupConfig config) throws RemotingException,
+ void createAndUpdateSubscriptionGroupConfig(final String addr,
+ final SubscriptionGroupConfig config) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group);
TopicConfig examineTopicConfig(final String addr, final String topic);
- TopicStatsTable examineTopicStats(final String topic) throws RemotingException, MQClientException, InterruptedException,
+ TopicStatsTable examineTopicStats(
+ final String topic) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException;
TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException;
- TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException;
+ TopicList fetchTopicsByCLuster(
+ String clusterName) throws RemotingException, MQClientException, InterruptedException;
- KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
+ KVTable fetchBrokerRuntimeStats(
+ final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException;
- ConsumeStats examineConsumeStats(final String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
+ ConsumeStats examineConsumeStats(
+ final String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException;
- ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException,
+ ConsumeStats examineConsumeStats(final String consumerGroup,
+ final String topic) throws RemotingException, MQClientException,
InterruptedException, MQBrokerException;
ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException;
- TopicRouteData examineTopicRouteInfo(final String topic) throws RemotingException, MQClientException, InterruptedException;
+ TopicRouteData examineTopicRouteInfo(
+ final String topic) throws RemotingException, MQClientException, InterruptedException;
ConsumerConnection examineConsumerConnectionInfo(final String consumerGroup) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException,
MQClientException;
- ProducerConnection examineProducerConnectionInfo(final String producerGroup, final String topic) throws RemotingException,
+ ProducerConnection examineProducerConnectionInfo(final String producerGroup,
+ final String topic) throws RemotingException,
MQClientException, InterruptedException, MQBrokerException;
List<String> getNameServerAddressList();
@@ -109,20 +118,24 @@ public interface MQAdminExt extends MQAdmin {
void putKVConfig(final String namespace, final String key, final String value);
- String getKVConfig(final String namespace, final String key) throws RemotingException, MQClientException, InterruptedException;
+ String getKVConfig(final String namespace,
+ final String key) throws RemotingException, MQClientException, InterruptedException;
- KVTable getKVListByNamespace(final String namespace) throws RemotingException, MQClientException, InterruptedException;
+ KVTable getKVListByNamespace(
+ final String namespace) throws RemotingException, MQClientException, InterruptedException;
void deleteTopicInBroker(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
- void deleteTopicInNameServer(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException,
+ void deleteTopicInNameServer(final Set<String> addrs,
+ final String topic) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void deleteSubscriptionGroup(final String addr, String groupName) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
- void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException,
+ void createAndUpdateKvConfig(String namespace, String key,
+ String value) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException,
@@ -137,16 +150,19 @@ public interface MQAdminExt extends MQAdmin {
void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
- Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException,
+ Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group,
+ String clientAddr) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
- void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException,
+ void createOrUpdateOrderConf(String key, String value,
+ boolean isCluster) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;
GroupList queryTopicConsumeByWho(final String topic) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, MQClientException;
- List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException,
+ List<QueueTimeSpan> queryConsumeTimeSpan(final String topic,
+ final String group) throws InterruptedException, MQBrokerException,
RemotingException, MQClientException;
boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException,
@@ -173,7 +189,8 @@ public interface MQAdminExt extends MQAdmin {
String topic,
String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException;
- List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
+ List<MessageTrack> messageTrackDetail(
+ MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
MQBrokerException;
void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException,
@@ -190,7 +207,8 @@ public interface MQAdminExt extends MQAdmin {
long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQClientException, InterruptedException;
- Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException;
+ Set<String> getTopicClusterList(
+ final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException;
SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
@@ -209,18 +227,9 @@ public interface MQAdminExt extends MQAdmin {
* Command Code : RequestCode.UPDATE_NAMESRV_CONFIG
*
* <br> If param(nameServers) is null or empty, will use name servers from ns!
- *
- * @param properties
- * @param nameServers
- * @throws InterruptedException
- * @throws RemotingConnectException
- * @throws UnsupportedEncodingException
- * @throws RemotingSendRequestException
- * @throws RemotingTimeoutException
- * @throws MQClientException
- * @throws MQBrokerException
*/
- void updateNameServerConfig(final Properties properties, final List<String> nameServers) throws InterruptedException, RemotingConnectException,
+ void updateNameServerConfig(final Properties properties,
+ final List<String> nameServers) throws InterruptedException, RemotingConnectException,
UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException,
MQClientException, MQBrokerException;
@@ -230,14 +239,7 @@ public interface MQAdminExt extends MQAdmin {
* Command Code : RequestCode.GET_NAMESRV_CONFIG
* <br> If param(nameServers) is null or empty, will use name servers from ns!
*
- * @param nameServers
* @return The fetched name server config
- * @throws InterruptedException
- * @throws RemotingTimeoutException
- * @throws RemotingSendRequestException
- * @throws RemotingConnectException
- * @throws MQClientException
- * @throws UnsupportedEncodingException
*/
Map<String, Properties> getNameServerConfig(final List<String> nameServers) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
@@ -252,15 +254,9 @@ public interface MQAdminExt extends MQAdmin {
* @param index start offset
* @param count how many
* @param consumerGroup group
- * @return
- * @throws InterruptedException
- * @throws RemotingTimeoutException
- * @throws RemotingSendRequestException
- * @throws RemotingConnectException
- * @throws MQClientException
*/
QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr,
- final String topic, final int queueId,
- final long index, final int count, final String consumerGroup)
+ final String topic, final int queueId,
+ final long index, final int count, final String consumerGroup)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
index 919f673..11a3604 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java
@@ -60,7 +60,8 @@ public class GetBrokerConfigCommand implements SubCommand {
}
@Override
- public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) throws SubCommandException {
+ public void execute(final CommandLine commandLine, final Options options,
+ final RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
index c7b8ac5..6a0cd71 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
@@ -61,7 +61,8 @@ public class ClusterListSubCommand implements SubCommand {
}
@Override
- public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
+ public void execute(final CommandLine commandLine, final Options options,
+ RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
@@ -164,7 +165,8 @@ public class ClusterListSubCommand implements SubCommand {
}
}
- private void printClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException,
+ private void printClusterBaseInfo(
+ final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException,
RemotingSendRequestException, InterruptedException, MQBrokerException {
ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
@@ -253,16 +255,16 @@ public class ClusterListSubCommand implements SubCommand {
}
System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n",
- clusterName,
- brokerName,
- next1.getKey(),
- next1.getValue(),
- version,
- String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills),
- String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills),
- pageCacheLockTimeMills,
- String.format("%2.2f", hour),
- String.format("%.4f", space)
+ clusterName,
+ brokerName,
+ next1.getKey(),
+ next1.getValue(),
+ version,
+ String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills),
+ String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills),
+ pageCacheLockTimeMills,
+ String.format("%2.2f", hour),
+ String.format("%.4f", space)
);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
index 910eb1c..7316526 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java
@@ -90,7 +90,8 @@ public class UpdateSubGroupSubCommand implements SubCommand {
}
@Override
- public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
+ public void execute(final CommandLine commandLine, final Options options,
+ RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java
index 64f634e..d233b65 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java
@@ -45,7 +45,8 @@ public class DecodeMessageIdCommond implements SubCommand {
}
@Override
- public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException {
+ public void execute(final CommandLine commandLine, final Options options,
+ RPCHook rpcHook) throws SubCommandException {
String messageId = commandLine.getOptionValue('i').trim();
try {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
index ac51267..46c5f74 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
@@ -52,7 +52,8 @@ public class PrintMessageByQueueCommand implements SubCommand {
return timestamp;
}
- private static void calculateByTag(final List<MessageExt> msgs, final Map<String, AtomicLong> tagCalmap, final boolean calByTag) {
+ private static void calculateByTag(final List<MessageExt> msgs, final Map<String, AtomicLong> tagCalmap,
+ final boolean calByTag) {
if (!calByTag)
return;
@@ -85,7 +86,8 @@ public class PrintMessageByQueueCommand implements SubCommand {
}
}
- public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printMsg, boolean printBody) {
+ public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printMsg,
+ boolean printBody) {
if (!printMsg)
return;
@@ -162,11 +164,11 @@ public class PrintMessageByQueueCommand implements SubCommand {
String charsetName =
!commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim();
boolean printMsg =
- commandLine.hasOption('p') && Boolean.parseBoolean(commandLine.getOptionValue('p').trim());
+ commandLine.hasOption('p') && Boolean.parseBoolean(commandLine.getOptionValue('p').trim());
boolean printBody =
- commandLine.hasOption('d') && Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
+ commandLine.hasOption('d') && Boolean.parseBoolean(commandLine.getOptionValue('d').trim());
boolean calByTag =
- commandLine.hasOption('f') && Boolean.parseBoolean(commandLine.getOptionValue('f').trim());
+ commandLine.hasOption('f') && Boolean.parseBoolean(commandLine.getOptionValue('f').trim());
String subExpression =
!commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
index 05ae003..39abbc9 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
@@ -262,7 +262,8 @@ public class QueryMsgByIdSubCommand implements SubCommand {
}
}
- private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final String consumerGroup, final String clientId, final String msgId) {
+ private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final String consumerGroup, final String clientId,
+ final String msgId) {
try {
ConsumeMessageDirectlyResult result =
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
index 5c93ad7..0103b50 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
@@ -39,7 +39,8 @@ import org.apache.rocketmq.tools.command.SubCommandException;
public class QueryMsgByUniqueKeySubCommand implements SubCommand {
- public static void queryById(final DefaultMQAdminExt admin, final String topic, final String msgId) throws MQClientException,
+ public static void queryById(final DefaultMQAdminExt admin, final String topic,
+ final String msgId) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException, IOException {
MessageExt msg = admin.viewMessage(topic, msgId);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java
index ce63616..22ce867 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java
@@ -46,7 +46,8 @@ public class GetNamesrvConfigCommand implements SubCommand {
}
@Override
- public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) throws SubCommandException {
+ public void execute(final CommandLine commandLine, final Options options,
+ final RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {