You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:32 UTC
[36/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java
deleted file mode 100644
index 3d5ba28..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java
+++ /dev/null
@@ -1,1996 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl;
-
-import com.alibaba.rocketmq.client.ClientConfig;
-import com.alibaba.rocketmq.client.consumer.PullCallback;
-import com.alibaba.rocketmq.client.consumer.PullResult;
-import com.alibaba.rocketmq.client.consumer.PullStatus;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.hook.SendMessageContext;
-import com.alibaba.rocketmq.client.impl.consumer.PullResultExt;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
-import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.client.producer.SendCallback;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.client.producer.SendStatus;
-import com.alibaba.rocketmq.common.MQVersion;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.TopicConfig;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.admin.ConsumeStats;
-import com.alibaba.rocketmq.common.admin.TopicStatsTable;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.common.namesrv.TopAddressing;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.body.*;
-import com.alibaba.rocketmq.common.protocol.header.*;
-import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.namesrv.*;
-import com.alibaba.rocketmq.common.protocol.heartbeat.HeartbeatData;
-import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
-import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
-import com.alibaba.rocketmq.remoting.InvokeCallback;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.RemotingClient;
-import com.alibaba.rocketmq.remoting.exception.*;
-import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
-import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
-import com.alibaba.rocketmq.remoting.protocol.LanguageCode;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-import org.slf4j.Logger;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-/**
- * @author shijia.wxr
- */
-public class MQClientAPIImpl {
-
- // Shared client-side logger obtained from ClientLogger.
- private final static Logger log = ClientLogger.getLog();
- // Read once from the system property "com.alibaba.rocketmq.client.sendSmartMsg" (default "true").
- // When true, sendMessage() builds the request with the V2 header and RequestCode.SEND_MESSAGE_V2.
- public static boolean sendSmartMsg =
- Boolean.parseBoolean(System.getProperty("com.alibaba.rocketmq.client.sendSmartMsg", "true"));
-
- // Publishes MQVersion.CURRENT_VERSION as a system property under RemotingCommand.REMOTING_VERSION_KEY
- // when this class is loaded.
- static {
- System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
- }
-
- // Transport used for every sync / async / oneway invocation in this class.
- private final RemotingClient remotingClient;
- // Source of the name server address string used by fetchNameServerAddr().
- private final TopAddressing topAddressing;
- // Processor registered in the constructor for the broker-initiated request codes.
- private final ClientRemotingProcessor clientRemotingProcessor;
- // Last name server address string accepted by fetchNameServerAddr(); null until the first successful fetch.
- private String nameSrvAddr = null;
- // Client configuration; read here for the unit name and the VIP-channel switch.
- private ClientConfig clientConfig;
-
-    /**
-     * Creates the API facade: builds the Netty remoting client, installs the RPC hook and
-     * registers {@code clientRemotingProcessor} for every request code handled on the client side.
-     *
-     * @param nettyClientConfig       configuration handed to the Netty remoting client
-     * @param clientRemotingProcessor processor registered for the request codes listed below
-     * @param rpcHook                 hook registered on the remoting client
-     * @param clientConfig            client configuration (unit name, VIP channel switch)
-     */
-    public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor,
-                           RPCHook rpcHook, final ClientConfig clientConfig) {
-        this.clientConfig = clientConfig;
-        this.topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName());
-        this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
-        this.clientRemotingProcessor = clientRemotingProcessor;
-
-        this.remotingClient.registerRPCHook(rpcHook);
-
-        // Request codes served by the client processor; registered in the same order as before.
-        final int[] handledCodes = {
-            RequestCode.CHECK_TRANSACTION_STATE,
-            RequestCode.NOTIFY_CONSUMER_IDS_CHANGED,
-            RequestCode.RESET_CONSUMER_CLIENT_OFFSET,
-            RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT,
-            RequestCode.GET_CONSUMER_RUNNING_INFO,
-            RequestCode.CONSUME_MESSAGE_DIRECTLY
-        };
-        for (int requestCode : handledCodes) {
-            this.remotingClient.registerProcessor(requestCode, this.clientRemotingProcessor, null);
-        }
-    }
-
-    /**
-     * @return the name server address list as reported by the remoting client
-     */
-    public List<String> getNameServerAddressList() {
-        final List<String> nameServers = remotingClient.getNameServerAddressList();
-        return nameServers;
-    }
-
-    /**
-     * @return the remoting client this API sends every request through
-     */
-    public RemotingClient getRemotingClient() {
-        return this.remotingClient;
-    }
-
-    /**
-     * Asks {@code topAddressing} for the name server address string and, when it is non-null and
-     * differs from the remembered one, pushes it to the remoting client and remembers it.
-     * Any exception is logged and otherwise ignored.
-     *
-     * @return the remembered name server address string (possibly unchanged, possibly null)
-     */
-    public String fetchNameServerAddr() {
-        try {
-            final String fetched = this.topAddressing.fetchNSAddr();
-            final boolean changed = fetched != null && !fetched.equals(this.nameSrvAddr);
-            if (changed) {
-                log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + fetched);
-                this.updateNameServerAddressList(fetched);
-                this.nameSrvAddr = fetched;
-            }
-        } catch (Exception e) {
-            log.error("fetchNameServerAddr Exception", e);
-        }
-        return this.nameSrvAddr;
-    }
-
-    /**
-     * Splits {@code addrs} on {@code ';'} and hands the resulting list to the remoting client.
-     *
-     * @param addrs semicolon-separated name server addresses; must not be null
-     */
-    public void updateNameServerAddressList(final String addrs) {
-        // String.split never returns null, so the pieces can be copied unconditionally.
-        final String[] pieces = addrs.split(";");
-        final List<String> nameServers = new ArrayList<String>(Arrays.asList(pieces));
-        this.remotingClient.updateNameServerAddressList(nameServers);
-    }
-
-    /**
-     * Starts the underlying remoting client.
-     */
-    public void start() {
-        remotingClient.start();
-    }
-
-    /**
-     * Shuts the underlying remoting client down.
-     */
-    public void shutdown() {
-        remotingClient.shutdown();
-    }
-
-    /**
-     * Sends an UPDATE_AND_CREATE_SUBSCRIPTIONGROUP request whose body is the encoded {@code config}.
-     *
-     * @param addr          broker address (mapped through {@code MixAll.brokerVIPChannel})
-     * @param config        subscription group configuration to create or update
-     * @param timeoutMillis timeout of the synchronous invocation
-     * @throws MQClientException when the broker answers with any code other than SUCCESS
-     */
-    public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
-        request.setBody(RemotingSerializable.encode(config));
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-    /**
-     * Sends an UPDATE_AND_CREATE_TOPIC request whose header is filled from {@code topicConfig}.
-     *
-     * @param addr          broker address (mapped through {@code MixAll.brokerVIPChannel})
-     * @param defaultTopic  value stored in the header's default-topic field
-     * @param topicConfig   topic settings copied into the request header
-     * @param timeoutMillis timeout of the synchronous invocation
-     * @throws MQClientException when the broker answers with any code other than SUCCESS
-     */
-    public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        final CreateTopicRequestHeader header = new CreateTopicRequestHeader();
-        header.setTopic(topicConfig.getTopicName());
-        header.setDefaultTopic(defaultTopic);
-        header.setReadQueueNums(topicConfig.getReadQueueNums());
-        header.setWriteQueueNums(topicConfig.getWriteQueueNums());
-        header.setPerm(topicConfig.getPerm());
-        header.setTopicFilterType(topicConfig.getTopicFilterType().name());
-        header.setTopicSysFlag(topicConfig.getTopicSysFlag());
-        header.setOrder(topicConfig.isOrder());
-
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, header);
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-    /**
-     * Convenience overload: forwards to the full {@code sendMessage} with no send callback,
-     * no topic publish info, no client instance and zero retries.
-     *
-     * @return whatever the full overload returns for the given communication mode
-     */
-    public SendResult sendMessage(
-        final String addr,
-        final String brokerName,
-        final Message msg,
-        final SendMessageRequestHeader requestHeader,
-        final long timeoutMillis,
-        final CommunicationMode communicationMode,
-        final SendMessageContext context,
-        final DefaultMQProducerImpl producer
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final SendResult result = sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode,
-            null, null, null, 0, context, producer);
-        return result;
-    }
-
-    /**
-     * Builds the send request (V2 header when {@code sendSmartMsg} is set, V1 otherwise), attaches
-     * the message body and dispatches it according to {@code communicationMode}.
-     *
-     * @return the send result for SYNC; {@code null} for ONEWAY and ASYNC
-     */
-    public SendResult sendMessage(
-        final String addr,
-        final String brokerName,
-        final Message msg,
-        final SendMessageRequestHeader requestHeader,
-        final long timeoutMillis,
-        final CommunicationMode communicationMode,
-        final SendCallback sendCallback,
-        final TopicPublishInfo topicPublishInfo,
-        final MQClientInstance instance,
-        final int retryTimesWhenSendFailed,
-        final SendMessageContext context,
-        final DefaultMQProducerImpl producer
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request;
-        if (!sendSmartMsg) {
-            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
-        } else {
-            final SendMessageRequestHeaderV2 compactHeader =
-                SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
-            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, compactHeader);
-        }
-        request.setBody(msg.getBody());
-
-        SendResult result = null;
-        switch (communicationMode) {
-            case SYNC:
-                result = this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
-                break;
-            case ASYNC:
-                // The attempt counter starts at zero and is shared by all retries of this send.
-                final AtomicInteger attempts = new AtomicInteger();
-                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                    retryTimesWhenSendFailed, attempts, context, producer);
-                break;
-            case ONEWAY:
-                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
-                break;
-            default:
-                assert false;
-                break;
-        }
-        return result;
-    }
-
-    /**
-     * Sends {@code request} synchronously and converts the broker response into a send result.
-     */
-    private SendResult sendMessageSync(
-        final String addr,
-        final String brokerName,
-        final Message msg,
-        final long timeoutMillis,
-        final RemotingCommand request
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand brokerReply = this.remotingClient.invokeSync(addr, request, timeoutMillis);
-        assert brokerReply != null;
-        return processSendResponse(brokerName, msg, brokerReply);
-    }
-
-    /**
-     * Sends {@code request} asynchronously and handles completion in an {@link InvokeCallback}.
-     * <p>
-     * On a response: the result is decoded, the after-send hook runs (when {@code context} is set),
-     * {@code sendCallback.onSuccess} is invoked (when a callback exists) and the broker's fault item
-     * is updated. On a decode failure or a missing response the failure is routed to
-     * {@code onExceptionImpl}, which may retry.
-     * <p>
-     * The caller's {@code timeoutMillis} is forwarded to {@code onExceptionImpl} so that a retry is
-     * sent with the same timeout as the first attempt, not with a zero timeout.
-     *
-     * @param times counter of attempts already made, shared across retries
-     */
-    private void sendMessageAsync(
-        final String addr,
-        final String brokerName,
-        final Message msg,
-        final long timeoutMillis,
-        final RemotingCommand request,
-        final SendCallback sendCallback,
-        final TopicPublishInfo topicPublishInfo,
-        final MQClientInstance instance,
-        final int retryTimesWhenSendFailed,
-        final AtomicInteger times,
-        final SendMessageContext context,
-        final DefaultMQProducerImpl producer
-    ) throws InterruptedException, RemotingException {
-        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
-            @Override
-            public void operationComplete(ResponseFuture responseFuture) {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (null == sendCallback && response != null) {
-                    // No user callback: only the after-send hook can consume the result.
-                    try {
-                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
-                        if (context != null && sendResult != null) {
-                            context.setSendResult(sendResult);
-                            context.getProducer().executeSendMessageHookAfter(context);
-                        }
-                    } catch (Throwable e) {
-                        // Still best effort (nobody to notify), but leave a trace instead of hiding the failure.
-                        log.warn("async send without callback: processing the response failed, brokerName=" + brokerName, e);
-                    }
-
-                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
-                    return;
-                }
-
-                if (response != null) {
-                    try {
-                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
-                        assert sendResult != null;
-                        if (context != null) {
-                            context.setSendResult(sendResult);
-                            context.getProducer().executeSendMessageHookAfter(context);
-                        }
-
-                        try {
-                            sendCallback.onSuccess(sendResult);
-                        } catch (Throwable e) {
-                            // A failing user callback must not turn a successful send into a failure.
-                            log.warn("SendCallback.onSuccess threw an exception, brokerName=" + brokerName, e);
-                        }
-
-                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
-                    } catch (Exception e) {
-                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
-                        // The broker answered but the answer could not be used: report without retrying.
-                        onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                            retryTimesWhenSendFailed, times, e, context, false, producer);
-                    }
-                } else {
-                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
-                    // No response at all: classify the failure and let onExceptionImpl decide about a retry.
-                    if (!responseFuture.isSendRequestOK()) {
-                        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
-                        onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                            retryTimesWhenSendFailed, times, ex, context, true, producer);
-                    } else if (responseFuture.isTimeout()) {
-                        MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
-                            responseFuture.getCause());
-                        onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                            retryTimesWhenSendFailed, times, ex, context, true, producer);
-                    } else {
-                        MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
-                        onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                            retryTimesWhenSendFailed, times, ex, context, true, producer);
-                    }
-                }
-            }
-        });
-    }
-
-
- // Failure handler for asynchronous sends.
- // Increments curTimes; when needRetry is set and the new count is still <= timesTotal, it picks
- // another message queue, gives the request a new opaque id and re-sends it through
- // sendMessageAsync. Otherwise it reports the failure: the after-send hook runs (when context is
- // non-null) and sendCallback.onException(e) is invoked.
- // timeoutMillis is passed unchanged to the re-send.
- private void onExceptionImpl(final String brokerName, //
- final Message msg, //
- final long timeoutMillis, //
- final RemotingCommand request, //
- final SendCallback sendCallback, //
- final TopicPublishInfo topicPublishInfo, //
- final MQClientInstance instance, //
- final int timesTotal, //
- final AtomicInteger curTimes, //
- final Exception e, //
- final SendMessageContext context, //
- final boolean needRetry, //
- final DefaultMQProducerImpl producer // 12
- ) {
- int tmp = curTimes.incrementAndGet();
- if (needRetry && tmp <= timesTotal) {
- // NOTE(review): tmpmq is dereferenced without a null check — confirm selectOneMessageQueue cannot return null.
- MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
- String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName());
- log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
- tmpmq.getBrokerName());
- try {
- // The same request object is reused for the retry, so it gets a new request id first.
- request.setOpaque(RemotingCommand.createNewRequestId());
- sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
- timesTotal, curTimes, context, producer);
- } catch (InterruptedException e1) {
- // Reported without a further retry (needRetry = false).
- // NOTE(review): the thread's interrupt flag is not restored here.
- onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
- context, false, producer);
- } catch (RemotingConnectException e1) {
- // NOTE(review): the fault item is updated for brokerName (the previous attempt's broker),
- // not for tmpmq.getBrokerName() which was just tried — confirm this is intended.
- producer.updateFaultItem(brokerName, 3000, true);
- onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
- context, true, producer);
- } catch (RemotingTooMuchRequestException e1) {
- // Reported without a further retry (needRetry = false).
- onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
- context, false, producer);
- } catch (RemotingException e1) {
- producer.updateFaultItem(brokerName, 3000, true);
- onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
- context, true, producer);
- }
- } else {
- if (context != null) {
- context.setException(e);
- context.getProducer().executeSendMessageHookAfter(context);
- }
- try {
- // Exceptions thrown here (including a NullPointerException when sendCallback is null) are swallowed.
- sendCallback.onException(e);
- } catch (Exception e2) {
- }
- }
- }
-
-
-    /**
-     * Converts a broker response to a send request into a {@link SendResult}.
-     * <p>
-     * SUCCESS, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT and SLAVE_NOT_AVAILABLE each map to the
-     * matching {@link SendStatus}; every other response code is raised as an exception.
-     *
-     * @param brokerName broker name stored in the resulting message queue
-     * @param msg        the message that was sent (topic and unique id are read from it)
-     * @param response   the broker's response command
-     * @return the populated send result
-     * @throws MQBrokerException carrying the response code and remark for any other code
-     */
-    private SendResult processSendResponse(
-        final String brokerName,
-        final Message msg,
-        final RemotingCommand response
-    ) throws MQBrokerException, RemotingCommandException {
-        final SendStatus status;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS:
-                status = SendStatus.SEND_OK;
-                break;
-            case ResponseCode.FLUSH_DISK_TIMEOUT:
-                status = SendStatus.FLUSH_DISK_TIMEOUT;
-                break;
-            case ResponseCode.FLUSH_SLAVE_TIMEOUT:
-                status = SendStatus.FLUSH_SLAVE_TIMEOUT;
-                break;
-            case ResponseCode.SLAVE_NOT_AVAILABLE:
-                status = SendStatus.SLAVE_NOT_AVAILABLE;
-                break;
-            default:
-                throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-
-        final SendMessageResponseHeader header =
-            (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
-        final MessageQueue queue = new MessageQueue(msg.getTopic(), brokerName, header.getQueueId());
-
-        final SendResult result = new SendResult(status,
-            MessageClientIDSetter.getUniqID(msg),
-            header.getMsgId(), queue, header.getQueueOffset());
-        result.setTransactionId(header.getTransactionId());
-
-        final String regionFromBroker = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
-        final String traceSwitch = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
-
-        // Tracing stays on unless the broker explicitly answers "false".
-        result.setTraceOn(!(traceSwitch != null && traceSwitch.equals("false")));
-
-        final boolean regionMissing = regionFromBroker == null || regionFromBroker.isEmpty();
-        result.setRegionId(regionMissing ? MixAll.DEFAULT_TRACE_REGION_ID : regionFromBroker);
-        return result;
-    }
-
-
-    /**
-     * Builds a PULL_MESSAGE request and dispatches it according to {@code communicationMode}.
-     *
-     * @return the pull result for SYNC; {@code null} for ASYNC (result goes to {@code pullCallback})
-     *         and for the unsupported ONEWAY mode
-     */
-    public PullResult pullMessage(
-        final String addr,
-        final PullMessageRequestHeader requestHeader,
-        final long timeoutMillis,
-        final CommunicationMode communicationMode,
-        final PullCallback pullCallback
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
-
-        PullResult result = null;
-        switch (communicationMode) {
-            case SYNC:
-                result = this.pullMessageSync(addr, request, timeoutMillis);
-                break;
-            case ASYNC:
-                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
-                break;
-            case ONEWAY:
-            default:
-                // Pulling has no oneway form; any other mode is a programming error.
-                assert false;
-                break;
-        }
-        return result;
-    }
-
-
-    /**
-     * Sends {@code request} asynchronously and reports the outcome to {@code pullCallback}:
-     * {@code onSuccess} with the decoded pull result, or {@code onException} when decoding fails,
-     * the callback itself throws an {@code Exception}, or no response arrived.
-     */
-    private void pullMessageAsync(
-        final String addr,
-        final RemotingCommand request,
-        final long timeoutMillis,
-        final PullCallback pullCallback
-    ) throws RemotingException, InterruptedException {
-        final InvokeCallback completion = new InvokeCallback() {
-            @Override
-            public void operationComplete(ResponseFuture responseFuture) {
-                final RemotingCommand response = responseFuture.getResponseCommand();
-                if (response == null) {
-                    // No response: describe why, then report it with the future's cause attached.
-                    final String reason;
-                    if (!responseFuture.isSendRequestOK()) {
-                        reason = "send request failed";
-                    } else if (responseFuture.isTimeout()) {
-                        reason = "wait response timeout " + responseFuture.getTimeoutMillis() + "ms";
-                    } else {
-                        reason = "unknow reseaon";
-                    }
-                    pullCallback.onException(new MQClientException(reason, responseFuture.getCause()));
-                    return;
-                }
-
-                try {
-                    final PullResult pulled = MQClientAPIImpl.this.processPullResponse(response);
-                    assert pulled != null;
-                    pullCallback.onSuccess(pulled);
-                } catch (Exception e) {
-                    pullCallback.onException(e);
-                }
-            }
-        };
-        this.remotingClient.invokeAsync(addr, request, timeoutMillis, completion);
-    }
-
-    /**
-     * Sends {@code request} synchronously and converts the broker response into a pull result.
-     */
-    private PullResult pullMessageSync(
-        final String addr,
-        final RemotingCommand request,
-        final long timeoutMillis
-    ) throws RemotingException, InterruptedException, MQBrokerException {
-        final RemotingCommand brokerReply = this.remotingClient.invokeSync(addr, request, timeoutMillis);
-        assert brokerReply != null;
-        return processPullResponse(brokerReply);
-    }
-
-    /**
-     * Maps the response code to a {@link PullStatus} and wraps the response header fields and
-     * body in a {@link PullResultExt}.
-     *
-     * @throws MQBrokerException for any response code that has no pull status
-     */
-    private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
-        final PullStatus status;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS:
-                status = PullStatus.FOUND;
-                break;
-            case ResponseCode.PULL_NOT_FOUND:
-                status = PullStatus.NO_NEW_MSG;
-                break;
-            case ResponseCode.PULL_RETRY_IMMEDIATELY:
-                status = PullStatus.NO_MATCHED_MSG;
-                break;
-            case ResponseCode.PULL_OFFSET_MOVED:
-                status = PullStatus.OFFSET_ILLEGAL;
-                break;
-            default:
-                throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-
-        final PullMessageResponseHeader header =
-            (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
-
-        // The message list is left null; the raw body is carried along in the extended result.
-        return new PullResultExt(status, header.getNextBeginOffset(), header.getMinOffset(),
-            header.getMaxOffset(), null, header.getSuggestWhichBrokerId(), response.getBody());
-    }
-
-    /**
-     * Fetches the message stored at {@code phyoffset} with a VIEW_MESSAGE_BY_ID request.
-     *
-     * @return the message decoded from the response body
-     * @throws MQBrokerException when the broker answers with any code other than SUCCESS
-     */
-    public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis)
-        throws RemotingException, MQBrokerException, InterruptedException {
-        final ViewMessageRequestHeader header = new ViewMessageRequestHeader();
-        header.setOffset(phyoffset);
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, header);
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-        return MessageDecoder.clientDecode(ByteBuffer.wrap(response.getBody()), true);
-    }
-
-
-    /**
-     * Asks the broker for the queue offset matching {@code timestamp} (SEARCH_OFFSET_BY_TIMESTAMP).
-     *
-     * @return the offset taken from the response header
-     * @throws MQBrokerException when the broker answers with any code other than SUCCESS
-     */
-    public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis)
-        throws RemotingException, MQBrokerException, InterruptedException {
-        final SearchOffsetRequestHeader header = new SearchOffsetRequestHeader();
-        header.setTopic(topic);
-        header.setQueueId(queueId);
-        header.setTimestamp(timestamp);
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, header);
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-        final SearchOffsetResponseHeader reply =
-            (SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
-        return reply.getOffset();
-    }
-
-
-    /**
-     * Queries the broker with GET_MAX_OFFSET for the given topic and queue.
-     *
-     * @return the offset taken from the response header
-     * @throws MQBrokerException when the broker answers with any code other than SUCCESS
-     */
-    public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
-        throws RemotingException, MQBrokerException, InterruptedException {
-        final GetMaxOffsetRequestHeader header = new GetMaxOffsetRequestHeader();
-        header.setTopic(topic);
-        header.setQueueId(queueId);
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, header);
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-        final GetMaxOffsetResponseHeader reply =
-            (GetMaxOffsetResponseHeader) response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
-        return reply.getOffset();
-    }
-
-
-    /**
-     * Asks the broker for the consumer ids registered under {@code consumerGroup}.
-     *
-     * @return the consumer id list decoded from the response body
-     * @throws MQBrokerException when the response code is not SUCCESS, and also when it is SUCCESS
-     *                           but the response carries no body
-     */
-    public List<String> getConsumerIdListByGroup(
-        final String addr,
-        final String consumerGroup,
-        final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-        MQBrokerException, InterruptedException {
-        final GetConsumerListByGroupRequestHeader header = new GetConsumerListByGroupRequestHeader();
-        header.setConsumerGroup(consumerGroup);
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, header);
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        final boolean succeededWithBody = response.getCode() == ResponseCode.SUCCESS && response.getBody() != null;
-        if (succeededWithBody) {
-            final GetConsumerListByGroupResponseBody decoded =
-                GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
-            return decoded.getConsumerIdList();
-        }
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Queries the broker with GET_MIN_OFFSET for the given topic and queue.
-     *
-     * @return the offset taken from the response header
-     * @throws MQBrokerException when the broker answers with any code other than SUCCESS
-     */
-    public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
-        throws RemotingException, MQBrokerException, InterruptedException {
-        final GetMinOffsetRequestHeader header = new GetMinOffsetRequestHeader();
-        header.setTopic(topic);
-        header.setQueueId(queueId);
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, header);
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-        final GetMinOffsetResponseHeader reply =
-            (GetMinOffsetResponseHeader) response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
-        return reply.getOffset();
-    }
-
-
-    /**
-     * Queries the broker with GET_EARLIEST_MSG_STORETIME for the given topic and queue.
-     *
-     * @return the timestamp taken from the response header
-     * @throws MQBrokerException when the broker answers with any code other than SUCCESS
-     */
-    public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis)
-        throws RemotingException, MQBrokerException, InterruptedException {
-        final GetEarliestMsgStoretimeRequestHeader header = new GetEarliestMsgStoretimeRequestHeader();
-        header.setTopic(topic);
-        header.setQueueId(queueId);
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, header);
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-        final GetEarliestMsgStoretimeResponseHeader reply =
-            (GetEarliestMsgStoretimeResponseHeader) response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
-        return reply.getTimestamp();
-    }
-
-
-    /**
-     * Sends a QUERY_CONSUMER_OFFSET request built from {@code requestHeader}.
-     *
-     * @return the offset taken from the response header
-     * @throws MQBrokerException when the broker answers with any code other than SUCCESS
-     */
-    public long queryConsumerOffset(
-        final String addr,
-        final QueryConsumerOffsetRequestHeader requestHeader,
-        final long timeoutMillis
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-        final QueryConsumerOffsetResponseHeader reply =
-            (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
-        return reply.getOffset();
-    }
-
-
-    /**
-     * Sends an UPDATE_CONSUMER_OFFSET request synchronously.
-     *
-     * @throws MQBrokerException when the broker answers with any code other than SUCCESS
-     */
-    public void updateConsumerOffset(
-        final String addr,
-        final UpdateConsumerOffsetRequestHeader requestHeader,
-        final long timeoutMillis
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-    }
-
-
-    /**
-     * Sends an UPDATE_CONSUMER_OFFSET request as a oneway invocation; no response is read.
-     */
-    public void updateConsumerOffsetOneway(
-        final String addr,
-        final UpdateConsumerOffsetRequestHeader requestHeader,
-        final long timeoutMillis
-    ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
-        InterruptedException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        this.remotingClient.invokeOneway(target, request, timeoutMillis);
-    }
-
-
-    /**
-     * Sends a HEART_BEAT request whose body is the encoded {@code heartbeatData}.
-     * The address is used as given (no VIP-channel mapping).
-     *
-     * @throws MQBrokerException when the broker answers with any code other than SUCCESS
-     */
-    public void sendHearbeat(
-        final String addr,
-        final HeartbeatData heartbeatData,
-        final long timeoutMillis
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
-        request.setBody(heartbeatData.encode());
-
-        final RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-    }
-
-
-    /**
-     * Sends an UNREGISTER_CLIENT request for the given client id and groups.
-     * The address is used as given (no VIP-channel mapping).
-     *
-     * @throws MQBrokerException when the broker answers with any code other than SUCCESS
-     */
-    public void unregisterClient(
-        final String addr,
-        final String clientID,
-        final String producerGroup,
-        final String consumerGroup,
-        final long timeoutMillis
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final UnregisterClientRequestHeader header = new UnregisterClientRequestHeader();
-        header.setClientID(clientID);
-        header.setProducerGroup(producerGroup);
-        header.setConsumerGroup(consumerGroup);
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-    }
-
-
- /**
-  * Sends an END_TRANSACTION request as a one-way invocation; no response is read.
-  *
-  * @param addr address the request is sent to (used as given)
-  * @param requestHeader header attached to the request
-  * @param remark text stored on the request via setRemark
-  * @param timeoutMillis timeout handed to the remoting client
-  */
- public void endTransactionOneway(//
-     final String addr, //
-     final EndTransactionRequestHeader requestHeader, //
-     final String remark, //
-     final long timeoutMillis//
- ) throws RemotingException, MQBrokerException, InterruptedException {
-     final RemotingCommand endTransaction =
-         RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
-     endTransaction.setRemark(remark);
-
-     this.remotingClient.invokeOneway(addr, endTransaction, timeoutMillis);
- }
-
-
- /**
-  * Issues an asynchronous QUERY_MESSAGE request; the outcome is delivered to the supplied callback.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param requestHeader header attached to the request
-  * @param timeoutMillis timeout handed to the remoting client
-  * @param invokeCallback callback handed to invokeAsync
-  * @param isUnqiueKey flag whose toString() value is stored in the MixAll.UNIQUE_MSG_QUERY_FLAG ext field;
-  *                    must not be null because it is dereferenced
-  */
- public void queryMessage(
-     final String addr,
-     final QueryMessageRequestHeader requestHeader,
-     final long timeoutMillis,
-     final InvokeCallback invokeCallback,
-     final Boolean isUnqiueKey
- ) throws RemotingException, MQBrokerException, InterruptedException {
-     final RemotingCommand query = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader);
-     final String uniqueKeyFlag = isUnqiueKey.toString();
-     query.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, uniqueKeyFlag);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     this.remotingClient.invokeAsync(brokerAddr, query, timeoutMillis, invokeCallback);
- }
-
-
- /**
-  * Sends a HEART_BEAT request with the encoded heartbeat as body and reports whether it was accepted.
-  *
-  * @param addr address the request is sent to (used as given)
-  * @param heartbeat payload; its encode() result becomes the request body
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return true when the reply code equals ResponseCode.SUCCESS, false otherwise
-  */
- public boolean registerClient(final String addr, final HeartbeatData heartbeat, final long timeoutMillis)
-     throws RemotingException, InterruptedException {
-     final RemotingCommand registration = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
-     registration.setBody(heartbeat.encode());
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(addr, registration, timeoutMillis);
-     final boolean accepted = reply.getCode() == ResponseCode.SUCCESS;
-     return accepted;
- }
-
-
- /**
-  * Sends a CONSUMER_SEND_MSG_BACK request describing the given message and waits for the reply.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param msg message whose topic, commit-log offset and message id are copied into the request header
-  * @param consumerGroup consumer group stored in the header
-  * @param delayLevel delay level stored in the header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @param maxConsumeRetryTimes value stored in the header as the maximum reconsume times
-  * @throws MQBrokerException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public void consumerSendMessageBack(
-     final String addr,
-     final MessageExt msg,
-     final String consumerGroup,
-     final int delayLevel,
-     final long timeoutMillis,
-     final int maxConsumeRetryTimes
- ) throws RemotingException, MQBrokerException, InterruptedException {
-     final ConsumerSendMsgBackRequestHeader header = new ConsumerSendMsgBackRequestHeader();
-     // The request is created first and the header is filled in afterwards, as in the original ordering.
-     final RemotingCommand sendBack = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, header);
-
-     header.setGroup(consumerGroup);
-     header.setOriginTopic(msg.getTopic());
-     header.setOffset(msg.getCommitLogOffset());
-     header.setDelayLevel(delayLevel);
-     header.setOriginMsgId(msg.getMsgId());
-     header.setMaxReconsumeTimes(maxConsumeRetryTimes);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, sendBack, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return;
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Sends a LOCK_BATCH_MQ request with the encoded request body and returns the queues reported as locked.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param requestBody body whose encode() result is sent
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the lockOKMQSet of the decoded LockBatchResponseBody
-  * @throws MQBrokerException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public Set<MessageQueue> lockBatchMQ(//
-     final String addr, //
-     final LockBatchRequestBody requestBody, //
-     final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
-     final RemotingCommand lockRequest = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
-     lockRequest.setBody(requestBody.encode());
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, lockRequest, timeoutMillis);
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         final LockBatchResponseBody decoded = LockBatchResponseBody.decode(reply.getBody(), LockBatchResponseBody.class);
-         return decoded.getLockOKMQSet();
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Sends an UNLOCK_BATCH_MQ request with the encoded request body, either one-way or synchronously.
-  *
-  * @param addr broker address
-  * @param requestBody body whose encode() result is sent
-  * @param timeoutMillis timeout handed to the remoting client
-  * @param oneway when true the request is sent one-way to addr as given and no reply is checked;
-  *               when false it is sent synchronously to the MixAll.brokerVIPChannel-derived address
-  * @throws MQBrokerException in synchronous mode, if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public void unlockBatchMQ(//
-     final String addr, //
-     final UnlockBatchRequestBody requestBody, //
-     final long timeoutMillis, //
-     final boolean oneway//
- ) throws RemotingException, MQBrokerException, InterruptedException {
-     final RemotingCommand unlockRequest = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
-     unlockRequest.setBody(requestBody.encode());
-
-     if (oneway) {
-         // NOTE(review): this path uses addr directly while the sync path goes through brokerVIPChannel.
-         this.remotingClient.invokeOneway(addr, unlockRequest, timeoutMillis);
-         return;
-     }
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, unlockRequest, timeoutMillis);
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return;
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests GET_TOPIC_STATS_INFO for a topic and decodes the reply body into a TopicStatsTable.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param topic topic name stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded TopicStatsTable
-  * @throws MQBrokerException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis) throws InterruptedException,
-     RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
-     final GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
-     header.setTopic(topic);
-     final RemotingCommand statsRequest = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, header);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, statsRequest, timeoutMillis);
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return TopicStatsTable.decode(reply.getBody(), TopicStatsTable.class);
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Convenience overload that delegates to the four-argument getConsumeStats with a null topic.
-  *
-  * @param addr broker address
-  * @param consumerGroup consumer group to query
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return whatever the four-argument overload returns
-  */
- public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
-     throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
-     MQBrokerException {
-     final String noTopic = null;
-     return getConsumeStats(addr, consumerGroup, noTopic, timeoutMillis);
- }
-
-
- /**
-  * Requests GET_CONSUME_STATS for a consumer group (and topic) and decodes the reply body.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param consumerGroup consumer group stored in the request header
-  * @param topic topic stored in the request header; the three-argument overload passes null here
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded ConsumeStats
-  * @throws MQBrokerException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis)
-     throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
-     MQBrokerException {
-     final GetConsumeStatsRequestHeader header = new GetConsumeStatsRequestHeader();
-     header.setConsumerGroup(consumerGroup);
-     header.setTopic(topic);
-     final RemotingCommand statsRequest = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, header);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, statsRequest, timeoutMillis);
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return ConsumeStats.decode(reply.getBody(), ConsumeStats.class);
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests GET_PRODUCER_CONNECTION_LIST for a producer group and decodes the reply body.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param producerGroup producer group stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded ProducerConnection
-  * @throws MQBrokerException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, final long timeoutMillis)
-     throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
-     MQBrokerException {
-     final GetProducerConnectionListRequestHeader header = new GetProducerConnectionListRequestHeader();
-     header.setProducerGroup(producerGroup);
-     final RemotingCommand listRequest =
-         RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, header);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, listRequest, timeoutMillis);
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         final ProducerConnection connections = ProducerConnection.decode(reply.getBody(), ProducerConnection.class);
-         return connections;
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests GET_CONSUMER_CONNECTION_LIST for a consumer group and decodes the reply body.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param consumerGroup consumer group stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded ConsumerConnection
-  * @throws MQBrokerException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, final long timeoutMillis)
-     throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
-     MQBrokerException {
-     final GetConsumerConnectionListRequestHeader header = new GetConsumerConnectionListRequestHeader();
-     header.setConsumerGroup(consumerGroup);
-     final RemotingCommand listRequest =
-         RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, header);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, listRequest, timeoutMillis);
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return ConsumerConnection.decode(reply.getBody(), ConsumerConnection.class);
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests GET_BROKER_RUNTIME_INFO and decodes the reply body into a KVTable.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded KVTable
-  * @throws MQBrokerException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public KVTable getBrokerRuntimeInfo(final String addr, final long timeoutMillis) throws RemotingConnectException,
-     RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
-     final RemotingCommand infoRequest = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, infoRequest, timeoutMillis);
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         final KVTable runtimeInfo = KVTable.decode(reply.getBody(), KVTable.class);
-         return runtimeInfo;
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Sends the given properties to a broker in an UPDATE_BROKER_CONFIG request.
-  * When MixAll.properties2String yields null or an empty string, nothing is sent and the method returns normally.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param properties properties serialised with MixAll.properties2String and encoded with MixAll.DEFAULT_CHARSET
-  * @param timeoutMillis timeout handed to the remoting client
-  * @throws MQBrokerException if a request was sent and the reply code is anything other than ResponseCode.SUCCESS
-  */
- public void updateBrokerConfig(final String addr, final Properties properties, final long timeoutMillis)
-     throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
-     MQBrokerException, UnsupportedEncodingException {
-     final RemotingCommand updateRequest = RemotingCommand.createRequestCommand(RequestCode.UPDATE_BROKER_CONFIG, null);
-
-     final String serialized = MixAll.properties2String(properties);
-     if (serialized == null || serialized.length() <= 0) {
-         // Nothing to transmit; the broker is not contacted.
-         return;
-     }
-
-     updateRequest.setBody(serialized.getBytes(MixAll.DEFAULT_CHARSET));
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, updateRequest, timeoutMillis);
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return;
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests GET_BROKER_CONFIG and converts the reply body into Properties.
-  *
-  * @param addr address the request is sent to (used as given)
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the reply body decoded with MixAll.DEFAULT_CHARSET and parsed by MixAll.string2Properties
-  * @throws MQBrokerException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public Properties getBrokerConfig(final String addr, final long timeoutMillis)
-     throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
-     MQBrokerException, UnsupportedEncodingException {
-     final RemotingCommand configRequest = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONFIG, null);
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(addr, configRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         final String text = new String(reply.getBody(), MixAll.DEFAULT_CHARSET);
-         return MixAll.string2Properties(text);
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
- /**
-  * Requests GET_BROKER_CLUSTER_INFO and decodes the reply body into a ClusterInfo.
-  * The request is issued with a null address; NOTE(review): presumably the remoting client then picks a
-  * name server itself - confirm against the RemotingClient contract.
-  *
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded ClusterInfo
-  * @throws MQBrokerException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
-     RemotingSendRequestException, RemotingConnectException, MQBrokerException {
-     final RemotingCommand clusterRequest = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(null, clusterRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return ClusterInfo.decode(reply.getBody(), ClusterInfo.class);
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests GET_ROUTEINTO_BY_TOPIC for the given topic and decodes the route data.
-  * Unlike getTopicRouteInfoFromNameServer, a TOPIC_NOT_EXIST reply is not logged here.
-  * The request is issued with a null address; NOTE(review): presumably the remoting client then picks a
-  * name server itself - confirm against the RemotingClient contract.
-  *
-  * @param topic topic stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded TopicRouteData when the reply is SUCCESS and carries a body
-  * @throws MQClientException for any other reply code, or for SUCCESS with a null body
-  */
- public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
-     throws RemotingException, MQClientException, InterruptedException {
-     final GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
-     header.setTopic(topic);
-     final RemotingCommand routeRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, header);
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(null, routeRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         final byte[] payload = reply.getBody();
-         if (payload != null) {
-             return TopicRouteData.decode(payload, TopicRouteData.class);
-         }
-     }
-
-     // TOPIC_NOT_EXIST, any other code, and SUCCESS without a body all end up here.
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests GET_ROUTEINTO_BY_TOPIC for the given topic and decodes the route data.
-  * A TOPIC_NOT_EXIST reply is logged at warn level unless the topic equals MixAll.DEFAULT_TOPIC.
-  * The request is issued with a null address; NOTE(review): presumably the remoting client then picks a
-  * name server itself - confirm against the RemotingClient contract.
-  *
-  * @param topic topic stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded TopicRouteData when the reply is SUCCESS and carries a body
-  * @throws MQClientException for any other reply code, or for SUCCESS with a null body
-  */
- public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
-     throws RemotingException, MQClientException, InterruptedException {
-     final GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
-     header.setTopic(topic);
-     final RemotingCommand routeRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, header);
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(null, routeRequest, timeoutMillis);
-     assert reply != null;
-     final int code = reply.getCode();
-     if (code == ResponseCode.TOPIC_NOT_EXIST) {
-         if (!topic.equals(MixAll.DEFAULT_TOPIC)) {
-             log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
-         }
-     } else if (code == ResponseCode.SUCCESS) {
-         final byte[] payload = reply.getBody();
-         if (payload != null) {
-             return TopicRouteData.decode(payload, TopicRouteData.class);
-         }
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests GET_ALL_TOPIC_LIST_FROM_NAMESERVER and decodes the reply body into a TopicList.
-  * The request is issued with a null address; NOTE(review): presumably the remoting client then picks a
-  * name server itself - confirm against the RemotingClient contract.
-  *
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded TopicList when the reply is SUCCESS and carries a body
-  * @throws MQClientException for any other reply code, or for SUCCESS with a null body
-  */
- public TopicList getTopicListFromNameServer(final long timeoutMillis)
-     throws RemotingException, MQClientException, InterruptedException {
-     final RemotingCommand listRequest =
-         RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER, null);
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(null, listRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         final byte[] payload = reply.getBody();
-         if (payload != null) {
-             return TopicList.decode(payload, TopicList.class);
-         }
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Sends WIPE_WRITE_PERM_OF_BROKER for the named broker to the given name-server address.
-  *
-  * @param namesrvAddr address the request is sent to (used as given)
-  * @param brokerName broker name stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the wipeTopicCount field of the decoded response header
-  * @throws MQClientException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis) throws RemotingCommandException,
-     RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
-     final WipeWritePermOfBrokerRequestHeader header = new WipeWritePermOfBrokerRequestHeader();
-     header.setBrokerName(brokerName);
-     final RemotingCommand wipeRequest = RemotingCommand.createRequestCommand(RequestCode.WIPE_WRITE_PERM_OF_BROKER, header);
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(namesrvAddr, wipeRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         final WipeWritePermOfBrokerResponseHeader replyHeader =
-             (WipeWritePermOfBrokerResponseHeader) reply.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
-         return replyHeader.getWipeTopicCount();
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Sends DELETE_TOPIC_IN_BROKER for the given topic and waits for the reply.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param topic topic stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @throws MQClientException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis)
-     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-     final DeleteTopicRequestHeader header = new DeleteTopicRequestHeader();
-     header.setTopic(topic);
-     final RemotingCommand deleteRequest = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, header);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, deleteRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return;
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Sends DELETE_TOPIC_IN_NAMESRV for the given topic and waits for the reply.
-  *
-  * @param addr address the request is sent to (used as given)
-  * @param topic topic stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @throws MQClientException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis)
-     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-     final DeleteTopicRequestHeader header = new DeleteTopicRequestHeader();
-     header.setTopic(topic);
-     final RemotingCommand deleteRequest = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, header);
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(addr, deleteRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return;
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Sends DELETE_SUBSCRIPTIONGROUP for the given group name and waits for the reply.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param groupName group name stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @throws MQClientException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis)
-     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-     final DeleteSubscriptionGroupRequestHeader header = new DeleteSubscriptionGroupRequestHeader();
-     header.setGroupName(groupName);
-     final RemotingCommand deleteRequest = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, header);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, deleteRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return;
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests GET_KV_CONFIG for a namespace/key pair and returns the value from the response header.
-  * The request is issued with a null address; NOTE(review): presumably the remoting client then picks a
-  * name server itself - confirm against the RemotingClient contract.
-  *
-  * @param namespace namespace stored in the request header
-  * @param key key stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the value field of the decoded GetKVConfigResponseHeader
-  * @throws MQClientException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public String getKVConfigValue(final String namespace, final String key, final long timeoutMillis)
-     throws RemotingException, MQClientException, InterruptedException {
-     final GetKVConfigRequestHeader header = new GetKVConfigRequestHeader();
-     header.setNamespace(namespace);
-     header.setKey(key);
-     final RemotingCommand lookup = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG, header);
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(null, lookup, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         final GetKVConfigResponseHeader replyHeader =
-             (GetKVConfigResponseHeader) reply.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
-         return replyHeader.getValue();
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Sends the same PUT_KV_CONFIG request to every address returned by getNameServerAddressList().
-  * All addresses are tried even after a failure; if any reply was not SUCCESS, the last such reply
-  * is reported through an MQClientException once the loop has finished. A null address list is a no-op.
-  *
-  * @param namespace namespace stored in the request header
-  * @param key key stored in the request header
-  * @param value value stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client for each invocation
-  * @throws MQClientException if at least one name server replied with a non-SUCCESS code
-  */
- public void putKVConfigValue(final String namespace, final String key, final String value, final long timeoutMillis)
-     throws RemotingException, MQClientException, InterruptedException {
-     final PutKVConfigRequestHeader header = new PutKVConfigRequestHeader();
-     header.setNamespace(namespace);
-     header.setKey(key);
-     header.setValue(value);
-     final RemotingCommand putRequest = RemotingCommand.createRequestCommand(RequestCode.PUT_KV_CONFIG, header);
-
-     final List<String> nameServers = this.remotingClient.getNameServerAddressList();
-     if (nameServers == null) {
-         return;
-     }
-
-     RemotingCommand lastFailure = null;
-     for (final String nameServer : nameServers) {
-         final RemotingCommand reply = this.remotingClient.invokeSync(nameServer, putRequest, timeoutMillis);
-         assert reply != null;
-         if (reply.getCode() != ResponseCode.SUCCESS) {
-             lastFailure = reply;
-         }
-     }
-
-     if (lastFailure != null) {
-         throw new MQClientException(lastFailure.getCode(), lastFailure.getRemark());
-     }
- }
-
-
- /**
-  * Sends the same DELETE_KV_CONFIG request to every address returned by getNameServerAddressList().
-  * All addresses are tried even after a failure; if any reply was not SUCCESS, the last such reply
-  * is reported through an MQClientException once the loop has finished. A null address list is a no-op.
-  *
-  * @param namespace namespace stored in the request header
-  * @param key key stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client for each invocation
-  * @throws MQClientException if at least one name server replied with a non-SUCCESS code
-  */
- public void deleteKVConfigValue(final String namespace, final String key, final long timeoutMillis)
-     throws RemotingException, MQClientException, InterruptedException {
-     final DeleteKVConfigRequestHeader header = new DeleteKVConfigRequestHeader();
-     header.setNamespace(namespace);
-     header.setKey(key);
-     final RemotingCommand deleteRequest = RemotingCommand.createRequestCommand(RequestCode.DELETE_KV_CONFIG, header);
-
-     final List<String> nameServers = this.remotingClient.getNameServerAddressList();
-     if (nameServers == null) {
-         return;
-     }
-
-     RemotingCommand lastFailure = null;
-     for (final String nameServer : nameServers) {
-         final RemotingCommand reply = this.remotingClient.invokeSync(nameServer, deleteRequest, timeoutMillis);
-         assert reply != null;
-         if (reply.getCode() != ResponseCode.SUCCESS) {
-             lastFailure = reply;
-         }
-     }
-
-     if (lastFailure != null) {
-         throw new MQClientException(lastFailure.getCode(), lastFailure.getRemark());
-     }
- }
-
-
- /**
-  * Requests GET_KVLIST_BY_NAMESPACE and decodes the reply body into a KVTable.
-  * The request is issued with a null address; NOTE(review): presumably the remoting client then picks a
-  * name server itself - confirm against the RemotingClient contract.
-  *
-  * @param namespace namespace stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded KVTable
-  * @throws MQClientException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public KVTable getKVListByNamespace(final String namespace, final long timeoutMillis)
-     throws RemotingException, MQClientException, InterruptedException {
-     final GetKVListByNamespaceRequestHeader header = new GetKVListByNamespaceRequestHeader();
-     header.setNamespace(namespace);
-     final RemotingCommand listRequest = RemotingCommand.createRequestCommand(RequestCode.GET_KVLIST_BY_NAMESPACE, header);
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(null, listRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return KVTable.decode(reply.getBody(), KVTable.class);
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Convenience overload that delegates to the seven-argument invokeBrokerToResetOffset with isC set to false.
-  *
-  * @return whatever the seven-argument overload returns
-  */
- public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
-     final long timestamp, final boolean isForce, final long timeoutMillis)
-     throws RemotingException, MQClientException, InterruptedException {
-     final boolean isC = false;
-     return invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, isC);
- }
-
-
- /**
-  * Sends INVOKE_BROKER_TO_RESET_OFFSET and returns the offset table from the decoded reply body.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param topic topic stored in the request header
-  * @param group group stored in the request header
-  * @param timestamp timestamp stored in the request header
-  * @param isForce force flag stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @param isC when true, the request's language is set to LanguageCode.CPP before sending
-  * @return the offset table of the decoded ResetOffsetBody when the reply is SUCCESS and carries a body
-  * @throws MQClientException for any other reply code, or for SUCCESS with a null body
-  */
- public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
-     final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
-     throws RemotingException, MQClientException, InterruptedException {
-     final ResetOffsetRequestHeader header = new ResetOffsetRequestHeader();
-     header.setTopic(topic);
-     header.setGroup(group);
-     header.setTimestamp(timestamp);
-     header.setForce(isForce);
-
-     final RemotingCommand resetRequest = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, header);
-     if (isC) {
-         resetRequest.setLanguage(LanguageCode.CPP);
-     }
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, resetRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS && reply.getBody() != null) {
-         final ResetOffsetBody decoded = ResetOffsetBody.decode(reply.getBody(), ResetOffsetBody.class);
-         return decoded.getOffsetTable();
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Sends INVOKE_BROKER_TO_GET_CONSUMER_STATUS and returns the consumer table from the decoded reply body.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param topic topic stored in the request header
-  * @param group group stored in the request header
-  * @param clientAddr client address stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the consumer table of the decoded GetConsumerStatusBody when the reply is SUCCESS and carries a body
-  * @throws MQClientException for any other reply code, or for SUCCESS with a null body
-  */
- public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group,
-     final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
-     final GetConsumerStatusRequestHeader header = new GetConsumerStatusRequestHeader();
-     header.setTopic(topic);
-     header.setGroup(group);
-     header.setClientAddr(clientAddr);
-
-     final RemotingCommand statusRequest =
-         RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS, header);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, statusRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS && reply.getBody() != null) {
-         final GetConsumerStatusBody decoded = GetConsumerStatusBody.decode(reply.getBody(), GetConsumerStatusBody.class);
-         return decoded.getConsumerTable();
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests QUERY_TOPIC_CONSUME_BY_WHO for a topic and decodes the reply body into a GroupList.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param topic topic stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded GroupList
-  * @throws MQBrokerException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public GroupList queryTopicConsumeByWho(final String addr, final String topic, final long timeoutMillis)
-     throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
-     MQBrokerException {
-     final QueryTopicConsumeByWhoRequestHeader header = new QueryTopicConsumeByWhoRequestHeader();
-     header.setTopic(topic);
-     final RemotingCommand query = RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, header);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, query, timeoutMillis);
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return GroupList.decode(reply.getBody(), GroupList.class);
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests QUERY_CONSUME_TIME_SPAN for a topic/group pair and returns the time spans from the reply body.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param topic topic stored in the request header
-  * @param group group stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the consumeTimeSpanSet of the decoded QueryConsumeTimeSpanBody
-  * @throws MQBrokerException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group, final long timeoutMillis)
-     throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
-     MQBrokerException {
-     QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
-     requestHeader.setTopic(topic);
-     requestHeader.setGroup(group);
-
-     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader);
-
-     RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-         request, timeoutMillis);
-     switch (response.getCode()) {
-         case ResponseCode.SUCCESS: {
-             // Invoke the static decode through the body's own class (previously reached via the unrelated
-             // GroupList), matching the X.decode(body, X.class) pattern used by the sibling methods.
-             QueryConsumeTimeSpanBody consumeTimeSpanBody =
-                 QueryConsumeTimeSpanBody.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
-             return consumeTimeSpanBody.getConsumeTimeSpanSet();
-         }
-         default:
-             break;
-     }
-
-     throw new MQBrokerException(response.getCode(), response.getRemark());
- }
-
-
- /**
-  * Requests GET_TOPICS_BY_CLUSTER for a cluster name and decodes the reply body into a TopicList.
-  * The request is issued with a null address; NOTE(review): presumably the remoting client then picks a
-  * name server itself - confirm against the RemotingClient contract.
-  *
-  * @param cluster cluster name stored in the request header
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded TopicList when the reply is SUCCESS and carries a body
-  * @throws MQClientException for any other reply code, or for SUCCESS with a null body
-  */
- public TopicList getTopicsByCluster(final String cluster, final long timeoutMillis)
-     throws RemotingException, MQClientException, InterruptedException {
-     final GetTopicsByClusterRequestHeader header = new GetTopicsByClusterRequestHeader();
-     header.setCluster(cluster);
-     final RemotingCommand topicsRequest = RemotingCommand.createRequestCommand(RequestCode.GET_TOPICS_BY_CLUSTER, header);
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(null, topicsRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         final byte[] payload = reply.getBody();
-         if (payload != null) {
-             return TopicList.decode(payload, TopicList.class);
-         }
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Sends REGISTER_MESSAGE_FILTER_CLASS with the class bytes as request body and waits for the reply.
-  *
-  * @param addr address the request is sent to (used as given)
-  * @param consumerGroup consumer group stored in the request header
-  * @param topic topic stored in the request header
-  * @param className class name stored in the request header
-  * @param classCRC CRC value stored in the request header
-  * @param classBody bytes set as the request body
-  * @param timeoutMillis timeout handed to the remoting client
-  * @throws MQBrokerException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public void registerMessageFilterClass(final String addr, //
-     final String consumerGroup, //
-     final String topic, //
-     final String className, //
-     final int classCRC, //
-     final byte[] classBody, //
-     final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-     InterruptedException, MQBrokerException {
-     final RegisterMessageFilterClassRequestHeader header = new RegisterMessageFilterClassRequestHeader();
-     header.setConsumerGroup(consumerGroup);
-     header.setClassName(className);
-     header.setTopic(topic);
-     header.setClassCRC(classCRC);
-
-     final RemotingCommand registration =
-         RemotingCommand.createRequestCommand(RequestCode.REGISTER_MESSAGE_FILTER_CLASS, header);
-     registration.setBody(classBody);
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(addr, registration, timeoutMillis);
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return;
-     }
-
-     throw new MQBrokerException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests GET_SYSTEM_TOPIC_LIST_FROM_NS and, when the decoded list is non-empty and names a broker
-  * address, merges in the topics returned by getSystemTopicListFromBroker for that address.
-  * The first request is issued with a null address; NOTE(review): presumably the remoting client then
-  * picks a name server itself - confirm against the RemotingClient contract.
-  *
-  * @param timeoutMillis timeout used for this request and for the follow-up broker request
-  * @return the decoded (and possibly extended) TopicList when the reply is SUCCESS and carries a body
-  * @throws MQClientException for any other reply code, or for SUCCESS with a null body
-  */
- public TopicList getSystemTopicList(final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
-     final RemotingCommand listRequest = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null);
-
-     final RemotingCommand reply = this.remotingClient.invokeSync(null, listRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         final byte[] payload = reply.getBody();
-         if (payload != null) {
-             final TopicList systemTopics = TopicList.decode(reply.getBody(), TopicList.class);
-             final boolean shouldAskBroker = systemTopics.getTopicList() != null
-                 && !systemTopics.getTopicList().isEmpty()
-                 && !UtilAll.isBlank(systemTopics.getBrokerAddr());
-             if (shouldAskBroker) {
-                 final TopicList fromBroker = getSystemTopicListFromBroker(systemTopics.getBrokerAddr(), timeoutMillis);
-                 if (fromBroker.getTopicList() != null && !fromBroker.getTopicList().isEmpty()) {
-                     systemTopics.getTopicList().addAll(fromBroker.getTopicList());
-                 }
-             }
-             return systemTopics;
-         }
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Requests GET_SYSTEM_TOPIC_LIST_FROM_BROKER and decodes the reply body into a TopicList.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded TopicList when the reply is SUCCESS and carries a body
-  * @throws MQClientException for any other reply code, or for SUCCESS with a null body
-  */
- public TopicList getSystemTopicListFromBroker(final String addr, final long timeoutMillis)
-     throws RemotingException, MQClientException, InterruptedException {
-     final RemotingCommand listRequest =
-         RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER, null);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, listRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         final byte[] payload = reply.getBody();
-         if (payload != null) {
-             return TopicList.decode(payload, TopicList.class);
-         }
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Sends CLEAN_EXPIRED_CONSUMEQUEUE to a broker and waits for the reply.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return true when the reply code is ResponseCode.SUCCESS; the method never returns false
-  * @throws MQClientException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
-     RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
-     final RemotingCommand cleanRequest = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null);
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, cleanRequest, timeoutMillis);
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return true;
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-
- /**
-  * Sends CLEAN_UNUSED_TOPIC to a broker and waits for the reply.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return true when the reply code is ResponseCode.SUCCESS; the method never returns false
-  * @throws MQClientException if the reply code is anything other than ResponseCode.SUCCESS
-  */
- public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
-     RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
-     final RemotingCommand cleanRequest = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null);
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, cleanRequest, timeoutMillis);
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         return true;
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
- /**
-  * Requests GET_CONSUMER_RUNNING_INFO for a consumer client and decodes the reply body.
-  *
-  * @param addr broker address, passed through MixAll.brokerVIPChannel together with the client's VIP-channel flag
-  * @param consumerGroup consumer group stored in the request header
-  * @param clientId client id stored in the request header
-  * @param jstack value stored in the header's jstackEnable field
-  * @param timeoutMillis timeout handed to the remoting client
-  * @return the decoded ConsumerRunningInfo when the reply is SUCCESS and carries a body
-  * @throws MQClientException for any other reply code, or for SUCCESS with a null body
-  */
- public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, boolean jstack,
-     final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
-     final GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader();
-     header.setConsumerGroup(consumerGroup);
-     header.setClientId(clientId);
-     header.setJstackEnable(jstack);
-
-     final RemotingCommand infoRequest = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header);
-
-     final String brokerAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-     final RemotingCommand reply = this.remotingClient.invokeSync(brokerAddr, infoRequest, timeoutMillis);
-     assert reply != null;
-     if (reply.getCode() == ResponseCode.SUCCESS) {
-         final byte[] payload = reply.getBody();
-         if (payload != null) {
-             return ConsumerRunningInfo.decode(payload, ConsumerRunningInfo.class);
-         }
-     }
-
-     throw new MQClientException(reply.getCode(), reply.getRemark());
- }
-
-    /**
-     * Sends a {@code CONSUME_MESSAGE_DIRECTLY} request and decodes the response body.
-     *
-     * @param addr address that is passed through {@code MixAll.brokerVIPChannel} before the call
-     * @param consumerGroup value copied into the request header's consumer group
-     * @param clientId value copied into the request header's client id
-     * @param msgId value copied into the request header's message id
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the {@code ConsumeMessageDirectlyResult} decoded from the response body
-     * @throws MQClientException when the response code is not {@code ResponseCode.SUCCESS}, or when it is
-     *         SUCCESS but the body is {@code null}; carries the response code and remark
-     */
-    public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, String consumerGroup, String clientId,
-        String msgId, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
-        final ConsumeMessageDirectlyResultRequestHeader header = new ConsumeMessageDirectlyResultRequestHeader();
-        header.setConsumerGroup(consumerGroup);
-        header.setClientId(clientId);
-        header.setMsgId(msgId);
-
-        final RemotingCommand consumeRequest = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, header);
-        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, consumeRequest, timeoutMillis);
-        assert reply != null;
-
-        if (ResponseCode.SUCCESS == reply.getCode()) {
-            final byte[] payload = reply.getBody();
-            if (payload != null) {
-                return ConsumeMessageDirectlyResult.decode(payload, ConsumeMessageDirectlyResult.class);
-            }
-            // SUCCESS with an empty body is treated like a failure and reaches the throw below.
-        }
-
-        throw new MQClientException(reply.getCode(), reply.getRemark());
-    }
-
-    /**
-     * Sends a {@code QUERY_CORRECTION_OFFSET} request and returns the correction offsets from the reply.
-     *
-     * @param addr address that is passed through {@code MixAll.brokerVIPChannel} before the call
-     * @param topic value copied into the request header's topic
-     * @param group value copied into the request header's compare group
-     * @param filterGroup optional group names; when non-null they are joined with {@code ","} (in the set's
-     *        iteration order) and stored as the header's filter groups, when {@code null} the field is not set
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the map returned by {@code QueryCorrectionOffsetBody.getCorrectionOffsets()}
-     * @throws MQClientException when the response code is not {@code ResponseCode.SUCCESS}, or when it is
-     *         SUCCESS but the body is {@code null}; carries the response code and remark
-     */
-    public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group, Set<String> filterGroup,
-        long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-        InterruptedException {
-        final QueryCorrectionOffsetHeader header = new QueryCorrectionOffsetHeader();
-        header.setCompareGroup(group);
-        header.setTopic(topic);
-
-        if (filterGroup != null) {
-            // Comma-join the names: first element bare, every later element preceded by ",".
-            final StringBuilder joined = new StringBuilder();
-            final Iterator<String> names = filterGroup.iterator();
-            if (names.hasNext()) {
-                joined.append(names.next());
-            }
-            while (names.hasNext()) {
-                joined.append(",").append(names.next());
-            }
-            header.setFilterGroups(joined.toString());
-        }
-
-        final RemotingCommand offsetRequest = RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET, header);
-        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, offsetRequest, timeoutMillis);
-        assert reply != null;
-
-        if (ResponseCode.SUCCESS == reply.getCode() && reply.getBody() != null) {
-            final QueryCorrectionOffsetBody decoded = QueryCorrectionOffsetBody.decode(reply.getBody(), QueryCorrectionOffsetBody.class);
-            return decoded.getCorrectionOffsets();
-        }
-
-        throw new MQClientException(reply.getCode(), reply.getRemark());
-    }
-
-    /**
-     * Requests the topic list identified by {@code RequestCode.GET_UNIT_TOPIC_LIST}.
-     *
-     * @param containRetry when {@code false}, topics whose name starts with
-     *        {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} are removed from the decoded list
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the decoded {@code TopicList}
-     * @throws MQClientException when the response code is not {@code ResponseCode.SUCCESS}, or when it is
-     *         SUCCESS but the body is {@code null}; carries the response code and remark
-     */
-    public TopicList getUnitTopicList(final boolean containRetry, final long timeoutMillis)
-        throws RemotingException, MQClientException, InterruptedException {
-        return this.fetchTopicListByRequestCode(RequestCode.GET_UNIT_TOPIC_LIST, containRetry, timeoutMillis);
-    }
-
-
-    /**
-     * Requests the topic list identified by {@code RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST}.
-     *
-     * @param containRetry when {@code false}, topics whose name starts with
-     *        {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} are removed from the decoded list
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the decoded {@code TopicList}
-     * @throws MQClientException when the response code is not {@code ResponseCode.SUCCESS}, or when it is
-     *         SUCCESS but the body is {@code null}; carries the response code and remark
-     */
-    public TopicList getHasUnitSubTopicList(final boolean containRetry, final long timeoutMillis)
-        throws RemotingException, MQClientException, InterruptedException {
-        return this.fetchTopicListByRequestCode(RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST, containRetry, timeoutMillis);
-    }
-
-
-    /**
-     * Requests the topic list identified by {@code RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST}.
-     *
-     * @param containRetry when {@code false}, topics whose name starts with
-     *        {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} are removed from the decoded list
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the decoded {@code TopicList}
-     * @throws MQClientException when the response code is not {@code ResponseCode.SUCCESS}, or when it is
-     *         SUCCESS but the body is {@code null}; carries the response code and remark
-     */
-    public TopicList getHasUnitSubUnUnitTopicList(final boolean containRetry, final long timeoutMillis)
-        throws RemotingException, MQClientException, InterruptedException {
-        return this.fetchTopicListByRequestCode(RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST, containRetry, timeoutMillis);
-    }
-
-
-    /**
-     * Shared implementation of the three topic-list queries above, which differ only in the request code.
-     *
-     * @param requestCode request code used to build the command (no custom header is attached)
-     * @param containRetry when {@code false}, retry-group topics are stripped from the result in place
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the decoded, possibly filtered, {@code TopicList}
-     * @throws MQClientException when no topic list could be produced from the response
-     */
-    private TopicList fetchTopicListByRequestCode(final int requestCode, final boolean containRetry, final long timeoutMillis)
-        throws RemotingException, MQClientException, InterruptedException {
-        RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, null);
-
-        // NOTE(review): a null address is passed on purpose here, exactly as the original methods did;
-        // presumably the remoting client then picks a name server itself - confirm against invokeSync.
-        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            byte[] body = response.getBody();
-            if (body != null) {
-                TopicList topicList = TopicList.decode(body, TopicList.class);
-                if (!containRetry) {
-                    // Remove through the iterator so the underlying collection is modified safely.
-                    Iterator<String> it = topicList.getTopicList().iterator();
-                    while (it.hasNext()) {
-                        String topic = it.next();
-                        if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-                            it.remove();
-                        }
-                    }
-                }
-                return topicList;
-            }
-            // SUCCESS with an empty body is treated like a failure and reaches the throw below.
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code CLONE_GROUP_OFFSET} request and returns normally only on success.
-     *
-     * @param addr address that is passed through {@code MixAll.brokerVIPChannel} before the call
-     * @param srcGroup value copied into the request header's source group
-     * @param destGroup value copied into the request header's destination group
-     * @param topic value copied into the request header's topic
-     * @param isOffline value copied into the request header's offline flag
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @throws MQClientException when the response code is not {@code ResponseCode.SUCCESS}; carries the
-     *         response code and remark
-     */
-    public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup, final String topic,
-        final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
-        final CloneGroupOffsetRequestHeader header = new CloneGroupOffsetRequestHeader();
-        header.setSrcGroup(srcGroup);
-        header.setDestGroup(destGroup);
-        header.setTopic(topic);
-        header.setOffline(isOffline);
-
-        final RemotingCommand cloneRequest = RemotingCommand.createRequestCommand(RequestCode.CLONE_GROUP_OFFSET, header);
-        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand reply = this.remotingClient.invokeSync(targetAddr, cloneRequest, timeoutMillis);
-        assert reply != null;
-
-        if (ResponseCode.SUCCESS != reply.getCode()) {
-            throw new MQClientException(reply.getCode(), reply.getRemark());
-        }
-    }
-
-
- public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey, long timeoutMillis)
- throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- InterruptedException
<TRUNCATED>