You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 08:13:49 UTC

[24/34] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 3e6673c..5fdcab2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -16,6 +16,13 @@
  */
 package org.apache.rocketmq.client.impl;
 
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -25,7 +32,12 @@ import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageId;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
@@ -39,42 +51,28 @@ import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.slf4j.Logger;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-
 public class MQAdminImpl {
 
     private final Logger log = ClientLogger.getLog();
     private final MQClientInstance mQClientFactory;
     private long timeoutMillis = 6000;
 
-
     public MQAdminImpl(MQClientInstance mQClientFactory) {
         this.mQClientFactory = mQClientFactory;
     }
 
-
     public long getTimeoutMillis() {
         return timeoutMillis;
     }
 
-
     public void setTimeoutMillis(long timeoutMillis) {
         this.timeoutMillis = timeoutMillis;
     }
 
-
     public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
         createTopic(key, newTopic, queueNum, 0);
     }
 
-
     public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
         try {
             TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);
@@ -129,7 +127,6 @@ public class MQAdminImpl {
         }
     }
 
-
     public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
         try {
             TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
@@ -146,7 +143,6 @@ public class MQAdminImpl {
         throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
     }
 
-
     public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
         try {
             TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
@@ -160,14 +156,13 @@ public class MQAdminImpl {
             }
         } catch (Exception e) {
             throw new MQClientException(
-                    "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), //
-                    e);
+                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), //
+                e);
         }
 
         throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
     }
 
-
     public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
         String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
@@ -178,7 +173,7 @@ public class MQAdminImpl {
         if (brokerAddr != null) {
             try {
                 return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
-                        timeoutMillis);
+                    timeoutMillis);
             } catch (Exception e) {
                 throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
             }
@@ -187,7 +182,6 @@ public class MQAdminImpl {
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
     }
 
-
     public long maxOffset(MessageQueue mq) throws MQClientException {
         String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
@@ -206,7 +200,6 @@ public class MQAdminImpl {
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
     }
 
-
     public long minOffset(MessageQueue mq) throws MQClientException {
         String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
@@ -225,7 +218,6 @@ public class MQAdminImpl {
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
     }
 
-
     public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
         String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
@@ -236,7 +228,7 @@ public class MQAdminImpl {
         if (brokerAddr != null) {
             try {
                 return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq.getTopic(), mq.getQueueId(),
-                        timeoutMillis);
+                    timeoutMillis);
             } catch (Exception e) {
                 throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
             }
@@ -254,18 +246,18 @@ public class MQAdminImpl {
             throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
         }
         return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
-                messageId.getOffset(), timeoutMillis);
+            messageId.getOffset(), timeoutMillis);
     }
 
     public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException,
-            InterruptedException {
+        InterruptedException {
         return queryMessage(topic, key, maxNum, begin, end, false);
     }
 
     public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws InterruptedException, MQClientException {
 
         QueryResult qr = this.queryMessage(topic, uniqKey, 32,
-                MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
+            MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
         if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
             return qr.getMessageList().get(0);
         } else {
@@ -274,7 +266,7 @@ public class MQAdminImpl {
     }
 
     protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end, boolean isUniqKey) throws MQClientException,
-            InterruptedException {
+        InterruptedException {
         TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
         if (null == topicRouteData) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
@@ -304,43 +296,43 @@ public class MQAdminImpl {
                         requestHeader.setEndTimestamp(end);
 
                         this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3,
-                                new InvokeCallback() {
-                                    @Override
-                                    public void operationComplete(ResponseFuture responseFuture) {
-                                        try {
-                                            RemotingCommand response = responseFuture.getResponseCommand();
-                                            if (response != null) {
-                                                switch (response.getCode()) {
-                                                    case ResponseCode.SUCCESS: {
-                                                        QueryMessageResponseHeader responseHeader = null;
-                                                        try {
-                                                            responseHeader =
-                                                                    (QueryMessageResponseHeader) response
-                                                                            .decodeCommandCustomHeader(QueryMessageResponseHeader.class);
-                                                        } catch (RemotingCommandException e) {
-                                                            log.error("decodeCommandCustomHeader exception", e);
-                                                            return;
-                                                        }
-
-                                                        List<MessageExt> wrappers =
-                                                                MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
-
-                                                        QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
-                                                        queryResultList.add(qr);
-                                                        break;
+                            new InvokeCallback() {
+                                @Override
+                                public void operationComplete(ResponseFuture responseFuture) {
+                                    try {
+                                        RemotingCommand response = responseFuture.getResponseCommand();
+                                        if (response != null) {
+                                            switch (response.getCode()) {
+                                                case ResponseCode.SUCCESS: {
+                                                    QueryMessageResponseHeader responseHeader = null;
+                                                    try {
+                                                        responseHeader =
+                                                            (QueryMessageResponseHeader)response
+                                                                .decodeCommandCustomHeader(QueryMessageResponseHeader.class);
+                                                    } catch (RemotingCommandException e) {
+                                                        log.error("decodeCommandCustomHeader exception", e);
+                                                        return;
                                                     }
-                                                    default:
-                                                        log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark());
-                                                        break;
+
+                                                    List<MessageExt> wrappers =
+                                                        MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
+
+                                                    QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
+                                                    queryResultList.add(qr);
+                                                    break;
                                                 }
-                                            } else {
-                                                log.warn("getResponseCommand return null");
+                                                default:
+                                                    log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark());
+                                                    break;
                                             }
-                                        } finally {
-                                            countDownLatch.countDown();
+                                        } else {
+                                            log.warn("getResponseCommand return null");
                                         }
+                                    } finally {
+                                        countDownLatch.countDown();
                                     }
-                                }, isUniqKey);
+                                }
+                            }, isUniqKey);
                     } catch (Exception e) {
                         log.warn("queryMessage exception", e);
                     }