You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/15 06:59:57 UTC
[rocketmq] 01/02: Fix check style
This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit ba2c768f5ff6fb22a5b9a3660bdbb078b95c185f
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Dec 15 14:57:32 2021 +0800
Fix check style
---
.../apache/rocketmq/broker/BrokerController.java | 304 ++++++++++-----------
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 23 +-
.../broker/processor/AdminBrokerProcessor.java | 15 +-
.../broker/processor/ConsumerManageProcessor.java | 21 +-
.../rocketmq/broker/slave/SlaveSynchronize.java | 8 +-
.../topic/TopicQueueMappingCleanService.java | 6 +-
.../broker/topic/TopicQueueMappingManager.java | 13 +-
.../consumer/store/RemoteBrokerOffsetStore.java | 18 +-
.../client/exception/OffsetNotFoundException.java | 16 ++
.../rocketmq/client/impl/MQClientAPIImpl.java | 2 +-
.../client/impl/consumer/PullAPIWrapper.java | 21 +-
.../client/impl/factory/MQClientInstance.java | 51 ++--
.../header/GetTopicConfigRequestHeader.java | 2 -
.../apache/rocketmq/common/rpc/ClientMetadata.java | 16 ++
.../apache/rocketmq/common/rpc/RequestBuilder.java | 16 ++
.../org/apache/rocketmq/common/rpc/RpcClient.java | 16 ++
.../apache/rocketmq/common/rpc/RpcClientHook.java | 18 +-
.../apache/rocketmq/common/rpc/RpcClientImpl.java | 33 ++-
.../apache/rocketmq/common/rpc/RpcClientUtils.java | 16 ++
.../apache/rocketmq/common/rpc/RpcException.java | 16 ++
.../common/statictopic/LogicQueueMappingItem.java | 16 ++
.../common/statictopic/TopicQueueMappingInfo.java | 19 +-
.../common/statictopic/TopicQueueMappingUtils.java | 10 +-
.../statictopic/TopicRemappingDetailWrapper.java | 16 ++
.../remoting/protocol/RemotingCommand.java | 2 -
.../test/client/rmq/RMQNormalProducer.java | 3 -
.../rocketmq/test/util/MQAdminTestUtils.java | 101 +++----
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 2 -
.../tools/admin/DefaultMQAdminExtImpl.java | 4 -
.../apache/rocketmq/tools/admin/MQAdminExt.java | 2 -
.../apache/rocketmq/tools/admin/MQAdminUtils.java | 20 +-
.../topic/RemappingStaticTopicSubCommand.java | 8 +-
.../command/topic/UpdateStaticTopicSubCommand.java | 9 +-
33 files changed, 473 insertions(+), 370 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 338f31e..d0d319d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -157,7 +157,7 @@ public class BrokerController {
private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
private final BrokerOuterAPI brokerOuterAPI;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "BrokerControllerScheduledThread"));
+ "BrokerControllerScheduledThread"));
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> ackThreadPoolQueue;
@@ -200,14 +200,14 @@ public class BrokerController {
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
private Future<?> slaveSyncFuture;
- private Map<Class,AccessValidator> accessValidatorMap = new HashMap<Class, AccessValidator>();
+ private Map<Class, AccessValidator> accessValidatorMap = new HashMap<Class, AccessValidator>();
private long shouldStartTime;
public BrokerController(
- final BrokerConfig brokerConfig,
- final NettyServerConfig nettyServerConfig,
- final NettyClientConfig nettyClientConfig,
- final MessageStoreConfig messageStoreConfig
+ final BrokerConfig brokerConfig,
+ final NettyServerConfig nettyServerConfig,
+ final NettyClientConfig nettyClientConfig,
+ final MessageStoreConfig messageStoreConfig
) {
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
@@ -223,7 +223,7 @@ public class BrokerController {
this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this);
this.sendMessageProcessor = new SendMessageProcessor(this);
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService,
- this.popMessageProcessor);
+ this.popMessageProcessor);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
this.consumerFilterManager = new ConsumerFilterManager(this);
@@ -255,9 +255,9 @@ public class BrokerController {
this.brokerFastFailure = new BrokerFastFailure(this);
this.configuration = new Configuration(
- log,
- BrokerPathConfigHelper.getBrokerConfigPath(),
- this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
+ log,
+ BrokerPathConfigHelper.getBrokerConfigPath(),
+ this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);
}
@@ -297,11 +297,11 @@ public class BrokerController {
if (result) {
try {
this.messageStore =
- new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
- this.brokerConfig);
+ new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
+ this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
- ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
+ ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
@@ -322,77 +322,77 @@ public class BrokerController {
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getSendMessageThreadPoolNums(),
- this.brokerConfig.getSendMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.sendThreadPoolQueue,
- new ThreadFactoryImpl("SendMessageThread_"));
+ this.brokerConfig.getSendMessageThreadPoolNums(),
+ this.brokerConfig.getSendMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.sendThreadPoolQueue,
+ new ThreadFactoryImpl("SendMessageThread_"));
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getPullMessageThreadPoolNums(),
- this.brokerConfig.getPullMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.pullThreadPoolQueue,
- new ThreadFactoryImpl("PullMessageThread_"));
+ this.brokerConfig.getPullMessageThreadPoolNums(),
+ this.brokerConfig.getPullMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.pullThreadPoolQueue,
+ new ThreadFactoryImpl("PullMessageThread_"));
this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getAckMessageThreadPoolNums(),
- this.brokerConfig.getAckMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.ackThreadPoolQueue,
- new ThreadFactoryImpl("AckMessageThread_"));
+ this.brokerConfig.getAckMessageThreadPoolNums(),
+ this.brokerConfig.getAckMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.ackThreadPoolQueue,
+ new ThreadFactoryImpl("AckMessageThread_"));
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
- this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.replyThreadPoolQueue,
- new ThreadFactoryImpl("ProcessReplyMessageThread_"));
+ this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+ this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.replyThreadPoolQueue,
+ new ThreadFactoryImpl("ProcessReplyMessageThread_"));
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getQueryMessageThreadPoolNums(),
- this.brokerConfig.getQueryMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.queryThreadPoolQueue,
- new ThreadFactoryImpl("QueryMessageThread_"));
+ this.brokerConfig.getQueryMessageThreadPoolNums(),
+ this.brokerConfig.getQueryMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.queryThreadPoolQueue,
+ new ThreadFactoryImpl("QueryMessageThread_"));
this.adminBrokerExecutor =
- Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
- "AdminBrokerThread_"));
+ Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
+ "AdminBrokerThread_"));
this.clientManageExecutor = new ThreadPoolExecutor(
- this.brokerConfig.getClientManageThreadPoolNums(),
- this.brokerConfig.getClientManageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.clientManagerThreadPoolQueue,
- new ThreadFactoryImpl("ClientManageThread_"));
+ this.brokerConfig.getClientManageThreadPoolNums(),
+ this.brokerConfig.getClientManageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.clientManagerThreadPoolQueue,
+ new ThreadFactoryImpl("ClientManageThread_"));
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getHeartbeatThreadPoolNums(),
- this.brokerConfig.getHeartbeatThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.heartbeatThreadPoolQueue,
- new ThreadFactoryImpl("HeartbeatThread_", true));
+ this.brokerConfig.getHeartbeatThreadPoolNums(),
+ this.brokerConfig.getHeartbeatThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.heartbeatThreadPoolQueue,
+ new ThreadFactoryImpl("HeartbeatThread_", true));
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getEndTransactionThreadPoolNums(),
- this.brokerConfig.getEndTransactionThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.endTransactionThreadPoolQueue,
- new ThreadFactoryImpl("EndTransactionThread_"));
+ this.brokerConfig.getEndTransactionThreadPoolNums(),
+ this.brokerConfig.getEndTransactionThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.endTransactionThreadPoolQueue,
+ new ThreadFactoryImpl("EndTransactionThread_"));
this.consumerManageExecutor =
- Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
- "ConsumerManageThread_"));
+ Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
+ "ConsumerManageThread_"));
this.registerProcessor();
@@ -466,8 +466,8 @@ public class BrokerController {
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
this.loadBalanceExecutor =
- Executors.newFixedThreadPool(this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), new ThreadFactoryImpl(
- "LoadBalanceProcessorThread_"));
+ Executors.newFixedThreadPool(this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), new ThreadFactoryImpl(
+ "LoadBalanceProcessorThread_"));
if (this.brokerConfig.getNamesrvAddr() != null) {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
@@ -523,38 +523,38 @@ public class BrokerController {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
- new String[] {
- TlsSystemConfig.tlsServerCertPath,
- TlsSystemConfig.tlsServerKeyPath,
- TlsSystemConfig.tlsServerTrustCertPath
- },
- new FileWatchService.Listener() {
- boolean certChanged, keyChanged = false;
-
- @Override
- public void onChanged(String path) {
- if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
- log.info("The trust certificate changed, reload the ssl context");
- reloadServerSslContext();
+ new String[]{
+ TlsSystemConfig.tlsServerCertPath,
+ TlsSystemConfig.tlsServerKeyPath,
+ TlsSystemConfig.tlsServerTrustCertPath
+ },
+ new FileWatchService.Listener() {
+ boolean certChanged, keyChanged = false;
+
+ @Override
+ public void onChanged(String path) {
+ if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
+ log.info("The trust certificate changed, reload the ssl context");
+ reloadServerSslContext();
+ }
+ if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
+ certChanged = true;
+ }
+ if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
+ keyChanged = true;
+ }
+ if (certChanged && keyChanged) {
+ log.info("The certificate and private key changed, reload the ssl context");
+ certChanged = keyChanged = false;
+ reloadServerSslContext();
+ }
}
- if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
- certChanged = true;
- }
- if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
- keyChanged = true;
- }
- if (certChanged && keyChanged) {
- log.info("The certificate and private key changed, reload the ssl context");
- certChanged = keyChanged = false;
- reloadServerSslContext();
- }
- }
- private void reloadServerSslContext() {
- ((NettyRemotingServer) remotingServer).loadSslContext();
- ((NettyRemotingServer) fastRemotingServer).loadSslContext();
- }
- });
+ private void reloadServerSslContext() {
+ ((NettyRemotingServer) remotingServer).loadSslContext();
+ ((NettyRemotingServer) fastRemotingServer).loadSslContext();
+ }
+ });
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
@@ -593,9 +593,9 @@ public class BrokerController {
return;
}
- for (AccessValidator accessValidator: accessValidators) {
+ for (AccessValidator accessValidator : accessValidators) {
final AccessValidator validator = accessValidator;
- accessValidatorMap.put(validator.getClass(),validator);
+ accessValidatorMap.put(validator.getClass(), validator);
this.registerServerRPCHook(new RPCHook() {
@Override
@@ -618,7 +618,7 @@ public class BrokerController {
if (rpcHooks == null || rpcHooks.isEmpty()) {
return;
}
- for (RPCHook rpcHook: rpcHooks) {
+ for (RPCHook rpcHook : rpcHooks) {
this.registerServerRPCHook(rpcHook);
}
}
@@ -962,10 +962,10 @@ public class BrokerController {
private void unregisterBrokerAll() {
this.brokerOuterAPI.unregisterBrokerAll(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId());
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId());
}
public String getBrokerAddr() {
@@ -1064,30 +1064,30 @@ public class BrokerController {
topicConfigSerializeWrapper.setDataVersion(dataVersion);
ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigList.stream()
- .map(topicConfig -> {
- TopicConfig registerTopicConfig;
- if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
- || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
- registerTopicConfig =
- new TopicConfig(topicConfig.getTopicName(),
- topicConfig.getReadQueueNums(),
- topicConfig.getWriteQueueNums(),
- this.brokerConfig.getBrokerPermission());
- } else {
- registerTopicConfig = new TopicConfig(topicConfig);
- }
- return registerTopicConfig;
- })
- .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
+ .map(topicConfig -> {
+ TopicConfig registerTopicConfig;
+ if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+ || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+ registerTopicConfig =
+ new TopicConfig(topicConfig.getTopicName(),
+ topicConfig.getReadQueueNums(),
+ topicConfig.getWriteQueueNums(),
+ this.brokerConfig.getBrokerPermission());
+ } else {
+ registerTopicConfig = new TopicConfig(topicConfig);
+ }
+ return registerTopicConfig;
+ })
+ .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream()
- .map(TopicConfig::getTopicName)
- .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
- .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info)))
- .orElse(null))
- .filter(Objects::nonNull)
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ .map(TopicConfig::getTopicName)
+ .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
+ .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info)))
+ .orElse(null))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!topicQueueMappingInfoMap.isEmpty()) {
topicConfigSerializeWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
}
@@ -1103,43 +1103,43 @@ public class BrokerController {
topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
- entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
+ entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
- || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+ || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
- new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
- this.brokerConfig.getBrokerPermission());
+ new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
+ this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.brokerConfig.getRegisterBrokerTimeoutMills())) {
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.brokerConfig.getRegisterBrokerTimeoutMills())) {
doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
}
}
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
- TopicConfigAndMappingSerializeWrapper topicConfigWrapper) {
+ TopicConfigAndMappingSerializeWrapper topicConfigWrapper) {
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.getHAServerAddr(),
- topicConfigWrapper,
- this.filterServerManager.buildNewFilterServerList(),
- oneway,
- this.brokerConfig.getRegisterBrokerTimeoutMills(),
- this.brokerConfig.isCompressedRegister());
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.getHAServerAddr(),
+ topicConfigWrapper,
+ this.filterServerManager.buildNewFilterServerList(),
+ oneway,
+ this.brokerConfig.getRegisterBrokerTimeoutMills(),
+ this.brokerConfig.isCompressedRegister());
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
@@ -1158,10 +1158,10 @@ public class BrokerController {
}
private boolean needRegister(final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final int timeoutMills) {
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final int timeoutMills) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
@@ -1279,7 +1279,7 @@ public class BrokerController {
}
public void setTransactionalMessageCheckService(
- TransactionalMessageCheckService transactionalMessageCheckService) {
+ TransactionalMessageCheckService transactionalMessageCheckService) {
this.transactionalMessageCheckService = transactionalMessageCheckService;
}
@@ -1296,7 +1296,7 @@ public class BrokerController {
}
public void setTransactionalMessageCheckListener(
- AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
+ AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
@@ -1321,8 +1321,7 @@ public class BrokerController {
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
- }
- catch (Throwable e) {
+ } catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
@@ -1369,7 +1368,6 @@ public class BrokerController {
}
-
public void changeToMaster(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
return;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 6a5b31e..8a2093a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -16,15 +16,6 @@
*/
package org.apache.rocketmq.broker.out;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -44,10 +35,6 @@ import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
-import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
-import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
-import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
-import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
@@ -63,8 +50,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.common.rpc.RpcRequest;
-import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -75,6 +60,14 @@ import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
public class BrokerOuterAPI {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final RemotingClient remotingClient;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 02a0060..0d2a59b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -20,7 +20,6 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.concurrent.CompleteFuture;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.broker.BrokerController;
@@ -154,8 +153,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.function.BiConsumer;
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
@@ -651,7 +648,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
if (!mappingContext.isLeader()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
}
- //TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
+ //TO DO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
Long timestamp = requestHeader.getTimestamp();
long offset = -1;
for (int i = 0; i < mappingItems.size(); i++) {
@@ -795,7 +792,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
//this may not
return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d is not leader in broker %s, request code %d", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname(), request.getCode()))));
- };
+ }
GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
assert mappingItem != null;
@@ -862,14 +859,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
if (!mappingContext.isLeader()) {
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
- };
+ }
LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
assert mappingItem != null;
try {
requestHeader.setBname(mappingItem.getBname());
requestHeader.setLo(false);
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null);
- //TODO check if it is in current broker
+ //TO DO check if it is in current broker
RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
@@ -882,7 +879,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
- }catch (Throwable t) {
+ } catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
}
@@ -1058,7 +1055,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
}
TopicStatsTable topicStatsTable = new TopicStatsTable();
- qidItemMap.forEach( (qid, itemPair) -> {
+ qidItemMap.forEach((qid, itemPair) -> {
LogicQueueMappingItem minItem = itemPair[0];
LogicQueueMappingItem maxItem = itemPair[1];
TopicOffset minTopicOffset = statsTable.get(minItem.getBname()).getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId()));
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 66abe62..04e705b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -17,18 +17,9 @@
package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
-import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.rpc.RpcRequest;
-import org.apache.rocketmq.common.rpc.RpcResponse;
-import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
-import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
@@ -38,6 +29,12 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -45,6 +42,8 @@ import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import java.util.List;
+
import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@@ -177,7 +176,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
if (rpcResponse.getCode() == ResponseCode.SUCCESS) {
offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset();
break;
- } else if (rpcResponse.getCode() == ResponseCode.QUERY_NOT_FOUND){
+ } else if (rpcResponse.getCode() == ResponseCode.QUERY_NOT_FOUND) {
continue;
} else {
//this should not happen
@@ -234,7 +233,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) {
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, do not set to zero, maybe this group boot first");
- }else if (minOffset <= 0
+ } else if (minOffset <= 0
&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
responseHeader.setOffset(0L);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 0029318..9bb09bd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -16,19 +16,19 @@
*/
package org.apache.rocketmq.broker.slave;
-import java.io.IOException;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
-import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
-import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import java.io.IOException;
+
public class SlaveSynchronize {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
index 9518de0..0533668 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
@@ -104,7 +104,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
boolean changed = false;
long start = System.currentTimeMillis();
try {
- for(String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
+ for (String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
try {
if (isStopped()) {
break;
@@ -123,7 +123,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
if (items.size() <= 1) {
continue;
}
- if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
+ if (!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
@@ -154,7 +154,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
if (items.size() <= 1) {
continue;
}
- if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
+ if (!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 1633440..56fc792 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -21,18 +21,12 @@ import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
-import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
-import org.apache.rocketmq.common.rpc.RpcRequest;
-import org.apache.rocketmq.common.rpc.RpcResponse;
import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.common.rpc.TopicRequestHeader;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
-import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
@@ -40,12 +34,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@@ -98,7 +87,7 @@ public class TopicQueueMappingManager extends ConfigManager {
}
if (force) {
//bakeup the old items
- oldDetail.getHostedQueues().forEach( (queueId, items) -> {
+ oldDetail.getHostedQueues().forEach((queueId, items) -> {
newDetail.getHostedQueues().putIfAbsent(queueId, items);
});
topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 91d12a0..fc0b29c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -16,13 +16,6 @@
*/
package org.apache.rocketmq.client.consumer.store;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.OffsetNotFoundException;
@@ -31,13 +24,20 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* Remote storage implementation
*/
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java b/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java
index c3d275f..e73bbbf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.client.exception;
public class OffsetNotFoundException extends MQBrokerException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 1de4fdb..0c15cff 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1235,7 +1235,7 @@ public class MQClientAPIImpl {
(QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
return responseHeader.getOffset();
}
- case ResponseCode.PULL_NOT_FOUND:{
+ case ResponseCode.PULL_NOT_FOUND: {
throw new OffsetNotFoundException(response.getCode(), response.getRemark(), addr);
}
default:
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index 273add4..30e8439 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -16,17 +16,7 @@
*/
package org.apache.rocketmq.client.impl.consumer;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.PopCallback;
-import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
@@ -41,18 +31,27 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
public class PullAPIWrapper {
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 77add20..793189e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -16,28 +16,6 @@
*/
package org.apache.rocketmq.client.impl.factory;
-import java.io.UnsupportedEncodingException;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -66,11 +44,10 @@ import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.common.message.MessageQueueAssignment;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -82,12 +59,34 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import java.io.UnsupportedEncodingException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import static org.apache.rocketmq.common.rpc.ClientMetadata.topicRouteData2EndpointsForStaticTopic;
public class MQClientInstance {
@@ -169,7 +168,7 @@ public class MQClientInstance {
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
- // TODO should check the usage of raw route, it is better to remove such field
+ // TO DO should check the usage of raw route, it is better to remove such field
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
String[] brokers = route.getOrderTopicConf().split(";");
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
index b701df6..bc7586a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
@@ -17,9 +17,7 @@
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.common.rpc.RpcRequestHeader;
import org.apache.rocketmq.common.rpc.TopicRequestHeader;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index 4554557..28a5e64 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.common.MixAll;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
index f9478e4..9fec087 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.common.message.MessageQueue;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java
index e4de3e0..7876fdf 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.common.message.MessageQueue;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java
index ca0f2d4..e3430b5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java
@@ -1,7 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.common.rpc;
-import java.util.concurrent.Future;
-
public abstract class RpcClientHook {
//if the return is not null, return it
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 97879d1..62e6ec1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -1,8 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.common.rpc;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -10,16 +25,12 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
-import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
@@ -190,7 +201,7 @@ public class RpcClientImpl implements RpcClient {
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
break;
}
- default:{
+ default: {
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
}
}
@@ -216,7 +227,7 @@ public class RpcClientImpl implements RpcClient {
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null));
break;
}
- default:{
+ default: {
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
}
}
@@ -233,7 +244,7 @@ public class RpcClientImpl implements RpcClient {
rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(responseCommand.getBody(), bodyClass)));
break;
}
- default:{
+ default: {
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
}
}
@@ -254,7 +265,7 @@ public class RpcClientImpl implements RpcClient {
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
break;
}
- default:{
+ default: {
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
}
}
@@ -275,7 +286,7 @@ public class RpcClientImpl implements RpcClient {
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
break;
}
- default:{
+ default: {
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
}
}
@@ -296,7 +307,7 @@ public class RpcClientImpl implements RpcClient {
rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
break;
}
- default:{
+ default: {
rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
index 61dce64..40c6eef 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java
index fc096df..36fc056 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.common.rpc;
import org.apache.rocketmq.remoting.exception.RemotingException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 76e7406..3c217f5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.common.statictopic;
import org.apache.commons.lang3.builder.EqualsBuilder;
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
index e9cf6f7..784247d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
@@ -15,9 +15,22 @@
* limitations under the License.
*/
package org.apache.rocketmq.common.statictopic;
-
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index e7e7817..75f0a4f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -224,7 +224,7 @@ public class TopicQueueMappingUtils {
if (newItem.getGen() < oldItem.getGen()) {
//the earliest item may have been deleted concurrently
inew++;
- } else if (oldItem.getGen() < newItem.getGen()){
+ } else if (oldItem.getGen() < newItem.getGen()) {
//in the following cases, the new item-list has fewer items than the old item-list
//1. the queue is mapped back to a broker that held the logic queue before
//2. The earliest item is deleted by TopicQueueMappingCleanService
@@ -260,7 +260,7 @@ public class TopicQueueMappingUtils {
}
int lastGen = -1;
long lastOffset = -1;
- for (int i = items.size() - 1; i >=0 ; i--) {
+ for (int i = items.size() - 1; i >= 0 ; i--) {
LogicQueueMappingItem item = items.get(i);
if (item.getStartOffset() < 0
|| item.getGen() < 0
@@ -414,9 +414,9 @@ public class TopicQueueMappingUtils {
}
public static long blockSeqRoundUp(long offset, long blockSeqSize) {
- long num = offset/blockSeqSize;
+ long num = offset / blockSeqSize;
long left = offset % blockSeqSize;
- if (left < blockSeqSize/2) {
+ if (left < blockSeqSize / 2) {
return (num + 1) * blockSeqSize;
} else {
return (num + 2) * blockSeqSize;
@@ -535,7 +535,7 @@ public class TopicQueueMappingUtils {
}
Map<String, Integer> brokerNumMapBeforeRemapping = new HashMap<String, Integer>();
for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
- if(brokerNumMapBeforeRemapping.containsKey(mappingOne.bname)) {
+ if (brokerNumMapBeforeRemapping.containsKey(mappingOne.bname)) {
brokerNumMapBeforeRemapping.put(mappingOne.bname, brokerNumMapBeforeRemapping.get(mappingOne.bname) + 1);
} else {
brokerNumMapBeforeRemapping.put(mappingOne.bname, 1);
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
index e865b6b..ce02a93 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.common.statictopic;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 0e32226..03b1640 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -28,11 +28,9 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
index 4f5d38e..001db95 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
@@ -152,9 +152,6 @@ public class RMQNormalProducer extends AbstractMQProducer {
sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
msgBodys.addData(new String(msg.getBody()));
- if (originMsgs.getAllData().contains(msg)) {
- System.out.println("Hash collision");
- }
originMsgs.addData(msg);
originMsgIndex.put(new String(msg.getBody()), metaqResult);
} catch (Exception e) {
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 6784b76..8287d81 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -17,18 +17,10 @@
package org.apache.rocketmq.test.util;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ForkJoinPool;
-import java.util.stream.Collectors;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
-import org.apache.rocketmq.acl.common.AclUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
@@ -42,27 +34,29 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
-import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.MQAdminUtils;
import org.apache.rocketmq.tools.command.CommandUtil;
-import org.apache.rocketmq.tools.command.MQAdminStartup;
import org.apache.rocketmq.tools.command.topic.RemappingStaticTopicSubCommand;
import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+
public class MQAdminTestUtils {
private static Logger log = Logger.getLogger(MQAdminTestUtils.class);
public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
- int queueNum) {
+ int queueNum) {
int defaultWaitTime = 5;
return createTopic(nameSrvAddr, clusterName, topic, queueNum, defaultWaitTime);
}
public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
- int queueNum, int waitTimeSec) {
+ int queueNum, int waitTimeSec) {
boolean createResult = false;
DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.setInstanceName(UUID.randomUUID().toString());
@@ -108,12 +102,12 @@ public class MQAdminTestUtils {
try {
mqAdminExt.start();
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt,
- clusterName);
+ clusterName);
for (String addr : masterSet) {
try {
mqAdminExt.createAndUpdateSubscriptionGroupConfig(addr, config);
log.info(String.format("create subscription group %s to %s success.\n", consumerId,
- addr));
+ addr));
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000 * 1);
@@ -159,37 +153,10 @@ public class MQAdminTestUtils {
return false;
}
- public void getSubConnection(String nameSrvAddr, String clusterName, String consumerId) {
- boolean createResult = true;
- DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
- mqAdminExt.setNamesrvAddr(nameSrvAddr);
- SubscriptionGroupConfig config = new SubscriptionGroupConfig();
- config.setGroupName(consumerId);
- try {
- mqAdminExt.start();
- Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt,
- clusterName);
- for (String addr : masterSet) {
- try {
-
- System.out.printf("create subscription group %s to %s success.\n", consumerId,
- addr);
- } catch (Exception e) {
- e.printStackTrace();
- Thread.sleep(1000 * 1);
- }
- }
- } catch (Exception e) {
- createResult = false;
- e.printStackTrace();
- }
- ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
- }
-
//should only be used in tests: if some middle operation fails, it does not back up the brokerConfigMap
public static Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
- assert brokerConfigMap.isEmpty();
+ assert brokerConfigMap.isEmpty();
TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
MQAdminUtils.updateTopicConfigMappingAll(brokerConfigMap, defaultMQAdminExt, false);
@@ -221,20 +188,20 @@ public class MQAdminTestUtils {
MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata);
// now do the remapping
//Step1: let the new leader be writable without the logicOffset
- for (String broker: brokersToMapIn) {
+ for (String broker : brokersToMapIn) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step2: forbid the write of old leader
- for (String broker: brokersToMapOut) {
+ for (String broker : brokersToMapOut) {
String addr = clientMetadata.findMasterBrokerAddr(broker);
TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
}
//Step5: write the non-target brokers
- for (String broker: brokerConfigMap.keySet()) {
+ for (String broker : brokerConfigMap.keySet()) {
if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
continue;
}
@@ -244,25 +211,24 @@ public class MQAdminTestUtils {
}
}
-
- public static void createStaticTopicWithCommand(String topic, int queueNum, Set<String> brokers, String cluster, String nameservers) throws Exception {
+ public static void createStaticTopicWithCommand(String topic, int queueNum, Set<String> brokers, String cluster, String nameservers) throws Exception {
UpdateStaticTopicSubCommand cmd = new UpdateStaticTopicSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] args;
if (cluster != null) {
- args = new String[] {
- "-c", cluster,
- "-t", topic,
- "-qn", String.valueOf(queueNum),
- "-n", nameservers
+ args = new String[]{
+ "-c", cluster,
+ "-t", topic,
+ "-qn", String.valueOf(queueNum),
+ "-n", nameservers
};
} else {
String brokerStr = String.join(",", brokers);
- args = new String[] {
- "-b", brokerStr,
- "-t", topic,
- "-qn", String.valueOf(queueNum),
- "-n", nameservers
+ args = new String[]{
+ "-b", brokerStr,
+ "-t", topic,
+ "-qn", String.valueOf(queueNum),
+ "-n", nameservers
};
}
final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser());
@@ -276,23 +242,22 @@ public class MQAdminTestUtils {
cmd.execute(commandLine, options, null);
}
-
public static void remappingStaticTopicWithCommand(String topic, Set<String> brokers, String cluster, String nameservers) throws Exception {
RemappingStaticTopicSubCommand cmd = new RemappingStaticTopicSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] args;
if (cluster != null) {
- args = new String[] {
- "-c", cluster,
- "-t", topic,
- "-n", nameservers
+ args = new String[]{
+ "-c", cluster,
+ "-t", topic,
+ "-n", nameservers
};
} else {
String brokerStr = String.join(",", brokers);
- args = new String[] {
- "-b", brokerStr,
- "-t", topic,
- "-n", nameservers
+ args = new String[]{
+ "-b", brokerStr,
+ "-t", topic,
+ "-n", nameservers
};
}
final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 5c80e86..6f551d3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -45,8 +45,6 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.rpc.ClientMetadata;
-import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.topic.TopicValidator;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index d1f0fcd..fce4318 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -64,11 +64,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.rpc.ClientMetadata;
-import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
-import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index c4838e3..35efe96 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -44,8 +44,6 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.rpc.ClientMetadata;
-import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
index 0fda471..a5aab4d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -1,15 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.tools.admin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
@@ -160,7 +174,7 @@ public class MQAdminUtils {
if (topicOffset == null) {
throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
}
- //TODO check the max offset, will it return -1?
+ //TO DO check the max offset, will it return -1?
if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
}
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index a3b757a..ba4e54e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -107,7 +107,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
}
MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force, defaultMQAdminExt);
return;
- }catch (Exception e) {
+ } catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
@@ -138,7 +138,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
try {
defaultMQAdminExt.start();
- if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))) {
+ if (!commandLine.hasOption("b") && !commandLine.hasOption('c')) {
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
return;
}
@@ -184,14 +184,14 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>());
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
- System.out.println("The old mapping data is written to file " + oldMappingDataFile);
+ System.out.printf("The old mapping data is written to file " + oldMappingDataFile + "\n");
}
TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
{
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
- System.out.println("The old mapping data is written to file " + newMappingDataFile);
+ System.out.printf("The old mapping data is written to file " + newMappingDataFile + "\n");
}
MQAdminUtils.completeNoTargetBrokers(newWrapper.getBrokerConfigMap(), defaultMQAdminExt);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 410381e..d67b8fb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -21,10 +21,9 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.rpc.ClientMetadata;
import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
@@ -110,7 +109,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
MQAdminUtils.completeNoTargetBrokers(wrapper.getBrokerConfigMap(), defaultMQAdminExt);
MQAdminUtils.updateTopicConfigMappingAll(wrapper.getBrokerConfigMap(), defaultMQAdminExt, false);
return;
- }catch (Exception e) {
+ } catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
@@ -185,7 +184,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
{
TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>());
String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
- System.out.println("The old mapping data is written to file " + oldMappingDataFile);
+ System.out.printf("The old mapping data is written to file " + oldMappingDataFile + "\n");
}
//add the existed brokers to target brokers
targetBrokers.addAll(brokerConfigMap.keySet());
@@ -195,7 +194,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
{
String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
- System.out.println("The new mapping data is written to file " + newMappingDataFile);
+ System.out.printf("The new mapping data is written to file " + newMappingDataFile + "\n");
}
MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);