You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 08:13:48 UTC
[23/34] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 84ee7db..2dd9200 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,6 +16,17 @@
*/
package org.apache.rocketmq.client.impl;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
@@ -37,21 +48,97 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.*;
-import org.apache.rocketmq.common.protocol.header.*;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
+import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
+import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
+import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.*;
+import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.exception.*;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
@@ -60,17 +147,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
public class MQClientAPIImpl {
private final static Logger log = ClientLogger.getLog();
public static boolean sendSmartMsg =
- Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
+ Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
static {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
@@ -83,7 +164,7 @@ public class MQClientAPIImpl {
private ClientConfig clientConfig;
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor,
- RPCHook rpcHook, final ClientConfig clientConfig) {
+ RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;
topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName());
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
@@ -149,14 +230,14 @@ public class MQClientAPIImpl {
}
public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
byte[] body = RemotingSerializable.encode(config);
request.setBody(body);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -171,7 +252,7 @@ public class MQClientAPIImpl {
}
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
@@ -185,7 +266,7 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -199,31 +280,31 @@ public class MQClientAPIImpl {
}
public SendResult sendMessage(//
- final String addr, // 1
- final String brokerName, // 2
- final Message msg, // 3
- final SendMessageRequestHeader requestHeader, // 4
- final long timeoutMillis, // 5
- final CommunicationMode communicationMode, // 6
- final SendMessageContext context, // 7
- final DefaultMQProducerImpl producer // 8
+ final String addr, // 1
+ final String brokerName, // 2
+ final Message msg, // 3
+ final SendMessageRequestHeader requestHeader, // 4
+ final long timeoutMillis, // 5
+ final CommunicationMode communicationMode, // 6
+ final SendMessageContext context, // 7
+ final DefaultMQProducerImpl producer // 8
) throws RemotingException, MQBrokerException, InterruptedException {
return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
}
public SendResult sendMessage(//
- final String addr, // 1
- final String brokerName, // 2
- final Message msg, // 3
- final SendMessageRequestHeader requestHeader, // 4
- final long timeoutMillis, // 5
- final CommunicationMode communicationMode, // 6
- final SendCallback sendCallback, // 7
- final TopicPublishInfo topicPublishInfo, // 8
- final MQClientInstance instance, // 9
- final int retryTimesWhenSendFailed, // 10
- final SendMessageContext context, // 11
- final DefaultMQProducerImpl producer // 12
+ final String addr, // 1
+ final String brokerName, // 2
+ final Message msg, // 3
+ final SendMessageRequestHeader requestHeader, // 4
+ final long timeoutMillis, // 5
+ final CommunicationMode communicationMode, // 6
+ final SendCallback sendCallback, // 7
+ final TopicPublishInfo topicPublishInfo, // 8
+ final MQClientInstance instance, // 9
+ final int retryTimesWhenSendFailed, // 10
+ final SendMessageContext context, // 11
+ final DefaultMQProducerImpl producer // 12
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = null;
if (sendSmartMsg) {
@@ -242,7 +323,7 @@ public class MQClientAPIImpl {
case ASYNC:
final AtomicInteger times = new AtomicInteger();
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, context, producer);
+ retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
@@ -255,11 +336,11 @@ public class MQClientAPIImpl {
}
private SendResult sendMessageSync(//
- final String addr, //
- final String brokerName, //
- final Message msg, //
- final long timeoutMillis, //
- final RemotingCommand request//
+ final String addr, //
+ final String brokerName, //
+ final Message msg, //
+ final long timeoutMillis, //
+ final RemotingCommand request//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
@@ -267,18 +348,18 @@ public class MQClientAPIImpl {
}
private void sendMessageAsync(//
- final String addr, //
- final String brokerName, //
- final Message msg, //
- final long timeoutMillis, //
- final RemotingCommand request, //
- final SendCallback sendCallback, //
- final TopicPublishInfo topicPublishInfo, //
- final MQClientInstance instance, //
- final int retryTimesWhenSendFailed, //
- final AtomicInteger times, //
- final SendMessageContext context, //
- final DefaultMQProducerImpl producer //
+ final String addr, //
+ final String brokerName, //
+ final Message msg, //
+ final long timeoutMillis, //
+ final RemotingCommand request, //
+ final SendCallback sendCallback, //
+ final TopicPublishInfo topicPublishInfo, //
+ final MQClientInstance instance, //
+ final int retryTimesWhenSendFailed, //
+ final AtomicInteger times, //
+ final SendMessageContext context, //
+ final DefaultMQProducerImpl producer //
) throws InterruptedException, RemotingException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
@@ -318,68 +399,67 @@ public class MQClientAPIImpl {
} catch (Exception e) {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, e, context, false, producer);
+ retryTimesWhenSendFailed, times, e, context, false, producer);
}
} else {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
} else if (responseFuture.isTimeout()) {
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
- responseFuture.getCause());
+ responseFuture.getCause());
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
} else {
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
+ retryTimesWhenSendFailed, times, ex, context, true, producer);
}
}
}
});
}
-
private void onExceptionImpl(final String brokerName, //
- final Message msg, //
- final long timeoutMillis, //
- final RemotingCommand request, //
- final SendCallback sendCallback, //
- final TopicPublishInfo topicPublishInfo, //
- final MQClientInstance instance, //
- final int timesTotal, //
- final AtomicInteger curTimes, //
- final Exception e, //
- final SendMessageContext context, //
- final boolean needRetry, //
- final DefaultMQProducerImpl producer // 12
+ final Message msg, //
+ final long timeoutMillis, //
+ final RemotingCommand request, //
+ final SendCallback sendCallback, //
+ final TopicPublishInfo topicPublishInfo, //
+ final MQClientInstance instance, //
+ final int timesTotal, //
+ final AtomicInteger curTimes, //
+ final Exception e, //
+ final SendMessageContext context, //
+ final boolean needRetry, //
+ final DefaultMQProducerImpl producer // 12
) {
int tmp = curTimes.incrementAndGet();
if (needRetry && tmp <= timesTotal) {
MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName());
log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
- tmpmq.getBrokerName());
+ tmpmq.getBrokerName());
try {
request.setOpaque(RemotingCommand.createNewRequestId());
sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
- timesTotal, curTimes, context, producer);
+ timesTotal, curTimes, context, producer);
} catch (InterruptedException e1) {
onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
- context, false, producer);
+ context, false, producer);
} catch (RemotingConnectException e1) {
producer.updateFaultItem(brokerName, 3000, true);
onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
- context, true, producer);
+ context, true, producer);
} catch (RemotingTooMuchRequestException e1) {
onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
- context, false, producer);
+ context, false, producer);
} catch (RemotingException e1) {
producer.updateFaultItem(brokerName, 3000, true);
onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
- context, true, producer);
+ context, true, producer);
}
} else {
if (context != null) {
@@ -393,11 +473,10 @@ public class MQClientAPIImpl {
}
}
-
private SendResult processSendResponse(//
- final String brokerName, //
- final Message msg, //
- final RemotingCommand response//
+ final String brokerName, //
+ final Message msg, //
+ final RemotingCommand response//
) throws MQBrokerException, RemotingCommandException {
switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT:
@@ -426,13 +505,13 @@ public class MQClientAPIImpl {
}
SendMessageResponseHeader responseHeader =
- (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+ (SendMessageResponseHeader)response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId());
SendResult sendResult = new SendResult(sendStatus,
- MessageClientIDSetter.getUniqID(msg),
- responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
+ MessageClientIDSetter.getUniqID(msg),
+ responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
@@ -454,13 +533,12 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public PullResult pullMessage(//
- final String addr, //
- final PullMessageRequestHeader requestHeader, //
- final long timeoutMillis, //
- final CommunicationMode communicationMode, //
- final PullCallback pullCallback//
+ final String addr, //
+ final PullMessageRequestHeader requestHeader, //
+ final long timeoutMillis, //
+ final CommunicationMode communicationMode, //
+ final PullCallback pullCallback//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
@@ -481,12 +559,11 @@ public class MQClientAPIImpl {
return null;
}
-
private void pullMessageAsync(//
- final String addr, // 1
- final RemotingCommand request, //
- final long timeoutMillis, //
- final PullCallback pullCallback//
+ final String addr, // 1
+ final RemotingCommand request, //
+ final long timeoutMillis, //
+ final PullCallback pullCallback//
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
@@ -505,7 +582,7 @@ public class MQClientAPIImpl {
pullCallback.onException(new MQClientException("send request failed", responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
- responseFuture.getCause()));
+ responseFuture.getCause()));
} else {
pullCallback.onException(new MQClientException("unknow reseaon", responseFuture.getCause()));
}
@@ -515,9 +592,9 @@ public class MQClientAPIImpl {
}
private PullResult pullMessageSync(//
- final String addr, // 1
- final RemotingCommand request, // 2
- final long timeoutMillis// 3
+ final String addr, // 1
+ final RemotingCommand request, // 2
+ final long timeoutMillis// 3
) throws RemotingException, InterruptedException, MQBrokerException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
@@ -545,20 +622,20 @@ public class MQClientAPIImpl {
}
PullMessageResponseHeader responseHeader =
- (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+ (PullMessageResponseHeader)response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
- responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
+ responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException {
+ throws RemotingException, MQBrokerException, InterruptedException {
ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader();
requestHeader.setOffset(phyoffset);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -573,9 +650,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException {
+ throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
@@ -583,12 +659,12 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
SearchOffsetResponseHeader responseHeader =
- (SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
+ (SearchOffsetResponseHeader)response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
return responseHeader.getOffset();
}
default:
@@ -598,21 +674,20 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException {
+ throws RemotingException, MQBrokerException, InterruptedException {
GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
GetMaxOffsetResponseHeader responseHeader =
- (GetMaxOffsetResponseHeader) response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
+ (GetMaxOffsetResponseHeader)response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
return responseHeader.getOffset();
}
@@ -623,24 +698,23 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public List<String> getConsumerIdListByGroup(//
- final String addr, //
- final String consumerGroup, //
- final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- MQBrokerException, InterruptedException {
+ final String addr, //
+ final String consumerGroup, //
+ final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+ MQBrokerException, InterruptedException {
GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
if (response.getBody() != null) {
GetConsumerListByGroupResponseBody body =
- GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
+ GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
return body.getConsumerIdList();
}
}
@@ -651,21 +725,20 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException {
+ throws RemotingException, MQBrokerException, InterruptedException {
GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
GetMinOffsetResponseHeader responseHeader =
- (GetMinOffsetResponseHeader) response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
+ (GetMinOffsetResponseHeader)response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
return responseHeader.getOffset();
}
@@ -676,21 +749,20 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException {
+ throws RemotingException, MQBrokerException, InterruptedException {
GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
GetEarliestMsgStoretimeResponseHeader responseHeader =
- (GetEarliestMsgStoretimeResponseHeader) response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
+ (GetEarliestMsgStoretimeResponseHeader)response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
return responseHeader.getTimestamp();
}
@@ -701,21 +773,20 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public long queryConsumerOffset(//
- final String addr, //
- final QueryConsumerOffsetRequestHeader requestHeader, //
- final long timeoutMillis//
+ final String addr, //
+ final QueryConsumerOffsetRequestHeader requestHeader, //
+ final long timeoutMillis//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
QueryConsumerOffsetResponseHeader responseHeader =
- (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+ (QueryConsumerOffsetResponseHeader)response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
return responseHeader.getOffset();
}
@@ -726,16 +797,15 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public void updateConsumerOffset(//
- final String addr, //
- final UpdateConsumerOffsetRequestHeader requestHeader, //
- final long timeoutMillis//
+ final String addr, //
+ final UpdateConsumerOffsetRequestHeader requestHeader, //
+ final long timeoutMillis//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -748,23 +818,21 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public void updateConsumerOffsetOneway(//
- final String addr, //
- final UpdateConsumerOffsetRequestHeader requestHeader, //
- final long timeoutMillis//
+ final String addr, //
+ final UpdateConsumerOffsetRequestHeader requestHeader, //
+ final long timeoutMillis//
) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
- InterruptedException {
+ InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
}
-
public void sendHearbeat(//
- final String addr, //
- final HeartbeatData heartbeatData, //
- final long timeoutMillis//
+ final String addr, //
+ final HeartbeatData heartbeatData, //
+ final long timeoutMillis//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
@@ -782,13 +850,12 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public void unregisterClient(//
- final String addr, //
- final String clientID, //
- final String producerGroup, //
- final String consumerGroup, //
- final long timeoutMillis//
+ final String addr, //
+ final String clientID, //
+ final String producerGroup, //
+ final String consumerGroup, //
+ final long timeoutMillis//
) throws RemotingException, MQBrokerException, InterruptedException {
final UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
requestHeader.setClientID(clientID);
@@ -809,12 +876,11 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public void endTransactionOneway(//
- final String addr, //
- final EndTransactionRequestHeader requestHeader, //
- final String remark, //
- final long timeoutMillis//
+ final String addr, //
+ final EndTransactionRequestHeader requestHeader, //
+ final String remark, //
+ final long timeoutMillis//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
@@ -822,23 +888,21 @@ public class MQClientAPIImpl {
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}
-
public void queryMessage(
- final String addr,
- final QueryMessageRequestHeader requestHeader,
- final long timeoutMillis,
- final InvokeCallback invokeCallback,
- final Boolean isUnqiueKey
+ final String addr,
+ final QueryMessageRequestHeader requestHeader,
+ final long timeoutMillis,
+ final InvokeCallback invokeCallback,
+ final Boolean isUnqiueKey
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader);
request.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, isUnqiueKey.toString());
this.remotingClient.invokeAsync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis,
- invokeCallback);
+ invokeCallback);
}
-
public boolean registerClient(final String addr, final HeartbeatData heartbeat, final long timeoutMillis)
- throws RemotingException, InterruptedException {
+ throws RemotingException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
request.setBody(heartbeat.encode());
@@ -846,14 +910,13 @@ public class MQClientAPIImpl {
return response.getCode() == ResponseCode.SUCCESS;
}
-
public void consumerSendMessageBack(
- final String addr,
- final MessageExt msg,
- final String consumerGroup,
- final int delayLevel,
- final long timeoutMillis,
- final int maxConsumeRetryTimes
+ final String addr,
+ final MessageExt msg,
+ final String consumerGroup,
+ final int delayLevel,
+ final long timeoutMillis,
+ final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
@@ -866,7 +929,7 @@ public class MQClientAPIImpl {
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -879,16 +942,15 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public Set<MessageQueue> lockBatchMQ(//
- final String addr, //
- final LockBatchRequestBody requestBody, //
- final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
+ final String addr, //
+ final LockBatchRequestBody requestBody, //
+ final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
request.setBody(requestBody.encode());
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
@@ -902,12 +964,11 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public void unlockBatchMQ(//
- final String addr, //
- final UnlockBatchRequestBody requestBody, //
- final long timeoutMillis, //
- final boolean oneway//
+ final String addr, //
+ final UnlockBatchRequestBody requestBody, //
+ final long timeoutMillis, //
+ final boolean oneway//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
@@ -917,7 +978,7 @@ public class MQClientAPIImpl {
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
} else {
RemotingCommand response = this.remotingClient
- .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
+ .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
@@ -930,16 +991,15 @@ public class MQClientAPIImpl {
}
}
-
public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis) throws InterruptedException,
- RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
GetTopicStatsInfoRequestHeader requestHeader = new GetTopicStatsInfoRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
TopicStatsTable topicStatsTable = TopicStatsTable.decode(response.getBody(), TopicStatsTable.class);
@@ -952,17 +1012,15 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
- throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
- MQBrokerException {
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
+ MQBrokerException {
return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
}
-
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis)
- throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
- MQBrokerException {
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
+ MQBrokerException {
GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(topic);
@@ -970,7 +1028,7 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
ConsumeStats consumeStats = ConsumeStats.decode(response.getBody(), ConsumeStats.class);
@@ -983,17 +1041,16 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, final long timeoutMillis)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
- MQBrokerException {
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+ MQBrokerException {
GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader();
requestHeader.setProducerGroup(producerGroup);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return ProducerConnection.decode(response.getBody(), ProducerConnection.class);
@@ -1005,17 +1062,16 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, final long timeoutMillis)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
- MQBrokerException {
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+ MQBrokerException {
GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
ConsumerConnection consumerConnection = ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
@@ -1028,14 +1084,13 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public KVTable getBrokerRuntimeInfo(final String addr, final long timeoutMillis) throws RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
+ RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return KVTable.decode(response.getBody(), KVTable.class);
@@ -1047,10 +1102,9 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public void updateBrokerConfig(final String addr, final Properties properties, final long timeoutMillis)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
- MQBrokerException, UnsupportedEncodingException {
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+ MQBrokerException, UnsupportedEncodingException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_BROKER_CONFIG, null);
@@ -1058,7 +1112,7 @@ public class MQClientAPIImpl {
if (str != null && str.length() > 0) {
request.setBody(str.getBytes(MixAll.DEFAULT_CHARSET));
RemotingCommand response = this.remotingClient
- .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
+ .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
@@ -1071,10 +1125,9 @@ public class MQClientAPIImpl {
}
}
-
public Properties getBrokerConfig(final String addr, final long timeoutMillis)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
- MQBrokerException, UnsupportedEncodingException {
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+ MQBrokerException, UnsupportedEncodingException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONFIG, null);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
@@ -1091,7 +1144,7 @@ public class MQClientAPIImpl {
}
public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
@@ -1108,9 +1161,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ throws RemotingException, MQClientException, InterruptedException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
@@ -1136,9 +1188,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ throws RemotingException, MQClientException, InterruptedException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
@@ -1165,9 +1216,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public TopicList getTopicListFromNameServer(final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ throws RemotingException, MQClientException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
@@ -1187,9 +1237,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis) throws RemotingCommandException,
- RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
+ RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader();
requestHeader.setBrokerName(brokerName);
@@ -1200,7 +1249,7 @@ public class MQClientAPIImpl {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
WipeWritePermOfBrokerResponseHeader responseHeader =
- (WipeWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
+ (WipeWritePermOfBrokerResponseHeader)response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
return responseHeader.getWipeTopicCount();
}
default:
@@ -1210,15 +1259,14 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -1231,9 +1279,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, requestHeader);
@@ -1251,15 +1298,14 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader();
requestHeader.setGroupName(groupName);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -1272,9 +1318,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public String getKVConfigValue(final String namespace, final String key, final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ throws RemotingException, MQClientException, InterruptedException {
GetKVConfigRequestHeader requestHeader = new GetKVConfigRequestHeader();
requestHeader.setNamespace(namespace);
requestHeader.setKey(key);
@@ -1286,7 +1331,7 @@ public class MQClientAPIImpl {
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
GetKVConfigResponseHeader responseHeader =
- (GetKVConfigResponseHeader) response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
+ (GetKVConfigResponseHeader)response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
return responseHeader.getValue();
}
default:
@@ -1296,9 +1341,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public void putKVConfigValue(final String namespace, final String key, final String value, final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ throws RemotingException, MQClientException, InterruptedException {
PutKVConfigRequestHeader requestHeader = new PutKVConfigRequestHeader();
requestHeader.setNamespace(namespace);
requestHeader.setKey(key);
@@ -1327,9 +1371,8 @@ public class MQClientAPIImpl {
}
}
-
public void deleteKVConfigValue(final String namespace, final String key, final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ throws RemotingException, MQClientException, InterruptedException {
DeleteKVConfigRequestHeader requestHeader = new DeleteKVConfigRequestHeader();
requestHeader.setNamespace(namespace);
requestHeader.setKey(key);
@@ -1356,9 +1399,8 @@ public class MQClientAPIImpl {
}
}
-
public KVTable getKVListByNamespace(final String namespace, final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ throws RemotingException, MQClientException, InterruptedException {
GetKVListByNamespaceRequestHeader requestHeader = new GetKVListByNamespaceRequestHeader();
requestHeader.setNamespace(namespace);
@@ -1377,17 +1419,15 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
- final long timestamp, final boolean isForce, final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ final long timestamp, final boolean isForce, final long timeoutMillis)
+ throws RemotingException, MQClientException, InterruptedException {
return invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, false);
}
-
public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
- final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
- throws RemotingException, MQClientException, InterruptedException {
+ final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
+ throws RemotingException, MQClientException, InterruptedException {
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
@@ -1400,7 +1440,7 @@ public class MQClientAPIImpl {
}
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -1416,9 +1456,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group,
- final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+ final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
@@ -1427,7 +1466,7 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -1443,17 +1482,16 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public GroupList queryTopicConsumeByWho(final String addr, final String topic, final long timeoutMillis)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
- MQBrokerException {
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+ MQBrokerException {
QueryTopicConsumeByWhoRequestHeader requestHeader = new QueryTopicConsumeByWhoRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
GroupList groupList = GroupList.decode(response.getBody(), GroupList.class);
@@ -1466,10 +1504,9 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group, final long timeoutMillis)
- throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
- MQBrokerException {
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
+ MQBrokerException {
QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
@@ -1477,7 +1514,7 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
@@ -1490,9 +1527,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public TopicList getTopicsByCluster(final String cluster, final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ throws RemotingException, MQClientException, InterruptedException {
GetTopicsByClusterRequestHeader requestHeader = new GetTopicsByClusterRequestHeader();
requestHeader.setCluster(cluster);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPICS_BY_CLUSTER, requestHeader);
@@ -1514,15 +1550,14 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public void registerMessageFilterClass(final String addr, //
- final String consumerGroup, //
- final String topic, //
- final String className, //
- final int classCRC, //
- final byte[] classBody, //
- final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- InterruptedException, MQBrokerException {
+ final String consumerGroup, //
+ final String topic, //
+ final String className, //
+ final int classCRC, //
+ final byte[] classBody, //
+ final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+ InterruptedException, MQBrokerException {
RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setClassName(className);
@@ -1543,7 +1578,6 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
public TopicList getSystemTopicList(final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null);
@@ -1555,7 +1589,7 @@ public class MQClientAPIImpl {
if (body != null) {
TopicList topicList = TopicList.decode(response.getBody(), TopicList.class);
if (topicList.getTopicList() != null && !topicList.getTopicList().isEmpty()
- && !UtilAll.isBlank(topicList.getBrokerAddr())) {
+ && !UtilAll.isBlank(topicList.getBrokerAddr())) {
TopicList tmp = getSystemTopicListFromBroker(topicList.getBrokerAddr(), timeoutMillis);
if (tmp.getTopicList() != null && !tmp.getTopicList().isEmpty()) {
topicList.getTopicList().addAll(tmp.getTopicList());
@@ -1571,13 +1605,12 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public TopicList getSystemTopicListFromBroker(final String addr, final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ throws RemotingException, MQClientException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -1594,12 +1627,11 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+ RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return true;
@@ -1611,12 +1643,11 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+ RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return true;
@@ -1629,7 +1660,7 @@ public class MQClientAPIImpl {
}
public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, boolean jstack,
- final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+ final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setClientId(clientId);
@@ -1638,7 +1669,7 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -1656,10 +1687,10 @@ public class MQClientAPIImpl {
}
public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, //
- String consumerGroup, //
- String clientId, //
- String msgId, //
- final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+ String consumerGroup, //
+ String clientId, //
+ String msgId, //
+ final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setClientId(clientId);
@@ -1668,7 +1699,7 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -1686,8 +1717,8 @@ public class MQClientAPIImpl {
}
public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group, Set<String> filterGroup,
- long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- InterruptedException {
+ long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+ InterruptedException {
QueryCorrectionOffsetHeader requestHeader = new QueryCorrectionOffsetHeader();
requestHeader.setCompareGroup(group);
requestHeader.setTopic(topic);
@@ -1703,7 +1734,7 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -1720,7 +1751,7 @@ public class MQClientAPIImpl {
}
public TopicList getUnitTopicList(final boolean containRetry, final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ throws RemotingException, MQClientException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_UNIT_TOPIC_LIST, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
@@ -1749,9 +1780,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public TopicList getHasUnitSubTopicList(final boolean containRetry, final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ throws RemotingException, MQClientException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
@@ -1779,9 +1809,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public TopicList getHasUnitSubUnUnitTopicList(final boolean containRetry, final long timeoutMillis)
- throws RemotingException, MQClientException, InterruptedException {
+ throws RemotingException, MQClientException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
@@ -1809,9 +1838,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup, final String topic,
- final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+ final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
CloneGroupOffsetRequestHeader requestHeader = new CloneGroupOffsetRequestHeader();
requestHeader.setSrcGroup(srcGroup);
requestHeader.setDestGroup(destGroup);
@@ -1820,7 +1848,7 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLONE_GROUP_OFFSET, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
+ request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -1833,10 +1861,9 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey, long timeoutMillis)
- throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- InterruptedException {
+ throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+ InterruptedException {
ViewBrokerStatsDataRequestHeader requestHeader = new ViewBrokerStatsDataRequestHeader();
requestHeader.setStatsName(statsName);
requestHeader.setStatsKey(statsKey);
@@ -1844,7 +1871,7 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_BROKER_STATS_DATA, requestHeader);
RemotingCommand response = this.remotingClient
- .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
+ .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -1860,23 +1887,21 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public Set<String> getClusterList(String topic, long timeoutMillis) throws MQClientException, RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+ RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
// todo:jodie
return Collections.EMPTY_SET;
}
-
public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder, long timeoutMillis) throws MQClientException,
- RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+ RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
GetConsumeStatsInBrokerHeader requestHeader = new GetConsumeStatsInBrokerHeader();
requestHeader.setIsOrder(isOrder);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONSUME_STATS, requestHeader);
RemotingCommand response = this.remotingClient
- .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
+ .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -1892,12 +1917,11 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
-
public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException,
- RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
RemotingCommand response = this.remotingClient
- .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
+ .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
@@ -1909,13 +1933,12 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
<TRUNCATED>