You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:18 UTC

[27/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
new file mode 100644
index 0000000..b6255c4
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MQAdminImpl {
+
+    private final Logger log = ClientLogger.getLog();
+    private final MQClientInstance mQClientFactory;
+    private long timeoutMillis = 6000;
+
+
+    public MQAdminImpl(MQClientInstance mQClientFactory) {
+        this.mQClientFactory = mQClientFactory;
+    }
+
+
+    public long getTimeoutMillis() {
+        return timeoutMillis;
+    }
+
+
+    public void setTimeoutMillis(long timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+
+
+    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+        createTopic(key, newTopic, queueNum, 0);
+    }
+
+
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+        try {
+            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);
+            List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
+            if (brokerDataList != null && !brokerDataList.isEmpty()) {
+                Collections.sort(brokerDataList);
+
+                boolean createOKAtLeastOnce = false;
+                MQClientException exception = null;
+
+                StringBuilder orderTopicString = new StringBuilder();
+
+                for (BrokerData brokerData : brokerDataList) {
+                    String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+                    if (addr != null) {
+                        TopicConfig topicConfig = new TopicConfig(newTopic);
+                        topicConfig.setReadQueueNums(queueNum);
+                        topicConfig.setWriteQueueNums(queueNum);
+                        topicConfig.setTopicSysFlag(topicSysFlag);
+
+                        boolean createOK = false;
+                        for (int i = 0; i < 5; i++) {
+                            try {
+                                this.mQClientFactory.getMQClientAPIImpl().createTopic(addr, key, topicConfig, timeoutMillis);
+                                createOK = true;
+                                createOKAtLeastOnce = true;
+                                break;
+                            } catch (Exception e) {
+                                if (4 == i) {
+                                    exception = new MQClientException("create topic to broker exception", e);
+                                }
+                            }
+                        }
+
+                        if (createOK) {
+                            orderTopicString.append(brokerData.getBrokerName());
+                            orderTopicString.append(":");
+                            orderTopicString.append(queueNum);
+                            orderTopicString.append(";");
+                        }
+                    }
+                }
+
+                if (exception != null && !createOKAtLeastOnce) {
+                    throw exception;
+                }
+            } else {
+                throw new MQClientException("Not found broker, maybe key is wrong", null);
+            }
+        } catch (Exception e) {
+            throw new MQClientException("create new topic failed", e);
+        }
+    }
+
+
+    public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
+        try {
+            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
+            if (topicRouteData != null) {
+                TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
+                if (topicPublishInfo != null && topicPublishInfo.ok()) {
+                    return topicPublishInfo.getMessageQueueList();
+                }
+            }
+        } catch (Exception e) {
+            throw new MQClientException("Can not find Message Queue for this topic, " + topic, e);
+        }
+
+        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
+    }
+
+
+    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
+        try {
+            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
+            if (topicRouteData != null) {
+                Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
+                if (!mqList.isEmpty()) {
+                    return mqList;
+                } else {
+                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
+                }
+            }
+        } catch (Exception e) {
+            throw new MQClientException(
+                    "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), //
+                    e);
+        }
+
+        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
+    }
+
+
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+        if (null == brokerAddr) {
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+        }
+
+        if (brokerAddr != null) {
+            try {
+                return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
+                        timeoutMillis);
+            } catch (Exception e) {
+                throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
+            }
+        }
+
+        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+    }
+
+
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+        if (null == brokerAddr) {
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+        }
+
+        if (brokerAddr != null) {
+            try {
+                return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
+            } catch (Exception e) {
+                throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
+            }
+        }
+
+        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+    }
+
+
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+        if (null == brokerAddr) {
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+        }
+
+        if (brokerAddr != null) {
+            try {
+                return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
+            } catch (Exception e) {
+                throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
+            }
+        }
+
+        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+    }
+
+
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+        if (null == brokerAddr) {
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
+        }
+
+        if (brokerAddr != null) {
+            try {
+                return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq.getTopic(), mq.getQueueId(),
+                        timeoutMillis);
+            } catch (Exception e) {
+                throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
+            }
+        }
+
+        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+    }
+
+    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+
+        MessageId messageId = null;
+        try {
+            messageId = MessageDecoder.decodeMessageId(msgId);
+        } catch (Exception e) {
+            throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
+        }
+        return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
+                messageId.getOffset(), timeoutMillis);
+    }
+
+    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException,
+            InterruptedException {
+        return queryMessage(topic, key, maxNum, begin, end, false);
+    }
+
+    public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws InterruptedException, MQClientException {
+
+        QueryResult qr = this.queryMessage(topic, uniqKey, 32,
+                MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
+        if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
+            return qr.getMessageList().get(0);
+        } else {
+            return null;
+        }
+    }
+
+    protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end, boolean isUniqKey) throws MQClientException,
+            InterruptedException {
+        TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
+        if (null == topicRouteData) {
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
+            topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
+        }
+
+        if (topicRouteData != null) {
+            List<String> brokerAddrs = new LinkedList<String>();
+            for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
+                String addr = brokerData.selectBrokerAddr();
+                if (addr != null) {
+                    brokerAddrs.add(addr);
+                }
+            }
+
+            if (!brokerAddrs.isEmpty()) {
+                final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size());
+                final List<QueryResult> queryResultList = new LinkedList<QueryResult>();
+
+                for (String addr : brokerAddrs) {
+                    try {
+                        QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();
+                        requestHeader.setTopic(topic);
+                        requestHeader.setKey(key);
+                        requestHeader.setMaxNum(maxNum);
+                        requestHeader.setBeginTimestamp(begin);
+                        requestHeader.setEndTimestamp(end);
+
+                        this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3,
+                                new InvokeCallback() {
+                                    @Override
+                                    public void operationComplete(ResponseFuture responseFuture) {
+                                        try {
+                                            RemotingCommand response = responseFuture.getResponseCommand();
+                                            if (response != null) {
+                                                switch (response.getCode()) {
+                                                    case ResponseCode.SUCCESS: {
+                                                        QueryMessageResponseHeader responseHeader = null;
+                                                        try {
+                                                            responseHeader =
+                                                                    (QueryMessageResponseHeader) response
+                                                                            .decodeCommandCustomHeader(QueryMessageResponseHeader.class);
+                                                        } catch (RemotingCommandException e) {
+                                                            log.error("decodeCommandCustomHeader exception", e);
+                                                            return;
+                                                        }
+
+                                                        List<MessageExt> wrappers =
+                                                                MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
+
+                                                        QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
+                                                        queryResultList.add(qr);
+                                                        break;
+                                                    }
+                                                    default:
+                                                        log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark());
+                                                        break;
+                                                }
+                                            } else {
+                                                log.warn("getResponseCommand return null");
+                                            }
+                                        } finally {
+                                            countDownLatch.countDown();
+                                        }
+                                    }
+                                }, isUniqKey);
+                    } catch (Exception e) {
+                        log.warn("queryMessage exception", e);
+                    }
+
+                }
+
+                boolean ok = countDownLatch.await(timeoutMillis * 4, TimeUnit.MILLISECONDS);
+                if (!ok) {
+                    log.warn("queryMessage, maybe some broker failed");
+                }
+
+                long indexLastUpdateTimestamp = 0;
+                List<MessageExt> messageList = new LinkedList<MessageExt>();
+                for (QueryResult qr : queryResultList) {
+                    if (qr.getIndexLastUpdateTimestamp() > indexLastUpdateTimestamp) {
+                        indexLastUpdateTimestamp = qr.getIndexLastUpdateTimestamp();
+                    }
+
+                    for (MessageExt msgExt : qr.getMessageList()) {
+                        if (isUniqKey) {
+                            if (msgExt.getMsgId().equals(key)) {
+
+                                if (messageList.size() > 0) {
+
+                                    if (messageList.get(0).getStoreTimestamp() > msgExt.getStoreTimestamp()) {
+
+                                        messageList.clear();
+                                        messageList.add(msgExt);
+                                    }
+
+                                } else {
+
+                                    messageList.add(msgExt);
+                                }
+                            } else {
+                                log.warn("queryMessage by uniqKey, find message key not matched, maybe hash duplicate {}", msgExt.toString());
+                            }
+                        } else {
+                            String keys = msgExt.getKeys();
+                            if (keys != null) {
+                                boolean matched = false;
+                                String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);
+                                if (keyArray != null) {
+                                    for (String k : keyArray) {
+                                        if (key.equals(k)) {
+                                            matched = true;
+                                            break;
+                                        }
+                                    }
+                                }
+
+                                if (matched) {
+                                    messageList.add(msgExt);
+                                } else {
+                                    log.warn("queryMessage, find message key not matched, maybe hash duplicate {}", msgExt.toString());
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if (!messageList.isEmpty()) {
+                    return new QueryResult(indexLastUpdateTimestamp, messageList);
+                } else {
+                    throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by key finished, but no message.");
+                }
+            }
+        }
+
+        throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info");
+    }
+}