You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:22 UTC
[26/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
new file mode 100644
index 0000000..99204b0
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -0,0 +1,1996 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.namesrv.TopAddressing;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.header.*;
+import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.*;
+import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.*;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MQClientAPIImpl {
+
+ // Shared client logger obtained from ClientLogger.
+ private final static Logger log = ClientLogger.getLog();
+ // Selects SEND_MESSAGE_V2 (true) or SEND_MESSAGE (false) when building send requests; initialised from
+ // the system property below and defaults to true when the property is absent.
+ public static boolean sendSmartMsg =
+ Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
+
+ static {
+ // Stores MQVersion.CURRENT_VERSION as a system property under RemotingCommand.REMOTING_VERSION_KEY.
+ System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+ }
+
+ // Remoting client that the request methods of this class delegate to.
+ private final RemotingClient remotingClient;
+ // Source of the name server address string polled by fetchNameServerAddr().
+ private final TopAddressing topAddressing;
+ // Processor registered in the constructor for the request codes listed there.
+ private final ClientRemotingProcessor clientRemotingProcessor;
+ // Last address string accepted by fetchNameServerAddr(); stays null until a fetch returns a value.
+ private String nameSrvAddr = null;
+ // Client settings; read for the unit name and the VIP-channel flag.
+ private ClientConfig clientConfig;
+
+ public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor,
+ RPCHook rpcHook, final ClientConfig clientConfig) {
+ this.clientConfig = clientConfig;
+ topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName());
+ this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
+ this.clientRemotingProcessor = clientRemotingProcessor;
+
+ this.remotingClient.registerRPCHook(rpcHook);
+ this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
+
+ this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
+
+ this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
+
+ this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
+
+ this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
+
+ this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
+ }
+
+ /**
+  * Returns the name server address list as reported by the underlying remoting client.
+  *
+  * @return whatever {@code remotingClient.getNameServerAddressList()} returns
+  */
+ public List<String> getNameServerAddressList() {
+ return this.remotingClient.getNameServerAddressList();
+ }
+
+ /**
+  * Exposes the remoting client created in the constructor.
+  *
+  * @return the remoting client used by this instance
+  */
+ public RemotingClient getRemotingClient() {
+ return remotingClient;
+ }
+
+ /**
+  * Asks {@code topAddressing} for the current name server address string and, when it is non-null
+  * and differs from the cached value, pushes it to the remoting client and caches it.
+  * Any exception raised while fetching or updating is logged and not propagated.
+  *
+  * @return the cached address string after the attempt (may be {@code null} if none was ever fetched)
+  */
+ public String fetchNameServerAddr() {
+     try {
+         final String fetched = this.topAddressing.fetchNSAddr();
+         final boolean changed = fetched != null && !fetched.equals(this.nameSrvAddr);
+         if (changed) {
+             log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + fetched);
+             this.updateNameServerAddressList(fetched);
+             this.nameSrvAddr = fetched;
+         }
+     } catch (Exception e) {
+         log.error("fetchNameServerAddr Exception", e);
+     }
+     return this.nameSrvAddr;
+ }
+
+ /**
+  * Splits {@code addrs} on {@code ';'} and hands the resulting list to the remoting client.
+  *
+  * @param addrs semicolon-separated address string; must not be {@code null}
+  */
+ public void updateNameServerAddressList(final String addrs) {
+     final List<String> addressList = new ArrayList<String>();
+     // String.split never returns null, so the tokens can be copied unconditionally.
+     Collections.addAll(addressList, addrs.split(";"));
+     this.remotingClient.updateNameServerAddressList(addressList);
+ }
+
+ /**
+  * Starts the underlying remoting client.
+  */
+ public void start() {
+ this.remotingClient.start();
+ }
+
+ /**
+  * Shuts down the underlying remoting client.
+  */
+ public void shutdown() {
+ this.remotingClient.shutdown();
+ }
+
+ /**
+  * Sends an UPDATE_AND_CREATE_SUBSCRIPTIONGROUP request whose body is the encoded {@code config}
+  * and waits for the reply.
+  *
+  * @param addr broker address; passed through {@code MixAll.brokerVIPChannel} before use
+  * @param config subscription group configuration to encode into the request body
+  * @param timeoutMillis timeout handed to the synchronous invocation
+  * @throws MQClientException when the response code is anything other than SUCCESS
+  */
+ public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis)
+     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     final RemotingCommand request =
+         RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
+     request.setBody(RemotingSerializable.encode(config));
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+     assert response != null;
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQClientException(response.getCode(), response.getRemark());
+     }
+ }
+
+ /**
+  * Sends an UPDATE_AND_CREATE_TOPIC request built from {@code topicConfig} and waits for the reply.
+  *
+  * @param addr broker address; passed through {@code MixAll.brokerVIPChannel} before use
+  * @param defaultTopic value copied into the request header's default-topic field
+  * @param topicConfig source of topic name, queue counts, permission, filter type, sys flag and order flag
+  * @param timeoutMillis timeout handed to the synchronous invocation
+  * @throws MQClientException when the response code is anything other than SUCCESS
+  */
+ public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
+     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     final CreateTopicRequestHeader header = new CreateTopicRequestHeader();
+     header.setTopic(topicConfig.getTopicName());
+     header.setDefaultTopic(defaultTopic);
+     header.setReadQueueNums(topicConfig.getReadQueueNums());
+     header.setWriteQueueNums(topicConfig.getWriteQueueNums());
+     header.setPerm(topicConfig.getPerm());
+     header.setTopicFilterType(topicConfig.getTopicFilterType().name());
+     header.setTopicSysFlag(topicConfig.getTopicSysFlag());
+     header.setOrder(topicConfig.isOrder());
+
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, header);
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+     assert response != null;
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQClientException(response.getCode(), response.getRemark());
+     }
+ }
+
+ /**
+  * Convenience overload of the twelve-argument {@code sendMessage}: delegates with no send callback,
+  * no topic publish info, no client instance and a retry count of 0.
+  *
+  * @return whatever the twelve-argument overload returns for the given communication mode
+  */
+ public SendResult sendMessage(//
+ final String addr, // 1
+ final String brokerName, // 2
+ final Message msg, // 3
+ final SendMessageRequestHeader requestHeader, // 4
+ final long timeoutMillis, // 5
+ final CommunicationMode communicationMode, // 6
+ final SendMessageContext context, // 7
+ final DefaultMQProducerImpl producer // 8
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
+ }
+
+ /**
+  * Builds a send request (SEND_MESSAGE_V2 when {@code sendSmartMsg} is set, SEND_MESSAGE otherwise),
+  * attaches the message body and dispatches it according to {@code communicationMode}.
+  *
+  * @return the send result for SYNC; {@code null} for ONEWAY and ASYNC
+  */
+ public SendResult sendMessage(
+     final String addr,
+     final String brokerName,
+     final Message msg,
+     final SendMessageRequestHeader requestHeader,
+     final long timeoutMillis,
+     final CommunicationMode communicationMode,
+     final SendCallback sendCallback,
+     final TopicPublishInfo topicPublishInfo,
+     final MQClientInstance instance,
+     final int retryTimesWhenSendFailed,
+     final SendMessageContext context,
+     final DefaultMQProducerImpl producer
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+     final RemotingCommand request;
+     if (!sendSmartMsg) {
+         request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
+     } else {
+         final SendMessageRequestHeaderV2 compactHeader =
+             SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
+         request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, compactHeader);
+     }
+     request.setBody(msg.getBody());
+
+     switch (communicationMode) {
+         case SYNC:
+             return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
+         case ASYNC:
+             // The attempt counter starts at zero and is shared by all retries of this send.
+             final AtomicInteger attempts = new AtomicInteger();
+             this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo,
+                 instance, retryTimesWhenSendFailed, attempts, context, producer);
+             return null;
+         case ONEWAY:
+             this.remotingClient.invokeOneway(addr, request, timeoutMillis);
+             return null;
+         default:
+             assert false;
+             return null;
+     }
+ }
+
+ private SendResult sendMessageSync(//
+ final String addr, //
+ final String brokerName, //
+ final Message msg, //
+ final long timeoutMillis, //
+ final RemotingCommand request//
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+ assert response != null;
+ return this.processSendResponse(brokerName, msg, response);
+ }
+
+ /**
+  * Sends {@code request} to {@code addr} asynchronously and handles the outcome in the remoting callback.
+  * <p>
+  * On a response, the result is decoded, the after-send hook is run when a context is present, the
+  * callback (if any) is notified and the broker's fault item is updated. On a missing response the
+  * failure is routed through {@code onExceptionImpl}, which may retry.
+  * <p>
+  * Retries receive the time still left from {@code timeoutMillis} (never negative) rather than a
+  * hard-coded 0, so a retried request is not started with an already-expired timeout after a fast failure.
+  *
+  * @param timeoutMillis time budget for this attempt and any retries that follow it
+  * @param times shared attempt counter, incremented by {@code onExceptionImpl}
+  */
+ private void sendMessageAsync(
+     final String addr,
+     final String brokerName,
+     final Message msg,
+     final long timeoutMillis,
+     final RemotingCommand request,
+     final SendCallback sendCallback,
+     final TopicPublishInfo topicPublishInfo,
+     final MQClientInstance instance,
+     final int retryTimesWhenSendFailed,
+     final AtomicInteger times,
+     final SendMessageContext context,
+     final DefaultMQProducerImpl producer
+ ) throws InterruptedException, RemotingException {
+     this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
+         @Override
+         public void operationComplete(ResponseFuture responseFuture) {
+             final RemotingCommand response = responseFuture.getResponseCommand();
+
+             // No user callback: only the hook and the fault statistics need the response.
+             if (null == sendCallback && response != null) {
+                 try {
+                     SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
+                     if (context != null && sendResult != null) {
+                         context.setSendResult(sendResult);
+                         context.getProducer().executeSendMessageHookAfter(context);
+                     }
+                 } catch (Throwable e) {
+                     // Nobody can be notified, so at least leave a trace instead of dropping the failure.
+                     log.warn("async send without callback failed to process response, brokerName=" + brokerName, e);
+                 }
+
+                 producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
+                 return;
+             }
+
+             if (response != null) {
+                 try {
+                     SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
+                     assert sendResult != null;
+                     if (context != null) {
+                         context.setSendResult(sendResult);
+                         context.getProducer().executeSendMessageHookAfter(context);
+                     }
+
+                     try {
+                         sendCallback.onSuccess(sendResult);
+                     } catch (Throwable e) {
+                         // A faulty user callback must not be reported as a send failure.
+                         log.warn("sendCallback.onSuccess threw an exception, brokerName=" + brokerName, e);
+                     }
+
+                     producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
+                 } catch (Exception e) {
+                     producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
+                     // needRetry is false here, so the timeout argument is never used.
+                     onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
+                         retryTimesWhenSendFailed, times, e, context, false, producer);
+                 }
+                 return;
+             }
+
+             // No response at all: classify the failure and let onExceptionImpl decide about a retry.
+             final long elapsedMillis = System.currentTimeMillis() - responseFuture.getBeginTimestamp();
+             producer.updateFaultItem(brokerName, elapsedMillis, true);
+
+             final String reason;
+             if (!responseFuture.isSendRequestOK()) {
+                 reason = "send request failed";
+             } else if (responseFuture.isTimeout()) {
+                 reason = "wait response timeout " + responseFuture.getTimeoutMillis() + "ms";
+             } else {
+                 reason = "unknown reason";
+             }
+             final MQClientException ex = new MQClientException(reason, responseFuture.getCause());
+             final long remainingMillis = Math.max(0L, timeoutMillis - elapsedMillis);
+             onExceptionImpl(brokerName, msg, remainingMillis, request, sendCallback, topicPublishInfo, instance,
+                 retryTimesWhenSendFailed, times, ex, context, true, producer);
+         }
+     });
+ }
+
+
+ /**
+  * Handles a failed asynchronous send: either retries on a queue chosen by the producer or reports the
+  * failure to the hook and the user callback.
+  * <p>
+  * The attempt counter is incremented on every call. A retry happens only while {@code needRetry} is set
+  * and the counter has not exceeded {@code timesTotal}; failures raised while starting the retry re-enter
+  * this method. When no retry is made, the after-send hook runs (if a context exists) and the callback,
+  * if present, is notified. Exceptions thrown by the callback are logged rather than silently dropped.
+  *
+  * @param brokerName broker of the failed attempt; passed to queue selection and used for fault updates
+  * @param timeoutMillis timeout for the retried request
+  * @param timesTotal maximum number of retries
+  * @param curTimes shared attempt counter
+  * @param e the failure to report when no retry is made
+  * @param needRetry whether this kind of failure is eligible for a retry
+  */
+ private void onExceptionImpl(final String brokerName,
+     final Message msg,
+     final long timeoutMillis,
+     final RemotingCommand request,
+     final SendCallback sendCallback,
+     final TopicPublishInfo topicPublishInfo,
+     final MQClientInstance instance,
+     final int timesTotal,
+     final AtomicInteger curTimes,
+     final Exception e,
+     final SendMessageContext context,
+     final boolean needRetry,
+     final DefaultMQProducerImpl producer
+ ) {
+     int tmp = curTimes.incrementAndGet();
+     if (needRetry && tmp <= timesTotal) {
+         MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
+         String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName());
+         log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
+             tmpmq.getBrokerName());
+         try {
+             // The request object is reused, so it needs a fresh opaque id for the new attempt.
+             request.setOpaque(RemotingCommand.createNewRequestId());
+             sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
+                 timesTotal, curTimes, context, producer);
+         } catch (InterruptedException e1) {
+             onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                 context, false, producer);
+         } catch (RemotingConnectException e1) {
+             producer.updateFaultItem(brokerName, 3000, true);
+             onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                 context, true, producer);
+         } catch (RemotingTooMuchRequestException e1) {
+             onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                 context, false, producer);
+         } catch (RemotingException e1) {
+             producer.updateFaultItem(brokerName, 3000, true);
+             onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
+                 context, true, producer);
+         }
+         return;
+     }
+
+     if (context != null) {
+         context.setException(e);
+         context.getProducer().executeSendMessageHookAfter(context);
+     }
+
+     // The callback may legitimately be absent; skip it instead of relying on a swallowed NPE.
+     if (sendCallback == null) {
+         return;
+     }
+     try {
+         sendCallback.onException(e);
+     } catch (Exception e2) {
+         log.warn("sendCallback.onException threw an exception, brokerName=" + brokerName, e2);
+     }
+ }
+
+
+ private SendResult processSendResponse(//
+ final String brokerName, //
+ final Message msg, //
+ final RemotingCommand response//
+ ) throws MQBrokerException, RemotingCommandException {
+ switch (response.getCode()) {
+ case ResponseCode.FLUSH_DISK_TIMEOUT:
+ case ResponseCode.FLUSH_SLAVE_TIMEOUT:
+ case ResponseCode.SLAVE_NOT_AVAILABLE: {
+ // TODO LOG
+ }
+ case ResponseCode.SUCCESS: {
+ SendStatus sendStatus = SendStatus.SEND_OK;
+ switch (response.getCode()) {
+ case ResponseCode.FLUSH_DISK_TIMEOUT:
+ sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
+ break;
+ case ResponseCode.FLUSH_SLAVE_TIMEOUT:
+ sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
+ break;
+ case ResponseCode.SLAVE_NOT_AVAILABLE:
+ sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
+ break;
+ case ResponseCode.SUCCESS:
+ sendStatus = SendStatus.SEND_OK;
+ break;
+ default:
+ assert false;
+ break;
+ }
+
+ SendMessageResponseHeader responseHeader =
+ (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+
+ MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());
+
+ SendResult sendResult = new SendResult(sendStatus,
+ MessageClientIDSetter.getUniqID(msg),
+ responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
+ sendResult.setTransactionId(responseHeader.getTransactionId());
+ String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
+ String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
+ if (regionId == null || regionId.isEmpty()) {
+ regionId = MixAll.DEFAULT_TRACE_REGION_ID;
+ }
+ if (traceOn != null && traceOn.equals("false")) {
+ sendResult.setTraceOn(false);
+ } else {
+ sendResult.setTraceOn(true);
+ }
+ sendResult.setRegionId(regionId);
+ return sendResult;
+ }
+ default:
+ break;
+ }
+
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
+
+
+ /**
+  * Issues a PULL_MESSAGE request in the given communication mode.
+  *
+  * @return the pull result for SYNC; {@code null} for ASYNC (the callback receives the result) and for
+  *         ONEWAY or any other mode, which only trip an assertion
+  */
+ public PullResult pullMessage(
+     final String addr,
+     final PullMessageRequestHeader requestHeader,
+     final long timeoutMillis,
+     final CommunicationMode communicationMode,
+     final PullCallback pullCallback
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
+
+     switch (communicationMode) {
+         case SYNC:
+             return this.pullMessageSync(addr, request, timeoutMillis);
+         case ASYNC:
+             this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
+             return null;
+         case ONEWAY:
+         default:
+             assert false;
+             return null;
+     }
+ }
+
+
+ /**
+  * Sends the pull request asynchronously and forwards the outcome to {@code pullCallback}.
+  * <p>
+  * A received response is decoded and passed to {@code onSuccess}; any exception raised while decoding
+  * or inside {@code onSuccess} is passed to {@code onException}. When no response arrives, an
+  * {@link MQClientException} describing the cause (send failure, timeout, or unknown) is reported.
+  */
+ private void pullMessageAsync(
+     final String addr,
+     final RemotingCommand request,
+     final long timeoutMillis,
+     final PullCallback pullCallback
+ ) throws RemotingException, InterruptedException {
+     this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
+         @Override
+         public void operationComplete(ResponseFuture responseFuture) {
+             final RemotingCommand response = responseFuture.getResponseCommand();
+             if (response != null) {
+                 try {
+                     PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
+                     assert pullResult != null;
+                     pullCallback.onSuccess(pullResult);
+                 } catch (Exception e) {
+                     pullCallback.onException(e);
+                 }
+                 return;
+             }
+
+             final String reason;
+             if (!responseFuture.isSendRequestOK()) {
+                 reason = "send request failed";
+             } else if (responseFuture.isTimeout()) {
+                 reason = "wait response timeout " + responseFuture.getTimeoutMillis() + "ms";
+             } else {
+                 reason = "unknown reason";
+             }
+             pullCallback.onException(new MQClientException(reason, responseFuture.getCause()));
+         }
+     });
+ }
+
+ private PullResult pullMessageSync(//
+ final String addr, // 1
+ final RemotingCommand request, // 2
+ final long timeoutMillis// 3
+ ) throws RemotingException, InterruptedException, MQBrokerException {
+ RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+ assert response != null;
+ return this.processPullResponse(response);
+ }
+
+ private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
+ PullStatus pullStatus = PullStatus.NO_NEW_MSG;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS:
+ pullStatus = PullStatus.FOUND;
+ break;
+ case ResponseCode.PULL_NOT_FOUND:
+ pullStatus = PullStatus.NO_NEW_MSG;
+ break;
+ case ResponseCode.PULL_RETRY_IMMEDIATELY:
+ pullStatus = PullStatus.NO_MATCHED_MSG;
+ break;
+ case ResponseCode.PULL_OFFSET_MOVED:
+ pullStatus = PullStatus.OFFSET_ILLEGAL;
+ break;
+
+ default:
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
+
+ PullMessageResponseHeader responseHeader =
+ (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+
+ return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
+ responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
+ }
+
+ /**
+  * Fetches a single message through a VIEW_MESSAGE_BY_ID request carrying {@code phyoffset}.
+  *
+  * @param addr broker address; passed through {@code MixAll.brokerVIPChannel} before use
+  * @param phyoffset offset placed in the request header
+  * @param timeoutMillis timeout handed to the synchronous invocation
+  * @return the message decoded from the response body
+  * @throws MQBrokerException when the response code is anything other than SUCCESS
+  */
+ public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis)
+     throws RemotingException, MQBrokerException, InterruptedException {
+     final ViewMessageRequestHeader header = new ViewMessageRequestHeader();
+     header.setOffset(phyoffset);
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, header);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+     assert response != null;
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+     return MessageDecoder.clientDecode(ByteBuffer.wrap(response.getBody()), true);
+ }
+
+
+ /**
+  * Issues a SEARCH_OFFSET_BY_TIMESTAMP request for the given topic, queue and timestamp.
+  *
+  * @param addr broker address; passed through {@code MixAll.brokerVIPChannel} before use
+  * @return the offset carried in the response header
+  * @throws MQBrokerException when the response code is anything other than SUCCESS
+  */
+ public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis)
+     throws RemotingException, MQBrokerException, InterruptedException {
+     final SearchOffsetRequestHeader header = new SearchOffsetRequestHeader();
+     header.setTopic(topic);
+     header.setQueueId(queueId);
+     header.setTimestamp(timestamp);
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, header);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+     assert response != null;
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+
+     final SearchOffsetResponseHeader reply =
+         (SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
+     return reply.getOffset();
+ }
+
+
+ /**
+  * Issues a GET_MAX_OFFSET request for the given topic and queue.
+  *
+  * @param addr broker address; passed through {@code MixAll.brokerVIPChannel} before use
+  * @return the offset carried in the response header
+  * @throws MQBrokerException when the response code is anything other than SUCCESS
+  */
+ public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
+     throws RemotingException, MQBrokerException, InterruptedException {
+     final GetMaxOffsetRequestHeader header = new GetMaxOffsetRequestHeader();
+     header.setTopic(topic);
+     header.setQueueId(queueId);
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, header);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+     assert response != null;
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+
+     final GetMaxOffsetResponseHeader reply =
+         (GetMaxOffsetResponseHeader) response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
+     return reply.getOffset();
+ }
+
+
+ /**
+  * Issues a GET_CONSUMER_LIST_BY_GROUP request and returns the consumer ids from the response body.
+  *
+  * @param addr broker address; passed through {@code MixAll.brokerVIPChannel} before use
+  * @param consumerGroup group placed in the request header
+  * @return the consumer id list decoded from the response body
+  * @throws MQBrokerException when the response code is not SUCCESS, and also when a SUCCESS response
+  *         carries no body
+  */
+ public List<String> getConsumerIdListByGroup(
+     final String addr,
+     final String consumerGroup,
+     final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+     MQBrokerException, InterruptedException {
+     final GetConsumerListByGroupRequestHeader header = new GetConsumerListByGroupRequestHeader();
+     header.setConsumerGroup(consumerGroup);
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, header);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+     assert response != null;
+
+     if (response.getCode() == ResponseCode.SUCCESS && response.getBody() != null) {
+         final GetConsumerListByGroupResponseBody decoded =
+             GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
+         return decoded.getConsumerIdList();
+     }
+
+     throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
+
+
+ /**
+  * Issues a GET_MIN_OFFSET request for the given topic and queue.
+  *
+  * @param addr broker address; passed through {@code MixAll.brokerVIPChannel} before use
+  * @return the offset carried in the response header
+  * @throws MQBrokerException when the response code is anything other than SUCCESS
+  */
+ public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
+     throws RemotingException, MQBrokerException, InterruptedException {
+     final GetMinOffsetRequestHeader header = new GetMinOffsetRequestHeader();
+     header.setTopic(topic);
+     header.setQueueId(queueId);
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, header);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+     assert response != null;
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+
+     final GetMinOffsetResponseHeader reply =
+         (GetMinOffsetResponseHeader) response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
+     return reply.getOffset();
+ }
+
+
+ /**
+  * Issues a GET_EARLIEST_MSG_STORETIME request for the given topic and queue.
+  *
+  * @param addr broker address; passed through {@code MixAll.brokerVIPChannel} before use
+  * @return the timestamp carried in the response header
+  * @throws MQBrokerException when the response code is anything other than SUCCESS
+  */
+ public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis)
+     throws RemotingException, MQBrokerException, InterruptedException {
+     final GetEarliestMsgStoretimeRequestHeader header = new GetEarliestMsgStoretimeRequestHeader();
+     header.setTopic(topic);
+     header.setQueueId(queueId);
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, header);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+     assert response != null;
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+
+     final GetEarliestMsgStoretimeResponseHeader reply =
+         (GetEarliestMsgStoretimeResponseHeader) response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
+     return reply.getTimestamp();
+ }
+
+
+ /**
+  * Issues a QUERY_CONSUMER_OFFSET request with the supplied header.
+  *
+  * @param addr broker address; passed through {@code MixAll.brokerVIPChannel} before use
+  * @return the offset carried in the response header
+  * @throws MQBrokerException when the response code is anything other than SUCCESS
+  */
+ public long queryConsumerOffset(
+     final String addr,
+     final QueryConsumerOffsetRequestHeader requestHeader,
+     final long timeoutMillis
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+     assert response != null;
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+
+     final QueryConsumerOffsetResponseHeader reply =
+         (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+     return reply.getOffset();
+ }
+
+
+ /**
+  * Issues an UPDATE_CONSUMER_OFFSET request with the supplied header and waits for the reply.
+  *
+  * @param addr broker address; passed through {@code MixAll.brokerVIPChannel} before use
+  * @throws MQBrokerException when the response code is anything other than SUCCESS
+  */
+ public void updateConsumerOffset(
+     final String addr,
+     final UpdateConsumerOffsetRequestHeader requestHeader,
+     final long timeoutMillis
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
+     assert response != null;
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+ }
+
+
+ /**
+  * Sends an UPDATE_CONSUMER_OFFSET request as a one-way invocation; no response is read.
+  *
+  * @param addr broker address; passed through {@code MixAll.brokerVIPChannel} before use
+  */
+ public void updateConsumerOffsetOneway(
+     final String addr,
+     final UpdateConsumerOffsetRequestHeader requestHeader,
+     final long timeoutMillis
+ ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
+     InterruptedException {
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     this.remotingClient.invokeOneway(targetAddr, request, timeoutMillis);
+ }
+
+
+ /**
+  * Sends a HEART_BEAT request whose body is the encoded heartbeat data and waits for the reply.
+  * The address is used as given.
+  *
+  * @throws MQBrokerException when the response code is anything other than SUCCESS
+  */
+ public void sendHearbeat(
+     final String addr,
+     final HeartbeatData heartbeatData,
+     final long timeoutMillis
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
+     request.setBody(heartbeatData.encode());
+
+     final RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+     assert response != null;
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+ }
+
+
+ /**
+  * Sends an UNREGISTER_CLIENT request carrying the client id and the producer/consumer group names,
+  * then waits for the reply. The address is used as given.
+  *
+  * @throws MQBrokerException when the response code is anything other than SUCCESS
+  */
+ public void unregisterClient(
+     final String addr,
+     final String clientID,
+     final String producerGroup,
+     final String consumerGroup,
+     final long timeoutMillis
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+     final UnregisterClientRequestHeader header = new UnregisterClientRequestHeader();
+     header.setClientID(clientID);
+     header.setProducerGroup(producerGroup);
+     header.setConsumerGroup(consumerGroup);
+     final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, header);
+
+     final RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+     assert response != null;
+     if (response.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(response.getCode(), response.getRemark());
+     }
+ }
+
+
+ /**
+  * Sends an {@code END_TRANSACTION} request, with the supplied remark set on it, through {@code invokeOneway}.
+  * No response is read here.
+  *
+  * @param addr address the request is sent to (used as given)
+  * @param requestHeader header attached to the request
+  * @param remark remark set on the request before sending
+  * @param timeoutMillis timeout handed to the remoting client
+  */
+ public void endTransactionOneway(
+     final String addr,
+     final EndTransactionRequestHeader requestHeader,
+     final String remark,
+     final long timeoutMillis
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+     final RemotingCommand endTxRequest =
+         RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
+     endTxRequest.setRemark(remark);
+
+     this.remotingClient.invokeOneway(addr, endTxRequest, timeoutMillis);
+ }
+
+
+ /**
+  * Issues a {@code QUERY_MESSAGE} request asynchronously; the outcome is delivered to {@code invokeCallback}.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param requestHeader header attached to the request
+  * @param timeoutMillis timeout handed to the remoting client
+  * @param invokeCallback callback handed to {@code invokeAsync}
+  * @param isUnqiueKey its string form is stored in the {@code MixAll.UNIQUE_MSG_QUERY_FLAG} ext field;
+  *                    must not be {@code null} because {@code toString()} is called on it
+  */
+ public void queryMessage(
+     final String addr,
+     final QueryMessageRequestHeader requestHeader,
+     final long timeoutMillis,
+     final InvokeCallback invokeCallback,
+     final Boolean isUnqiueKey
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+     final RemotingCommand queryRequest =
+         RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader);
+     final String uniqueKeyFlag = isUnqiueKey.toString();
+     queryRequest.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, uniqueKeyFlag);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     this.remotingClient.invokeAsync(targetAddr, queryRequest, timeoutMillis, invokeCallback);
+ }
+
+
+ /**
+  * Sends a {@code HEART_BEAT} request carrying the encoded heartbeat and reports whether it was accepted.
+  *
+  * @param addr address the request is sent to (used as given)
+  * @param heartbeat heartbeat payload; its {@code encode()} result becomes the request body
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return {@code true} when the response code equals {@code ResponseCode.SUCCESS}, {@code false} otherwise
+  */
+ public boolean registerClient(final String addr, final HeartbeatData heartbeat, final long timeoutMillis)
+     throws RemotingException, InterruptedException {
+     final RemotingCommand heartbeatRequest = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
+     heartbeatRequest.setBody(heartbeat.encode());
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(addr, heartbeatRequest, timeoutMillis);
+     return ResponseCode.SUCCESS == reply.getCode();
+ }
+
+
+ /**
+  * Sends a {@code CONSUMER_SEND_MSG_BACK} request describing the given message and waits for the reply.
+  * The header is filled from the message's topic, commit-log offset and message id, plus the supplied
+  * group, delay level and maximum reconsume times.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param msg message whose topic, commit-log offset and id are copied into the header
+  * @param consumerGroup value stored in the header's group
+  * @param delayLevel value stored in the header's delay level
+  * @param timeoutMillis timeout handed to the remoting client
+  * @param maxConsumeRetryTimes value stored in the header's max reconsume times
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public void consumerSendMessageBack(
+     final String addr,
+     final MessageExt msg,
+     final String consumerGroup,
+     final int delayLevel,
+     final long timeoutMillis,
+     final int maxConsumeRetryTimes
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+     final ConsumerSendMsgBackRequestHeader header = new ConsumerSendMsgBackRequestHeader();
+     final RemotingCommand sendBackRequest =
+         RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, header);
+
+     // The header is populated after the command is created, exactly as before.
+     header.setGroup(consumerGroup);
+     header.setOriginTopic(msg.getTopic());
+     header.setOffset(msg.getCommitLogOffset());
+     header.setDelayLevel(delayLevel);
+     header.setOriginMsgId(msg.getMsgId());
+     header.setMaxReconsumeTimes(maxConsumeRetryTimes);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, sendBackRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(reply.getCode(), reply.getRemark());
+     }
+ }
+
+
+ /**
+  * Sends a {@code LOCK_BATCH_MQ} request whose body is the encoded request body and returns the queues
+  * reported as locked.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param requestBody its {@code encode()} result becomes the request body
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code lockOKMQSet} of the decoded {@code LockBatchResponseBody}
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public Set<MessageQueue> lockBatchMQ(
+     final String addr,
+     final LockBatchRequestBody requestBody,
+     final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
+     final RemotingCommand lockRequest = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
+     lockRequest.setBody(requestBody.encode());
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, lockRequest, timeoutMillis);
+     if (reply.getCode() == ResponseCode.SUCCESS) {
+         final LockBatchResponseBody decoded = LockBatchResponseBody.decode(reply.getBody(), LockBatchResponseBody.class);
+         return decoded.getLockOKMQSet();
+     }
+
+     throw new MQBrokerException(reply.getCode(), reply.getRemark());
+ }
+
+
+ /**
+  * Sends an {@code UNLOCK_BATCH_MQ} request whose body is the encoded request body.
+  * <p>
+  * In one-way mode the request goes to {@code addr} as given and no response is read. Otherwise the
+  * address is first passed through {@code MixAll.brokerVIPChannel} and the reply is checked.
+  *
+  * @param addr broker address
+  * @param requestBody its {@code encode()} result becomes the request body
+  * @param timeoutMillis timeout handed to the remoting client
+  * @param oneway {@code true} to use {@code invokeOneway}, {@code false} to use {@code invokeSync}
+  * @throws MQBrokerException in synchronous mode, carrying the response code and remark when the reply
+  *                           is not {@code SUCCESS}
+  */
+ public void unlockBatchMQ(
+     final String addr,
+     final UnlockBatchRequestBody requestBody,
+     final long timeoutMillis,
+     final boolean oneway
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+     final RemotingCommand unlockRequest = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
+     unlockRequest.setBody(requestBody.encode());
+
+     if (oneway) {
+         this.remotingClient.invokeOneway(addr, unlockRequest, timeoutMillis);
+         return;
+     }
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, unlockRequest, timeoutMillis);
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(reply.getCode(), reply.getRemark());
+     }
+ }
+
+
+ /**
+  * Sends a {@code GET_TOPIC_STATS_INFO} request for the topic and decodes the reply body.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param topic value stored in the request header's topic
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code TopicStatsTable} decoded from the response body
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis) throws InterruptedException,
+     RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+     final GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
+     header.setTopic(topic);
+
+     final RemotingCommand statsRequest = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, header);
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, statsRequest, timeoutMillis);
+     if (reply.getCode() == ResponseCode.SUCCESS) {
+         return TopicStatsTable.decode(reply.getBody(), TopicStatsTable.class);
+     }
+
+     throw new MQBrokerException(reply.getCode(), reply.getRemark());
+ }
+
+
+ /**
+  * Convenience overload: delegates to the four-argument {@code getConsumeStats} with a {@code null} topic.
+  *
+  * @param addr broker address, forwarded unchanged
+  * @param consumerGroup consumer group, forwarded unchanged
+  * @param timeoutMillis timeout, forwarded unchanged
+  * @return whatever the four-argument overload returns
+  */
+ public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
+     throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
+     MQBrokerException {
+     final String noTopic = null;
+     return this.getConsumeStats(addr, consumerGroup, noTopic, timeoutMillis);
+ }
+
+
+ /**
+  * Sends a {@code GET_CONSUME_STATS} request for the consumer group and topic and decodes the reply body.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param consumerGroup value stored in the request header's consumer group
+  * @param topic value stored in the request header's topic (the three-argument overload passes {@code null})
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code ConsumeStats} decoded from the response body
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis)
+     throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
+     MQBrokerException {
+     final GetConsumeStatsRequestHeader header = new GetConsumeStatsRequestHeader();
+     header.setConsumerGroup(consumerGroup);
+     header.setTopic(topic);
+
+     final RemotingCommand statsRequest = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, header);
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, statsRequest, timeoutMillis);
+     if (reply.getCode() == ResponseCode.SUCCESS) {
+         return ConsumeStats.decode(reply.getBody(), ConsumeStats.class);
+     }
+
+     throw new MQBrokerException(reply.getCode(), reply.getRemark());
+ }
+
+
+ /**
+  * Sends a {@code GET_PRODUCER_CONNECTION_LIST} request for the producer group and decodes the reply body.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param producerGroup value stored in the request header's producer group
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code ProducerConnection} decoded from the response body
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, final long timeoutMillis)
+     throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+     MQBrokerException {
+     final GetProducerConnectionListRequestHeader header = new GetProducerConnectionListRequestHeader();
+     header.setProducerGroup(producerGroup);
+
+     final RemotingCommand listRequest =
+         RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, header);
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, listRequest, timeoutMillis);
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(reply.getCode(), reply.getRemark());
+     }
+
+     return ProducerConnection.decode(reply.getBody(), ProducerConnection.class);
+ }
+
+
+ /**
+  * Sends a {@code GET_CONSUMER_CONNECTION_LIST} request for the consumer group and decodes the reply body.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param consumerGroup value stored in the request header's consumer group
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code ConsumerConnection} decoded from the response body
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, final long timeoutMillis)
+     throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+     MQBrokerException {
+     final GetConsumerConnectionListRequestHeader header = new GetConsumerConnectionListRequestHeader();
+     header.setConsumerGroup(consumerGroup);
+
+     final RemotingCommand listRequest =
+         RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, header);
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, listRequest, timeoutMillis);
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(reply.getCode(), reply.getRemark());
+     }
+
+     return ConsumerConnection.decode(reply.getBody(), ConsumerConnection.class);
+ }
+
+
+ /**
+  * Sends a header-less {@code GET_BROKER_RUNTIME_INFO} request and decodes the reply body.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code KVTable} decoded from the response body
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public KVTable getBrokerRuntimeInfo(final String addr, final long timeoutMillis) throws RemotingConnectException,
+     RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
+     final RemotingCommand infoRequest = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null);
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, infoRequest, timeoutMillis);
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(reply.getCode(), reply.getRemark());
+     }
+
+     return KVTable.decode(reply.getBody(), KVTable.class);
+ }
+
+
+ /**
+  * Sends an {@code UPDATE_BROKER_CONFIG} request whose body is the string form of the properties,
+  * encoded with {@code MixAll.DEFAULT_CHARSET}.
+  * <p>
+  * When {@code MixAll.properties2String} yields {@code null} or an empty string, nothing is sent and the
+  * method returns silently.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param properties properties converted with {@code MixAll.properties2String}
+  * @param timeoutMillis timeout handed to the remoting client
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  * @throws UnsupportedEncodingException declared for the charset lookup done by {@code String.getBytes}
+  */
+ public void updateBrokerConfig(final String addr, final Properties properties, final long timeoutMillis)
+     throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+     MQBrokerException, UnsupportedEncodingException {
+     final RemotingCommand updateRequest = RemotingCommand.createRequestCommand(RequestCode.UPDATE_BROKER_CONFIG, null);
+
+     final String serialized = MixAll.properties2String(properties);
+     if (serialized == null || serialized.isEmpty()) {
+         // Nothing to push to the broker.
+         return;
+     }
+
+     updateRequest.setBody(serialized.getBytes(MixAll.DEFAULT_CHARSET));
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, updateRequest, timeoutMillis);
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(reply.getCode(), reply.getRemark());
+     }
+ }
+
+
+ /**
+  * Sends a header-less {@code GET_BROKER_CONFIG} request and parses the reply body into properties.
+  * The body is read as a string in {@code MixAll.DEFAULT_CHARSET} and converted with
+  * {@code MixAll.string2Properties}.
+  *
+  * @param addr address the request is sent to (used as given)
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the result of {@code MixAll.string2Properties} on the decoded body
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  * @throws UnsupportedEncodingException declared for the charset lookup done by the {@code String} constructor
+  */
+ public Properties getBrokerConfig(final String addr, final long timeoutMillis)
+     throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+     MQBrokerException, UnsupportedEncodingException {
+     final RemotingCommand configRequest = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONFIG, null);
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(addr, configRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(reply.getCode(), reply.getRemark());
+     }
+
+     final String configText = new String(reply.getBody(), MixAll.DEFAULT_CHARSET);
+     return MixAll.string2Properties(configText);
+ }
+
+ /**
+  * Sends a header-less {@code GET_BROKER_CLUSTER_INFO} request and decodes the reply body.
+  * <p>
+  * The request is sent with a {@code null} address.
+  * NOTE(review): presumably the remoting client then selects a name server itself; confirm against the
+  * remoting client implementation.
+  *
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code ClusterInfo} decoded from the response body
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
+     RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+     final RemotingCommand clusterRequest = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(null, clusterRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(reply.getCode(), reply.getRemark());
+     }
+
+     return ClusterInfo.decode(reply.getBody(), ClusterInfo.class);
+ }
+
+
+ /**
+  * Sends a {@code GET_ROUTEINTO_BY_TOPIC} request for the topic and decodes the route data from the reply.
+  * <p>
+  * The request is sent with a {@code null} address.
+  * NOTE(review): presumably the remoting client then selects a name server itself; confirm against the
+  * remoting client implementation.
+  *
+  * @param topic value stored in the request header's topic
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code TopicRouteData} decoded from a non-null body of a {@code SUCCESS} reply
+  * @throws MQClientException carrying the response code and remark in every other case, including
+  *                           {@code TOPIC_NOT_EXIST} and a {@code SUCCESS} reply without a body
+  */
+ public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
+     throws RemotingException, MQClientException, InterruptedException {
+     final GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
+     header.setTopic(topic);
+
+     final RemotingCommand routeRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, header);
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(null, routeRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() == ResponseCode.SUCCESS) {
+         final byte[] payload = reply.getBody();
+         if (payload != null) {
+             return TopicRouteData.decode(payload, TopicRouteData.class);
+         }
+     }
+
+     // TODO LOG (TOPIC_NOT_EXIST is not logged here; it is reported through the exception below)
+     throw new MQClientException(reply.getCode(), reply.getRemark());
+ }
+
+
+ public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
+ throws RemotingException, MQClientException, InterruptedException {
+ GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
+ requestHeader.setTopic(topic);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
+
+ RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.TOPIC_NOT_EXIST: {
+ if (!topic.equals(MixAll.DEFAULT_TOPIC))
+ log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
+ break;
+ }
+ case ResponseCode.SUCCESS: {
+ byte[] body = response.getBody();
+ if (body != null) {
+ return TopicRouteData.decode(body, TopicRouteData.class);
+ }
+ }
+ default:
+ break;
+ }
+
+ throw new MQClientException(response.getCode(), response.getRemark());
+ }
+
+
+ /**
+  * Sends a header-less {@code GET_ALL_TOPIC_LIST_FROM_NAMESERVER} request and decodes the reply body.
+  * <p>
+  * The request is sent with a {@code null} address.
+  * NOTE(review): presumably the remoting client then selects a name server itself; confirm against the
+  * remoting client implementation.
+  *
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code TopicList} decoded from a non-null body of a {@code SUCCESS} reply
+  * @throws MQClientException carrying the response code and remark otherwise (including a
+  *                           {@code SUCCESS} reply without a body)
+  */
+ public TopicList getTopicListFromNameServer(final long timeoutMillis)
+     throws RemotingException, MQClientException, InterruptedException {
+     final RemotingCommand listRequest =
+         RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER, null);
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(null, listRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() == ResponseCode.SUCCESS) {
+         final byte[] payload = reply.getBody();
+         if (payload != null) {
+             return TopicList.decode(payload, TopicList.class);
+         }
+     }
+
+     throw new MQClientException(reply.getCode(), reply.getRemark());
+ }
+
+
+ /**
+  * Sends a {@code WIPE_WRITE_PERM_OF_BROKER} request for the broker name to the given name server address
+  * and returns the count reported in the response header.
+  *
+  * @param namesrvAddr address the request is sent to (used as given)
+  * @param brokerName value stored in the request header's broker name
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return {@code wipeTopicCount} of the decoded {@code WipeWritePermOfBrokerResponseHeader}
+  * @throws MQClientException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis) throws RemotingCommandException,
+     RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
+     final WipeWritePermOfBrokerRequestHeader header = new WipeWritePermOfBrokerRequestHeader();
+     header.setBrokerName(brokerName);
+
+     final RemotingCommand wipeRequest = RemotingCommand.createRequestCommand(RequestCode.WIPE_WRITE_PERM_OF_BROKER, header);
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(namesrvAddr, wipeRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQClientException(reply.getCode(), reply.getRemark());
+     }
+
+     final WipeWritePermOfBrokerResponseHeader replyHeader =
+         (WipeWritePermOfBrokerResponseHeader) reply.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
+     return replyHeader.getWipeTopicCount();
+ }
+
+
+ /**
+  * Sends a {@code DELETE_TOPIC_IN_BROKER} request for the topic and waits for the reply.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param topic value stored in the request header's topic
+  * @param timeoutMillis timeout handed to the remoting client
+  * @throws MQClientException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis)
+     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     final DeleteTopicRequestHeader header = new DeleteTopicRequestHeader();
+     header.setTopic(topic);
+     final RemotingCommand deleteRequest = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, header);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, deleteRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQClientException(reply.getCode(), reply.getRemark());
+     }
+ }
+
+
+ /**
+  * Sends a {@code DELETE_TOPIC_IN_NAMESRV} request for the topic and waits for the reply.
+  *
+  * @param addr address the request is sent to (used as given)
+  * @param topic value stored in the request header's topic
+  * @param timeoutMillis timeout handed to the remoting client
+  * @throws MQClientException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis)
+     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     final DeleteTopicRequestHeader header = new DeleteTopicRequestHeader();
+     header.setTopic(topic);
+     final RemotingCommand deleteRequest = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, header);
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(addr, deleteRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQClientException(reply.getCode(), reply.getRemark());
+     }
+ }
+
+
+ /**
+  * Sends a {@code DELETE_SUBSCRIPTIONGROUP} request for the group name and waits for the reply.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param groupName value stored in the request header's group name
+  * @param timeoutMillis timeout handed to the remoting client
+  * @throws MQClientException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis)
+     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     final DeleteSubscriptionGroupRequestHeader header = new DeleteSubscriptionGroupRequestHeader();
+     header.setGroupName(groupName);
+     final RemotingCommand deleteRequest = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, header);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, deleteRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQClientException(reply.getCode(), reply.getRemark());
+     }
+ }
+
+
+ /**
+  * Sends a {@code GET_KV_CONFIG} request for the namespace/key pair and returns the value from the
+  * response header.
+  * <p>
+  * The request is sent with a {@code null} address.
+  * NOTE(review): presumably the remoting client then selects a name server itself; confirm against the
+  * remoting client implementation.
+  *
+  * @param namespace value stored in the request header's namespace
+  * @param key value stored in the request header's key
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return {@code value} of the decoded {@code GetKVConfigResponseHeader}
+  * @throws MQClientException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public String getKVConfigValue(final String namespace, final String key, final long timeoutMillis)
+     throws RemotingException, MQClientException, InterruptedException {
+     final GetKVConfigRequestHeader header = new GetKVConfigRequestHeader();
+     header.setNamespace(namespace);
+     header.setKey(key);
+
+     final RemotingCommand getRequest = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG, header);
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(null, getRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQClientException(reply.getCode(), reply.getRemark());
+     }
+
+     final GetKVConfigResponseHeader replyHeader =
+         (GetKVConfigResponseHeader) reply.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
+     return replyHeader.getValue();
+ }
+
+
+ /**
+  * Sends the same {@code PUT_KV_CONFIG} request to every address returned by
+  * {@code remotingClient.getNameServerAddressList()}.
+  * <p>
+  * All addresses are tried even if one replies with a non-{@code SUCCESS} code; afterwards the last
+  * such reply, if any, is turned into an exception. Nothing is sent when the address list is {@code null}.
+  *
+  * @param namespace value stored in the request header's namespace
+  * @param key value stored in the request header's key
+  * @param value value stored in the request header's value
+  * @param timeoutMillis timeout handed to the remoting client for each call
+  * @throws MQClientException carrying the code and remark of the last non-{@code SUCCESS} reply
+  */
+ public void putKVConfigValue(final String namespace, final String key, final String value, final long timeoutMillis)
+     throws RemotingException, MQClientException, InterruptedException {
+     final PutKVConfigRequestHeader header = new PutKVConfigRequestHeader();
+     header.setNamespace(namespace);
+     header.setKey(key);
+     header.setValue(value);
+
+     final RemotingCommand putRequest = RemotingCommand.createRequestCommand(RequestCode.PUT_KV_CONFIG, header);
+
+     final List<String> nameServers = this.remotingClient.getNameServerAddressList();
+     if (nameServers == null) {
+         return;
+     }
+
+     RemotingCommand lastFailure = null;
+     for (final String nameServer : nameServers) {
+         final RemotingCommand reply = this.remotingClient.invokeSync(nameServer, putRequest, timeoutMillis);
+         assert reply != null;
+         if (reply.getCode() != ResponseCode.SUCCESS) {
+             lastFailure = reply;
+         }
+     }
+
+     if (lastFailure != null) {
+         throw new MQClientException(lastFailure.getCode(), lastFailure.getRemark());
+     }
+ }
+
+
+ /**
+  * Sends the same {@code DELETE_KV_CONFIG} request to every address returned by
+  * {@code remotingClient.getNameServerAddressList()}.
+  * <p>
+  * All addresses are tried even if one replies with a non-{@code SUCCESS} code; afterwards the last
+  * such reply, if any, is turned into an exception. Nothing is sent when the address list is {@code null}.
+  *
+  * @param namespace value stored in the request header's namespace
+  * @param key value stored in the request header's key
+  * @param timeoutMillis timeout handed to the remoting client for each call
+  * @throws MQClientException carrying the code and remark of the last non-{@code SUCCESS} reply
+  */
+ public void deleteKVConfigValue(final String namespace, final String key, final long timeoutMillis)
+     throws RemotingException, MQClientException, InterruptedException {
+     final DeleteKVConfigRequestHeader header = new DeleteKVConfigRequestHeader();
+     header.setNamespace(namespace);
+     header.setKey(key);
+
+     final RemotingCommand deleteRequest = RemotingCommand.createRequestCommand(RequestCode.DELETE_KV_CONFIG, header);
+
+     final List<String> nameServers = this.remotingClient.getNameServerAddressList();
+     if (nameServers == null) {
+         return;
+     }
+
+     RemotingCommand lastFailure = null;
+     for (final String nameServer : nameServers) {
+         final RemotingCommand reply = this.remotingClient.invokeSync(nameServer, deleteRequest, timeoutMillis);
+         assert reply != null;
+         if (reply.getCode() != ResponseCode.SUCCESS) {
+             lastFailure = reply;
+         }
+     }
+
+     if (lastFailure != null) {
+         throw new MQClientException(lastFailure.getCode(), lastFailure.getRemark());
+     }
+ }
+
+
+ /**
+  * Sends a {@code GET_KVLIST_BY_NAMESPACE} request for the namespace and decodes the reply body.
+  * <p>
+  * The request is sent with a {@code null} address.
+  * NOTE(review): presumably the remoting client then selects a name server itself; confirm against the
+  * remoting client implementation.
+  *
+  * @param namespace value stored in the request header's namespace
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code KVTable} decoded from the response body
+  * @throws MQClientException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public KVTable getKVListByNamespace(final String namespace, final long timeoutMillis)
+     throws RemotingException, MQClientException, InterruptedException {
+     final GetKVListByNamespaceRequestHeader header = new GetKVListByNamespaceRequestHeader();
+     header.setNamespace(namespace);
+
+     final RemotingCommand listRequest = RemotingCommand.createRequestCommand(RequestCode.GET_KVLIST_BY_NAMESPACE, header);
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(null, listRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQClientException(reply.getCode(), reply.getRemark());
+     }
+
+     return KVTable.decode(reply.getBody(), KVTable.class);
+ }
+
+
+ /**
+  * Convenience overload: delegates to the seven-argument {@code invokeBrokerToResetOffset} with
+  * {@code isC} set to {@code false}.
+  *
+  * @return whatever the seven-argument overload returns
+  */
+ public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
+     final long timestamp, final boolean isForce, final long timeoutMillis)
+     throws RemotingException, MQClientException, InterruptedException {
+     final boolean isC = false;
+     return this.invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, isC);
+ }
+
+
+ /**
+  * Sends an {@code INVOKE_BROKER_TO_RESET_OFFSET} request and returns the offset table from the reply.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param topic value stored in the request header's topic
+  * @param group value stored in the request header's group
+  * @param timestamp value stored in the request header's timestamp
+  * @param isForce value stored in the request header's force flag
+  * @param timeoutMillis timeout handed to the remoting client
+  * @param isC when {@code true}, the request language is set to {@code LanguageCode.CPP}
+  * @return the {@code offsetTable} of the {@code ResetOffsetBody} decoded from a non-null body of a
+  *         {@code SUCCESS} reply
+  * @throws MQClientException carrying the response code and remark otherwise (including a
+  *                           {@code SUCCESS} reply without a body)
+  */
+ public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
+     final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
+     throws RemotingException, MQClientException, InterruptedException {
+     final ResetOffsetRequestHeader header = new ResetOffsetRequestHeader();
+     header.setTopic(topic);
+     header.setGroup(group);
+     header.setTimestamp(timestamp);
+     header.setForce(isForce);
+
+     final RemotingCommand resetRequest =
+         RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, header);
+     if (isC) {
+         resetRequest.setLanguage(LanguageCode.CPP);
+     }
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, resetRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() == ResponseCode.SUCCESS && reply.getBody() != null) {
+         final ResetOffsetBody decoded = ResetOffsetBody.decode(reply.getBody(), ResetOffsetBody.class);
+         return decoded.getOffsetTable();
+     }
+
+     throw new MQClientException(reply.getCode(), reply.getRemark());
+ }
+
+
+ /**
+  * Sends an {@code INVOKE_BROKER_TO_GET_CONSUMER_STATUS} request and returns the consumer table from
+  * the reply.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param topic value stored in the request header's topic
+  * @param group value stored in the request header's group
+  * @param clientAddr value stored in the request header's client address
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code consumerTable} of the {@code GetConsumerStatusBody} decoded from a non-null body
+  *         of a {@code SUCCESS} reply
+  * @throws MQClientException carrying the response code and remark otherwise (including a
+  *                           {@code SUCCESS} reply without a body)
+  */
+ public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group,
+     final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+     final GetConsumerStatusRequestHeader header = new GetConsumerStatusRequestHeader();
+     header.setTopic(topic);
+     header.setGroup(group);
+     header.setClientAddr(clientAddr);
+
+     final RemotingCommand statusRequest =
+         RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS, header);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, statusRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() == ResponseCode.SUCCESS && reply.getBody() != null) {
+         final GetConsumerStatusBody decoded = GetConsumerStatusBody.decode(reply.getBody(), GetConsumerStatusBody.class);
+         return decoded.getConsumerTable();
+     }
+
+     throw new MQClientException(reply.getCode(), reply.getRemark());
+ }
+
+
+ /**
+  * Sends a {@code QUERY_TOPIC_CONSUME_BY_WHO} request for the topic and decodes the reply body.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param topic value stored in the request header's topic
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code GroupList} decoded from the response body
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public GroupList queryTopicConsumeByWho(final String addr, final String topic, final long timeoutMillis)
+     throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+     MQBrokerException {
+     final QueryTopicConsumeByWhoRequestHeader header = new QueryTopicConsumeByWhoRequestHeader();
+     header.setTopic(topic);
+
+     final RemotingCommand queryRequest =
+         RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, header);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, queryRequest, timeoutMillis);
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(reply.getCode(), reply.getRemark());
+     }
+
+     return GroupList.decode(reply.getBody(), GroupList.class);
+ }
+
+
+ /**
+  * Sends a {@code QUERY_CONSUME_TIME_SPAN} request for the topic and group and returns the time spans
+  * from the reply.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param topic value stored in the request header's topic
+  * @param group value stored in the request header's group
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code consumeTimeSpanSet} of the decoded {@code QueryConsumeTimeSpanBody}
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group, final long timeoutMillis)
+     throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+     MQBrokerException {
+     QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
+     requestHeader.setTopic(topic);
+     requestHeader.setGroup(group);
+
+     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader);
+
+     RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+         request, timeoutMillis);
+     switch (response.getCode()) {
+         case ResponseCode.SUCCESS: {
+             // Decode through the body's own class, not the unrelated GroupList, matching the sibling methods.
+             QueryConsumeTimeSpanBody consumeTimeSpanBody =
+                 QueryConsumeTimeSpanBody.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
+             return consumeTimeSpanBody.getConsumeTimeSpanSet();
+         }
+         default:
+             break;
+     }
+
+     throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
+
+
+ /**
+  * Sends a {@code GET_TOPICS_BY_CLUSTER} request for the cluster and decodes the reply body.
+  * <p>
+  * The request is sent with a {@code null} address.
+  * NOTE(review): presumably the remoting client then selects a name server itself; confirm against the
+  * remoting client implementation.
+  *
+  * @param cluster value stored in the request header's cluster
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code TopicList} decoded from a non-null body of a {@code SUCCESS} reply
+  * @throws MQClientException carrying the response code and remark otherwise (including a
+  *                           {@code SUCCESS} reply without a body)
+  */
+ public TopicList getTopicsByCluster(final String cluster, final long timeoutMillis)
+     throws RemotingException, MQClientException, InterruptedException {
+     final GetTopicsByClusterRequestHeader header = new GetTopicsByClusterRequestHeader();
+     header.setCluster(cluster);
+     final RemotingCommand topicsRequest = RemotingCommand.createRequestCommand(RequestCode.GET_TOPICS_BY_CLUSTER, header);
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(null, topicsRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() == ResponseCode.SUCCESS) {
+         final byte[] payload = reply.getBody();
+         if (payload != null) {
+             return TopicList.decode(payload, TopicList.class);
+         }
+     }
+
+     throw new MQClientException(reply.getCode(), reply.getRemark());
+ }
+
+
+ /**
+  * Sends a {@code REGISTER_MESSAGE_FILTER_CLASS} request whose body is the given class bytes and waits
+  * for the reply.
+  *
+  * @param addr address the request is sent to (used as given)
+  * @param consumerGroup value stored in the request header's consumer group
+  * @param topic value stored in the request header's topic
+  * @param className value stored in the request header's class name
+  * @param classCRC value stored in the request header's class CRC
+  * @param classBody bytes set as the request body
+  * @param timeoutMillis timeout handed to the remoting client
+  * @throws MQBrokerException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public void registerMessageFilterClass(final String addr,
+     final String consumerGroup,
+     final String topic,
+     final String className,
+     final int classCRC,
+     final byte[] classBody,
+     final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+     InterruptedException, MQBrokerException {
+     final RegisterMessageFilterClassRequestHeader header = new RegisterMessageFilterClassRequestHeader();
+     header.setConsumerGroup(consumerGroup);
+     header.setClassName(className);
+     header.setTopic(topic);
+     header.setClassCRC(classCRC);
+
+     final RemotingCommand registerRequest =
+         RemotingCommand.createRequestCommand(RequestCode.REGISTER_MESSAGE_FILTER_CLASS, header);
+     registerRequest.setBody(classBody);
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(addr, registerRequest, timeoutMillis);
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQBrokerException(reply.getCode(), reply.getRemark());
+     }
+ }
+
+
+ /**
+  * Sends a header-less {@code GET_SYSTEM_TOPIC_LIST_FROM_NS} request and decodes the reply body.
+  * <p>
+  * If the decoded list is non-empty and names a non-blank broker address, that broker is queried with
+  * {@link #getSystemTopicListFromBroker(String, long)} and its non-empty topic list is added to the result.
+  * <p>
+  * The first request is sent with a {@code null} address.
+  * NOTE(review): presumably the remoting client then selects a name server itself; confirm against the
+  * remoting client implementation.
+  *
+  * @param timeoutMillis timeout handed to the remoting client (also used for the broker query)
+  * @return the {@code TopicList} decoded from a non-null body of a {@code SUCCESS} reply, possibly
+  *         extended with the broker's topics
+  * @throws MQClientException carrying the response code and remark otherwise (including a
+  *                           {@code SUCCESS} reply without a body)
+  */
+ public TopicList getSystemTopicList(final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+     final RemotingCommand listRequest = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null);
+
+     final RemotingCommand reply = this.remotingClient.invokeSync(null, listRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() == ResponseCode.SUCCESS) {
+         final byte[] payload = reply.getBody();
+         if (payload != null) {
+             final TopicList result = TopicList.decode(reply.getBody(), TopicList.class);
+             final boolean hasTopics = result.getTopicList() != null && !result.getTopicList().isEmpty();
+             if (hasTopics && !UtilAll.isBlank(result.getBrokerAddr())) {
+                 final TopicList fromBroker = getSystemTopicListFromBroker(result.getBrokerAddr(), timeoutMillis);
+                 if (fromBroker.getTopicList() != null && !fromBroker.getTopicList().isEmpty()) {
+                     result.getTopicList().addAll(fromBroker.getTopicList());
+                 }
+             }
+             return result;
+         }
+     }
+
+     throw new MQClientException(reply.getCode(), reply.getRemark());
+ }
+
+
+ /**
+  * Sends a header-less {@code GET_SYSTEM_TOPIC_LIST_FROM_BROKER} request and decodes the reply body.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code TopicList} decoded from a non-null body of a {@code SUCCESS} reply
+  * @throws MQClientException carrying the response code and remark otherwise (including a
+  *                           {@code SUCCESS} reply without a body)
+  */
+ public TopicList getSystemTopicListFromBroker(final String addr, final long timeoutMillis)
+     throws RemotingException, MQClientException, InterruptedException {
+     final RemotingCommand listRequest =
+         RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER, null);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, listRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() == ResponseCode.SUCCESS) {
+         final byte[] payload = reply.getBody();
+         if (payload != null) {
+             return TopicList.decode(payload, TopicList.class);
+         }
+     }
+
+     throw new MQClientException(reply.getCode(), reply.getRemark());
+ }
+
+
+ /**
+  * Sends a header-less {@code CLEAN_EXPIRED_CONSUMEQUEUE} request and waits for the reply.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return {@code true} when the reply is {@code SUCCESS}; the method never returns {@code false}
+  * @throws MQClientException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
+     RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+     final RemotingCommand cleanRequest = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null);
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, cleanRequest, timeoutMillis);
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQClientException(reply.getCode(), reply.getRemark());
+     }
+
+     return true;
+ }
+
+
+ /**
+  * Sends a header-less {@code CLEAN_UNUSED_TOPIC} request and waits for the reply.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return {@code true} when the reply is {@code SUCCESS}; the method never returns {@code false}
+  * @throws MQClientException carrying the response code and remark when the reply is not {@code SUCCESS}
+  */
+ public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
+     RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+     final RemotingCommand cleanRequest = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null);
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, cleanRequest, timeoutMillis);
+     if (reply.getCode() != ResponseCode.SUCCESS) {
+         throw new MQClientException(reply.getCode(), reply.getRemark());
+     }
+
+     return true;
+ }
+
+ /**
+  * Sends a {@code GET_CONSUMER_RUNNING_INFO} request for the consumer group and client id and decodes
+  * the reply body.
+  *
+  * @param addr broker address; it is passed through {@code MixAll.brokerVIPChannel} together with the
+  *             client's VIP-channel setting before the request is sent
+  * @param consumerGroup value stored in the request header's consumer group
+  * @param clientId value stored in the request header's client id
+  * @param jstack value stored in the request header's jstack-enable flag
+  * @param timeoutMillis timeout handed to the remoting client
+  * @return the {@code ConsumerRunningInfo} decoded from a non-null body of a {@code SUCCESS} reply
+  * @throws MQClientException carrying the response code and remark otherwise (including a
+  *                           {@code SUCCESS} reply without a body)
+  */
+ public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, boolean jstack,
+     final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+     final GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader();
+     header.setConsumerGroup(consumerGroup);
+     header.setClientId(clientId);
+     header.setJstackEnable(jstack);
+
+     final RemotingCommand infoRequest = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header);
+
+     final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+     final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, infoRequest, timeoutMillis);
+     assert reply != null;
+     if (reply.getCode() == ResponseCode.SUCCESS) {
+         final byte[] payload = reply.getBody();
+         if (payload != null) {
+             return ConsumerRunningInfo.decode(payload, ConsumerRunningInfo.class);
+         }
+     }
+
+     throw new MQClientException(reply.getCode(), reply.getRemark());
+ }
+
+    /**
+     * Sends a {@code CONSUME_MESSAGE_DIRECTLY} request and decodes the reply body.
+     *
+     * @param addr target address; it is passed through {@code MixAll.brokerVIPChannel} before the call
+     * @param consumerGroup value copied into the request header's consumer group
+     * @param clientId value copied into the request header's client id
+     * @param msgId value copied into the request header's message id
+     * @param timeoutMillis timeout handed to the synchronous invocation
+     * @return the {@code ConsumeMessageDirectlyResult} decoded from the response body
+     * @throws MQClientException when the response code is not {@code ResponseCode.SUCCESS}, or when it is
+     *         {@code SUCCESS} but the response has no body
+     */
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr,
+            String consumerGroup,
+            String clientId,
+            String msgId,
+            final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        final ConsumeMessageDirectlyResultRequestHeader header = new ConsumeMessageDirectlyResultRequestHeader();
+        header.setConsumerGroup(consumerGroup);
+        header.setClientId(clientId);
+        header.setMsgId(msgId);
+
+        final RemotingCommand consumeRequest = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, header);
+        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+        final RemotingCommand reply = this.remotingClient.invokeSync(target, consumeRequest, timeoutMillis);
+        assert reply != null;
+
+        if (reply.getCode() == ResponseCode.SUCCESS) {
+            final byte[] payload = reply.getBody();
+            if (payload != null) {
+                return ConsumeMessageDirectlyResult.decode(payload, ConsumeMessageDirectlyResult.class);
+            }
+            // A SUCCESS reply without a body deliberately ends up in the exception below.
+        }
+
+        throw new MQClientException(reply.getCode(), reply.getRemark());
+    }
+
+    /**
+     * Sends a {@code QUERY_CORRECTION_OFFSET} request and returns the correction offsets found in the reply body.
+     *
+     * @param addr target address; it is passed through {@code MixAll.brokerVIPChannel} before the call
+     * @param topic value copied into the request header's topic
+     * @param group value copied into the request header's compare group
+     * @param filterGroup optional group names; when non-null they are joined with {@code ","} and stored
+     *        in the header's filter groups, when null the header field is left untouched
+     * @param timeoutMillis timeout handed to the synchronous invocation
+     * @return the map obtained from the decoded {@code QueryCorrectionOffsetBody}
+     * @throws MQClientException when the response code is not {@code ResponseCode.SUCCESS}, or when it is
+     *         {@code SUCCESS} but the response has no body
+     */
+    public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group, Set<String> filterGroup,
+            long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+            InterruptedException {
+        final QueryCorrectionOffsetHeader header = new QueryCorrectionOffsetHeader();
+        header.setCompareGroup(group);
+        header.setTopic(topic);
+
+        if (filterGroup != null) {
+            // Comma-join the names; an empty set yields an empty string.
+            final StringBuilder joined = new StringBuilder();
+            boolean first = true;
+            for (String name : filterGroup) {
+                if (!first) {
+                    joined.append(",");
+                }
+                joined.append(name);
+                first = false;
+            }
+            header.setFilterGroups(joined.toString());
+        }
+
+        final RemotingCommand offsetRequest = RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET, header);
+        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+        final RemotingCommand reply = this.remotingClient.invokeSync(target, offsetRequest, timeoutMillis);
+        assert reply != null;
+
+        if (reply.getCode() == ResponseCode.SUCCESS && reply.getBody() != null) {
+            final QueryCorrectionOffsetBody decoded = QueryCorrectionOffsetBody.decode(reply.getBody(), QueryCorrectionOffsetBody.class);
+            return decoded.getCorrectionOffsets();
+        }
+
+        // Reached for non-SUCCESS codes and for a SUCCESS reply that carries no body.
+        throw new MQClientException(reply.getCode(), reply.getRemark());
+    }
+
+    /**
+     * Sends a {@code GET_UNIT_TOPIC_LIST} request (with {@code null} as the address) and decodes the reply body.
+     *
+     * @param containRetry when {@code false}, topics whose name starts with
+     *        {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} are removed from the decoded list before it is returned
+     * @param timeoutMillis timeout handed to the synchronous invocation
+     * @return the decoded {@code TopicList}, possibly filtered as described above
+     * @throws MQClientException when the response code is not {@code ResponseCode.SUCCESS}, or when it is
+     *         {@code SUCCESS} but the response has no body
+     */
+    public TopicList getUnitTopicList(final boolean containRetry, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        final RemotingCommand listRequest = RemotingCommand.createRequestCommand(RequestCode.GET_UNIT_TOPIC_LIST, null);
+        final RemotingCommand reply = this.remotingClient.invokeSync(null, listRequest, timeoutMillis);
+        assert reply != null;
+
+        if (reply.getCode() == ResponseCode.SUCCESS) {
+            final byte[] payload = reply.getBody();
+            if (payload != null) {
+                final TopicList topics = TopicList.decode(payload, TopicList.class);
+                if (!containRetry) {
+                    // Drop retry topics in place via the iterator so removal is safe during traversal.
+                    for (Iterator<String> cursor = topics.getTopicList().iterator(); cursor.hasNext();) {
+                        if (cursor.next().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                            cursor.remove();
+                        }
+                    }
+                }
+                return topics;
+            }
+        }
+
+        // Reached for non-SUCCESS codes and for a SUCCESS reply that carries no body.
+        throw new MQClientException(reply.getCode(), reply.getRemark());
+    }
+
+
+    /**
+     * Sends a {@code GET_HAS_UNIT_SUB_TOPIC_LIST} request (with {@code null} as the address) and decodes the reply body.
+     *
+     * @param containRetry when {@code false}, topics whose name starts with
+     *        {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} are removed from the decoded list before it is returned
+     * @param timeoutMillis timeout handed to the synchronous invocation
+     * @return the decoded {@code TopicList}, possibly filtered as described above
+     * @throws MQClientException when the response code is not {@code ResponseCode.SUCCESS}, or when it is
+     *         {@code SUCCESS} but the response has no body
+     */
+    public TopicList getHasUnitSubTopicList(final boolean containRetry, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        final RemotingCommand listRequest = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST, null);
+        final RemotingCommand reply = this.remotingClient.invokeSync(null, listRequest, timeoutMillis);
+        assert reply != null;
+
+        if (reply.getCode() == ResponseCode.SUCCESS) {
+            final byte[] payload = reply.getBody();
+            if (payload != null) {
+                final TopicList topics = TopicList.decode(payload, TopicList.class);
+                if (!containRetry) {
+                    // Drop retry topics in place via the iterator so removal is safe during traversal.
+                    for (Iterator<String> cursor = topics.getTopicList().iterator(); cursor.hasNext();) {
+                        if (cursor.next().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                            cursor.remove();
+                        }
+                    }
+                }
+                return topics;
+            }
+        }
+
+        // Reached for non-SUCCESS codes and for a SUCCESS reply that carries no body.
+        throw new MQClientException(reply.getCode(), reply.getRemark());
+    }
+
+
+    /**
+     * Sends a {@code GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST} request (with {@code null} as the address) and decodes
+     * the reply body.
+     *
+     * @param containRetry when {@code false}, topics whose name starts with
+     *        {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} are removed from the decoded list before it is returned
+     * @param timeoutMillis timeout handed to the synchronous invocation
+     * @return the decoded {@code TopicList}, possibly filtered as described above
+     * @throws MQClientException when the response code is not {@code ResponseCode.SUCCESS}, or when it is
+     *         {@code SUCCESS} but the response has no body
+     */
+    public TopicList getHasUnitSubUnUnitTopicList(final boolean containRetry, final long timeoutMillis)
+            throws RemotingException, MQClientException, InterruptedException {
+        final RemotingCommand listRequest = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST, null);
+        final RemotingCommand reply = this.remotingClient.invokeSync(null, listRequest, timeoutMillis);
+        assert reply != null;
+
+        if (reply.getCode() == ResponseCode.SUCCESS) {
+            final byte[] payload = reply.getBody();
+            if (payload != null) {
+                final TopicList topics = TopicList.decode(payload, TopicList.class);
+                if (!containRetry) {
+                    // Drop retry topics in place via the iterator so removal is safe during traversal.
+                    for (Iterator<String> cursor = topics.getTopicList().iterator(); cursor.hasNext();) {
+                        if (cursor.next().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                            cursor.remove();
+                        }
+                    }
+                }
+                return topics;
+            }
+        }
+
+        // Reached for non-SUCCESS codes and for a SUCCESS reply that carries no body.
+        throw new MQClientException(reply.getCode(), reply.getRemark());
+    }
+
+
+    /**
+     * Sends a {@code CLONE_GROUP_OFFSET} request to the given address and waits for the reply.
+     *
+     * @param addr target address; it is passed through {@code MixAll.brokerVIPChannel} before the call
+     * @param srcGroup value copied into the request header's source group
+     * @param destGroup value copied into the request header's destination group
+     * @param topic value copied into the request header's topic
+     * @param isOffline value copied into the request header's offline flag
+     * @param timeoutMillis timeout handed to the synchronous invocation
+     * @throws MQClientException carrying the response code and remark when the response code is not
+     *         {@code ResponseCode.SUCCESS}
+     */
+    public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup, final String topic,
+            final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        final CloneGroupOffsetRequestHeader header = new CloneGroupOffsetRequestHeader();
+        header.setSrcGroup(srcGroup);
+        header.setDestGroup(destGroup);
+        header.setTopic(topic);
+        header.setOffline(isOffline);
+
+        final RemotingCommand cloneRequest = RemotingCommand.createRequestCommand(RequestCode.CLONE_GROUP_OFFSET, header);
+        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
+        final RemotingCommand reply = this.remotingClient.invokeSync(target, cloneRequest, timeoutMillis);
+        assert reply != null;
+
+        // Only a SUCCESS reply completes normally; every other code is raised to the caller.
+        if (reply.getCode() != ResponseCode.SUCCESS) {
+            throw new MQClientException(reply.getCode(), reply.getRemark());
+        }
+    }
+
+
+ public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey, long timeoutMillis)
+ throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+ InterruptedException {
+ ViewBrokerStatsDataRequestHeader request
<TRUNCATED>