You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:15:19 UTC
[88/99] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat
all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 3e6673c..5fdcab2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.client.impl;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -25,7 +32,12 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageId;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
@@ -39,42 +51,28 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-
public class MQAdminImpl {
private final Logger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private long timeoutMillis = 6000;
-
public MQAdminImpl(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
}
-
public long getTimeoutMillis() {
return timeoutMillis;
}
-
public void setTimeoutMillis(long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}
-
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
}
-
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
try {
TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);
@@ -129,7 +127,6 @@ public class MQAdminImpl {
}
}
-
public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
try {
TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
@@ -146,7 +143,6 @@ public class MQAdminImpl {
throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
}
-
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
try {
TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
@@ -160,14 +156,13 @@ public class MQAdminImpl {
}
} catch (Exception e) {
throw new MQClientException(
- "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), //
- e);
+ "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), //
+ e);
}
throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
}
-
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
@@ -178,7 +173,7 @@ public class MQAdminImpl {
if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
- timeoutMillis);
+ timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
@@ -187,7 +182,6 @@ public class MQAdminImpl {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
-
public long maxOffset(MessageQueue mq) throws MQClientException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
@@ -206,7 +200,6 @@ public class MQAdminImpl {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
-
public long minOffset(MessageQueue mq) throws MQClientException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
@@ -225,7 +218,6 @@ public class MQAdminImpl {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
-
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
@@ -236,7 +228,7 @@ public class MQAdminImpl {
if (brokerAddr != null) {
try {
return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq.getTopic(), mq.getQueueId(),
- timeoutMillis);
+ timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
}
@@ -254,18 +246,18 @@ public class MQAdminImpl {
throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
}
return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
- messageId.getOffset(), timeoutMillis);
+ messageId.getOffset(), timeoutMillis);
}
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException,
- InterruptedException {
+ InterruptedException {
return queryMessage(topic, key, maxNum, begin, end, false);
}
public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws InterruptedException, MQClientException {
QueryResult qr = this.queryMessage(topic, uniqKey, 32,
- MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
+ MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
return qr.getMessageList().get(0);
} else {
@@ -274,7 +266,7 @@ public class MQAdminImpl {
}
protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end, boolean isUniqKey) throws MQClientException,
- InterruptedException {
+ InterruptedException {
TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
if (null == topicRouteData) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
@@ -304,43 +296,43 @@ public class MQAdminImpl {
requestHeader.setEndTimestamp(end);
this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3,
- new InvokeCallback() {
- @Override
- public void operationComplete(ResponseFuture responseFuture) {
- try {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- QueryMessageResponseHeader responseHeader = null;
- try {
- responseHeader =
- (QueryMessageResponseHeader) response
- .decodeCommandCustomHeader(QueryMessageResponseHeader.class);
- } catch (RemotingCommandException e) {
- log.error("decodeCommandCustomHeader exception", e);
- return;
- }
-
- List<MessageExt> wrappers =
- MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
-
- QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
- queryResultList.add(qr);
- break;
+ new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ try {
+ RemotingCommand response = responseFuture.getResponseCommand();
+ if (response != null) {
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ QueryMessageResponseHeader responseHeader = null;
+ try {
+ responseHeader =
+ (QueryMessageResponseHeader)response
+ .decodeCommandCustomHeader(QueryMessageResponseHeader.class);
+ } catch (RemotingCommandException e) {
+ log.error("decodeCommandCustomHeader exception", e);
+ return;
}
- default:
- log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark());
- break;
+
+ List<MessageExt> wrappers =
+ MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
+
+ QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
+ queryResultList.add(qr);
+ break;
}
- } else {
- log.warn("getResponseCommand return null");
+ default:
+ log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark());
+ break;
}
- } finally {
- countDownLatch.countDown();
+ } else {
+ log.warn("getResponseCommand return null");
}
+ } finally {
+ countDownLatch.countDown();
}
- }, isUniqKey);
+ }
+ }, isUniqKey);
} catch (Exception e) {
log.warn("queryMessage exception", e);
}