You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/12/15 06:59:57 UTC

[rocketmq] 01/02: Fix check style

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit ba2c768f5ff6fb22a5b9a3660bdbb078b95c185f
Author: dongeforever <do...@apache.org>
AuthorDate: Wed Dec 15 14:57:32 2021 +0800

    Fix check style
---
 .../apache/rocketmq/broker/BrokerController.java   | 304 ++++++++++-----------
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  23 +-
 .../broker/processor/AdminBrokerProcessor.java     |  15 +-
 .../broker/processor/ConsumerManageProcessor.java  |  21 +-
 .../rocketmq/broker/slave/SlaveSynchronize.java    |   8 +-
 .../topic/TopicQueueMappingCleanService.java       |   6 +-
 .../broker/topic/TopicQueueMappingManager.java     |  13 +-
 .../consumer/store/RemoteBrokerOffsetStore.java    |  18 +-
 .../client/exception/OffsetNotFoundException.java  |  16 ++
 .../rocketmq/client/impl/MQClientAPIImpl.java      |   2 +-
 .../client/impl/consumer/PullAPIWrapper.java       |  21 +-
 .../client/impl/factory/MQClientInstance.java      |  51 ++--
 .../header/GetTopicConfigRequestHeader.java        |   2 -
 .../apache/rocketmq/common/rpc/ClientMetadata.java |  16 ++
 .../apache/rocketmq/common/rpc/RequestBuilder.java |  16 ++
 .../org/apache/rocketmq/common/rpc/RpcClient.java  |  16 ++
 .../apache/rocketmq/common/rpc/RpcClientHook.java  |  18 +-
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  |  33 ++-
 .../apache/rocketmq/common/rpc/RpcClientUtils.java |  16 ++
 .../apache/rocketmq/common/rpc/RpcException.java   |  16 ++
 .../common/statictopic/LogicQueueMappingItem.java  |  16 ++
 .../common/statictopic/TopicQueueMappingInfo.java  |  19 +-
 .../common/statictopic/TopicQueueMappingUtils.java |  10 +-
 .../statictopic/TopicRemappingDetailWrapper.java   |  16 ++
 .../remoting/protocol/RemotingCommand.java         |   2 -
 .../test/client/rmq/RMQNormalProducer.java         |   3 -
 .../rocketmq/test/util/MQAdminTestUtils.java       | 101 +++----
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   2 -
 .../tools/admin/DefaultMQAdminExtImpl.java         |   4 -
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   2 -
 .../apache/rocketmq/tools/admin/MQAdminUtils.java  |  20 +-
 .../topic/RemappingStaticTopicSubCommand.java      |   8 +-
 .../command/topic/UpdateStaticTopicSubCommand.java |   9 +-
 33 files changed, 473 insertions(+), 370 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 338f31e..d0d319d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -157,7 +157,7 @@ public class BrokerController {
     private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
     private final BrokerOuterAPI brokerOuterAPI;
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-        "BrokerControllerScheduledThread"));
+            "BrokerControllerScheduledThread"));
     private final SlaveSynchronize slaveSynchronize;
     private final BlockingQueue<Runnable> sendThreadPoolQueue;
     private final BlockingQueue<Runnable> ackThreadPoolQueue;
@@ -200,14 +200,14 @@ public class BrokerController {
     private TransactionalMessageService transactionalMessageService;
     private AbstractTransactionalMessageCheckListener transactionalMessageCheckListener;
     private Future<?> slaveSyncFuture;
-    private Map<Class,AccessValidator> accessValidatorMap = new HashMap<Class, AccessValidator>();
+    private Map<Class, AccessValidator> accessValidatorMap = new HashMap<Class, AccessValidator>();
     private long shouldStartTime;
 
     public BrokerController(
-        final BrokerConfig brokerConfig,
-        final NettyServerConfig nettyServerConfig,
-        final NettyClientConfig nettyClientConfig,
-        final MessageStoreConfig messageStoreConfig
+            final BrokerConfig brokerConfig,
+            final NettyServerConfig nettyServerConfig,
+            final NettyClientConfig nettyClientConfig,
+            final MessageStoreConfig messageStoreConfig
     ) {
         this.brokerConfig = brokerConfig;
         this.nettyServerConfig = nettyServerConfig;
@@ -223,7 +223,7 @@ public class BrokerController {
         this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this);
         this.sendMessageProcessor = new SendMessageProcessor(this);
         this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService,
-            this.popMessageProcessor);
+                this.popMessageProcessor);
         this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
         this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
         this.consumerFilterManager = new ConsumerFilterManager(this);
@@ -255,9 +255,9 @@ public class BrokerController {
 
         this.brokerFastFailure = new BrokerFastFailure(this);
         this.configuration = new Configuration(
-            log,
-            BrokerPathConfigHelper.getBrokerConfigPath(),
-            this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
+                log,
+                BrokerPathConfigHelper.getBrokerConfigPath(),
+                this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
         );
     }
 
@@ -297,11 +297,11 @@ public class BrokerController {
         if (result) {
             try {
                 this.messageStore =
-                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
-                        this.brokerConfig);
+                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
+                                this.brokerConfig);
                 if (messageStoreConfig.isEnableDLegerCommitLog()) {
                     DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
-                    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
+                    ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
                 }
                 this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                 //load plugin
@@ -322,77 +322,77 @@ public class BrokerController {
             fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
             this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
             this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                this.brokerConfig.getSendMessageThreadPoolNums(),
-                this.brokerConfig.getSendMessageThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.sendThreadPoolQueue,
-                new ThreadFactoryImpl("SendMessageThread_"));
+                    this.brokerConfig.getSendMessageThreadPoolNums(),
+                    this.brokerConfig.getSendMessageThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.sendThreadPoolQueue,
+                    new ThreadFactoryImpl("SendMessageThread_"));
 
             this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                this.brokerConfig.getPullMessageThreadPoolNums(),
-                this.brokerConfig.getPullMessageThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.pullThreadPoolQueue,
-                new ThreadFactoryImpl("PullMessageThread_"));
+                    this.brokerConfig.getPullMessageThreadPoolNums(),
+                    this.brokerConfig.getPullMessageThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.pullThreadPoolQueue,
+                    new ThreadFactoryImpl("PullMessageThread_"));
 
             this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                this.brokerConfig.getAckMessageThreadPoolNums(),
-                this.brokerConfig.getAckMessageThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.ackThreadPoolQueue,
-                new ThreadFactoryImpl("AckMessageThread_"));
+                    this.brokerConfig.getAckMessageThreadPoolNums(),
+                    this.brokerConfig.getAckMessageThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.ackThreadPoolQueue,
+                    new ThreadFactoryImpl("AckMessageThread_"));
 
 
             this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
-                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.replyThreadPoolQueue,
-                new ThreadFactoryImpl("ProcessReplyMessageThread_"));
+                    this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+                    this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.replyThreadPoolQueue,
+                    new ThreadFactoryImpl("ProcessReplyMessageThread_"));
 
             this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                this.brokerConfig.getQueryMessageThreadPoolNums(),
-                this.brokerConfig.getQueryMessageThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.queryThreadPoolQueue,
-                new ThreadFactoryImpl("QueryMessageThread_"));
+                    this.brokerConfig.getQueryMessageThreadPoolNums(),
+                    this.brokerConfig.getQueryMessageThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.queryThreadPoolQueue,
+                    new ThreadFactoryImpl("QueryMessageThread_"));
 
             this.adminBrokerExecutor =
-                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
-                    "AdminBrokerThread_"));
+                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
+                            "AdminBrokerThread_"));
 
             this.clientManageExecutor = new ThreadPoolExecutor(
-                this.brokerConfig.getClientManageThreadPoolNums(),
-                this.brokerConfig.getClientManageThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.clientManagerThreadPoolQueue,
-                new ThreadFactoryImpl("ClientManageThread_"));
+                    this.brokerConfig.getClientManageThreadPoolNums(),
+                    this.brokerConfig.getClientManageThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.clientManagerThreadPoolQueue,
+                    new ThreadFactoryImpl("ClientManageThread_"));
 
             this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
-                this.brokerConfig.getHeartbeatThreadPoolNums(),
-                this.brokerConfig.getHeartbeatThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.heartbeatThreadPoolQueue,
-                new ThreadFactoryImpl("HeartbeatThread_", true));
+                    this.brokerConfig.getHeartbeatThreadPoolNums(),
+                    this.brokerConfig.getHeartbeatThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.heartbeatThreadPoolQueue,
+                    new ThreadFactoryImpl("HeartbeatThread_", true));
 
             this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
-                this.brokerConfig.getEndTransactionThreadPoolNums(),
-                this.brokerConfig.getEndTransactionThreadPoolNums(),
-                1000 * 60,
-                TimeUnit.MILLISECONDS,
-                this.endTransactionThreadPoolQueue,
-                new ThreadFactoryImpl("EndTransactionThread_"));
+                    this.brokerConfig.getEndTransactionThreadPoolNums(),
+                    this.brokerConfig.getEndTransactionThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.endTransactionThreadPoolQueue,
+                    new ThreadFactoryImpl("EndTransactionThread_"));
 
             this.consumerManageExecutor =
-                Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
-                    "ConsumerManageThread_"));
+                    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
+                            "ConsumerManageThread_"));
 
             this.registerProcessor();
 
@@ -466,8 +466,8 @@ public class BrokerController {
             }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
 
             this.loadBalanceExecutor =
-                Executors.newFixedThreadPool(this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), new ThreadFactoryImpl(
-                    "LoadBalanceProcessorThread_"));
+                    Executors.newFixedThreadPool(this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(), new ThreadFactoryImpl(
+                            "LoadBalanceProcessorThread_"));
 
             if (this.brokerConfig.getNamesrvAddr() != null) {
                 this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
@@ -523,38 +523,38 @@ public class BrokerController {
                 // Register a listener to reload SslContext
                 try {
                     fileWatchService = new FileWatchService(
-                        new String[] {
-                            TlsSystemConfig.tlsServerCertPath,
-                            TlsSystemConfig.tlsServerKeyPath,
-                            TlsSystemConfig.tlsServerTrustCertPath
-                        },
-                        new FileWatchService.Listener() {
-                            boolean certChanged, keyChanged = false;
-
-                            @Override
-                            public void onChanged(String path) {
-                                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
-                                    log.info("The trust certificate changed, reload the ssl context");
-                                    reloadServerSslContext();
+                            new String[]{
+                                TlsSystemConfig.tlsServerCertPath,
+                                TlsSystemConfig.tlsServerKeyPath,
+                                TlsSystemConfig.tlsServerTrustCertPath
+                            },
+                            new FileWatchService.Listener() {
+                                boolean certChanged, keyChanged = false;
+
+                                @Override
+                                public void onChanged(String path) {
+                                    if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
+                                        log.info("The trust certificate changed, reload the ssl context");
+                                        reloadServerSslContext();
+                                    }
+                                    if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
+                                        certChanged = true;
+                                    }
+                                    if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
+                                        keyChanged = true;
+                                    }
+                                    if (certChanged && keyChanged) {
+                                        log.info("The certificate and private key changed, reload the ssl context");
+                                        certChanged = keyChanged = false;
+                                        reloadServerSslContext();
+                                    }
                                 }
-                                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
-                                    certChanged = true;
-                                }
-                                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
-                                    keyChanged = true;
-                                }
-                                if (certChanged && keyChanged) {
-                                    log.info("The certificate and private key changed, reload the ssl context");
-                                    certChanged = keyChanged = false;
-                                    reloadServerSslContext();
-                                }
-                            }
 
-                            private void reloadServerSslContext() {
-                                ((NettyRemotingServer) remotingServer).loadSslContext();
-                                ((NettyRemotingServer) fastRemotingServer).loadSslContext();
-                            }
-                        });
+                                private void reloadServerSslContext() {
+                                    ((NettyRemotingServer) remotingServer).loadSslContext();
+                                    ((NettyRemotingServer) fastRemotingServer).loadSslContext();
+                                }
+                            });
                 } catch (Exception e) {
                     log.warn("FileWatchService created error, can't load the certificate dynamically");
                 }
@@ -593,9 +593,9 @@ public class BrokerController {
             return;
         }
 
-        for (AccessValidator accessValidator: accessValidators) {
+        for (AccessValidator accessValidator : accessValidators) {
             final AccessValidator validator = accessValidator;
-            accessValidatorMap.put(validator.getClass(),validator);
+            accessValidatorMap.put(validator.getClass(), validator);
             this.registerServerRPCHook(new RPCHook() {
 
                 @Override
@@ -618,7 +618,7 @@ public class BrokerController {
         if (rpcHooks == null || rpcHooks.isEmpty()) {
             return;
         }
-        for (RPCHook rpcHook: rpcHooks) {
+        for (RPCHook rpcHook : rpcHooks) {
             this.registerServerRPCHook(rpcHook);
         }
     }
@@ -962,10 +962,10 @@ public class BrokerController {
 
     private void unregisterBrokerAll() {
         this.brokerOuterAPI.unregisterBrokerAll(
-            this.brokerConfig.getBrokerClusterName(),
-            this.getBrokerAddr(),
-            this.brokerConfig.getBrokerName(),
-            this.brokerConfig.getBrokerId());
+                this.brokerConfig.getBrokerClusterName(),
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId());
     }
 
     public String getBrokerAddr() {
@@ -1064,30 +1064,30 @@ public class BrokerController {
         topicConfigSerializeWrapper.setDataVersion(dataVersion);
 
         ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigList.stream()
-            .map(topicConfig -> {
-                TopicConfig registerTopicConfig;
-                if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
-                    || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
-                    registerTopicConfig =
-                        new TopicConfig(topicConfig.getTopicName(),
-                            topicConfig.getReadQueueNums(),
-                            topicConfig.getWriteQueueNums(),
-                            this.brokerConfig.getBrokerPermission());
-                } else {
-                    registerTopicConfig = new TopicConfig(topicConfig);
-                }
-                return registerTopicConfig;
-            })
-            .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
+                .map(topicConfig -> {
+                    TopicConfig registerTopicConfig;
+                    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+                            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+                        registerTopicConfig =
+                                new TopicConfig(topicConfig.getTopicName(),
+                                        topicConfig.getReadQueueNums(),
+                                        topicConfig.getWriteQueueNums(),
+                                        this.brokerConfig.getBrokerPermission());
+                    } else {
+                        registerTopicConfig = new TopicConfig(topicConfig);
+                    }
+                    return registerTopicConfig;
+                })
+                .collect(Collectors.toConcurrentMap(TopicConfig::getTopicName, Function.identity()));
         topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
 
         Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = topicConfigList.stream()
-            .map(TopicConfig::getTopicName)
-            .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
-                .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info)))
-                .orElse(null))
-            .filter(Objects::nonNull)
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+                .map(TopicConfig::getTopicName)
+                .map(topicName -> Optional.ofNullable(this.topicQueueMappingManager.getTopicQueueMapping(topicName))
+                        .map(info -> new AbstractMap.SimpleImmutableEntry<>(topicName, TopicQueueMappingDetail.cloneAsMappingInfo(info)))
+                        .orElse(null))
+                .filter(Objects::nonNull)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
         if (!topicQueueMappingInfoMap.isEmpty()) {
             topicConfigSerializeWrapper.setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
         }
@@ -1103,43 +1103,43 @@ public class BrokerController {
         topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());
 
         topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
-                entry ->  new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
+            entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
         ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
 
         if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
-            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
             ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
             for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                 TopicConfig tmp =
-                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
-                        this.brokerConfig.getBrokerPermission());
+                        new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
+                                this.brokerConfig.getBrokerPermission());
                 topicConfigTable.put(topicConfig.getTopicName(), tmp);
             }
             topicConfigWrapper.setTopicConfigTable(topicConfigTable);
         }
 
         if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
-            this.getBrokerAddr(),
-            this.brokerConfig.getBrokerName(),
-            this.brokerConfig.getBrokerId(),
-            this.brokerConfig.getRegisterBrokerTimeoutMills())) {
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId(),
+                this.brokerConfig.getRegisterBrokerTimeoutMills())) {
             doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
         }
     }
 
     private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
-        TopicConfigAndMappingSerializeWrapper topicConfigWrapper) {
+                                     TopicConfigAndMappingSerializeWrapper topicConfigWrapper) {
         List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
-            this.brokerConfig.getBrokerClusterName(),
-            this.getBrokerAddr(),
-            this.brokerConfig.getBrokerName(),
-            this.brokerConfig.getBrokerId(),
-            this.getHAServerAddr(),
-            topicConfigWrapper,
-            this.filterServerManager.buildNewFilterServerList(),
-            oneway,
-            this.brokerConfig.getRegisterBrokerTimeoutMills(),
-            this.brokerConfig.isCompressedRegister());
+                this.brokerConfig.getBrokerClusterName(),
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId(),
+                this.getHAServerAddr(),
+                topicConfigWrapper,
+                this.filterServerManager.buildNewFilterServerList(),
+                oneway,
+                this.brokerConfig.getRegisterBrokerTimeoutMills(),
+                this.brokerConfig.isCompressedRegister());
 
         if (registerBrokerResultList.size() > 0) {
             RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
@@ -1158,10 +1158,10 @@ public class BrokerController {
     }
 
     private boolean needRegister(final String clusterName,
-        final String brokerAddr,
-        final String brokerName,
-        final long brokerId,
-        final int timeoutMills) {
+                                 final String brokerAddr,
+                                 final String brokerName,
+                                 final long brokerId,
+                                 final int timeoutMills) {
 
         TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
         List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
@@ -1279,7 +1279,7 @@ public class BrokerController {
     }
 
     public void setTransactionalMessageCheckService(
-        TransactionalMessageCheckService transactionalMessageCheckService) {
+            TransactionalMessageCheckService transactionalMessageCheckService) {
         this.transactionalMessageCheckService = transactionalMessageCheckService;
     }
 
@@ -1296,7 +1296,7 @@ public class BrokerController {
     }
 
     public void setTransactionalMessageCheckListener(
-        AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
+            AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
         this.transactionalMessageCheckListener = transactionalMessageCheckListener;
     }
 
@@ -1321,8 +1321,7 @@ public class BrokerController {
                 public void run() {
                     try {
                         BrokerController.this.slaveSynchronize.syncAll();
-                    }
-                    catch (Throwable e) {
+                    } catch (Throwable e) {
                         log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                     }
                 }
@@ -1369,7 +1368,6 @@ public class BrokerController {
     }
 
 
-
     public void changeToMaster(BrokerRole role) {
         if (role == BrokerRole.SLAVE) {
             return;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 6a5b31e..8a2093a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -16,15 +16,6 @@
  */
 package org.apache.rocketmq.broker.out;
 
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import com.alibaba.fastjson.JSON;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
 import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -44,10 +35,6 @@ import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
-import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
-import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
-import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
-import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
@@ -63,8 +50,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.common.rpc.RpcRequest;
-import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -75,6 +60,14 @@ import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 public class BrokerOuterAPI {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final RemotingClient remotingClient;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 02a0060..0d2a59b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -20,7 +20,6 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.concurrent.CompleteFuture;
 import org.apache.rocketmq.acl.AccessValidator;
 import org.apache.rocketmq.acl.plain.PlainAccessValidator;
 import org.apache.rocketmq.broker.BrokerController;
@@ -154,8 +153,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.function.BiConsumer;
 
 import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
 
@@ -651,7 +648,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             if (!mappingContext.isLeader()) {
                 return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
             }
-            //TODO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
+            //TO DO should make sure the timestampOfOffset is equal or bigger than the searched timestamp
             Long timestamp = requestHeader.getTimestamp();
             long offset = -1;
             for (int i = 0; i < mappingItems.size(); i++) {
@@ -795,7 +792,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             //this may not
             return CompletableFuture.completedFuture(new RpcResponse(new RpcException(ResponseCode.NOT_LEADER_FOR_QUEUE,
                     String.format("%s-%d is not leader in broker %s, request code %d", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname(), request.getCode()))));
-        };
+        }
         GetMinOffsetRequestHeader requestHeader = (GetMinOffsetRequestHeader) request.getHeader();
         LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
         assert  mappingItem != null;
@@ -862,14 +859,14 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
         TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
         if (!mappingContext.isLeader()) {
             return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d does not exit in request process of current broker %s", mappingContext.getTopic(), mappingContext.getGlobalId(), mappingDetail.getBname()));
-        };
+        }
         LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
         assert mappingItem != null;
         try {
             requestHeader.setBname(mappingItem.getBname());
             requestHeader.setLo(false);
             RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader, null);
-            //TODO check if it is in current broker
+            //TO DO check if it is in current broker
             RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
             if (rpcResponse.getException() != null) {
                 throw rpcResponse.getException();
@@ -882,7 +879,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             response.setCode(ResponseCode.SUCCESS);
             response.setRemark(null);
             return response;
-        }catch (Throwable t) {
+        } catch (Throwable t) {
             return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
         }
     }
@@ -1058,7 +1055,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                 statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
             }
             TopicStatsTable topicStatsTable = new TopicStatsTable();
-            qidItemMap.forEach( (qid, itemPair) -> {
+            qidItemMap.forEach((qid, itemPair) -> {
                 LogicQueueMappingItem minItem = itemPair[0];
                 LogicQueueMappingItem maxItem = itemPair[1];
                 TopicOffset minTopicOffset = statsTable.get(minItem.getBname()).getOffsetTable().get(new MessageQueue(topic, minItem.getBname(), minItem.getQueueId()));
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 66abe62..04e705b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -17,18 +17,9 @@
 package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.ChannelHandlerContext;
-import java.util.List;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.rpc.RpcRequest;
-import org.apache.rocketmq.common.rpc.RpcResponse;
-import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
-import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
@@ -38,6 +29,12 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.rpc.RpcRequest;
+import org.apache.rocketmq.common.rpc.RpcResponse;
+import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
+import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -45,6 +42,8 @@ import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import java.util.List;
+
 import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
 
 public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@@ -177,7 +176,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                     if (rpcResponse.getCode() == ResponseCode.SUCCESS) {
                         offset = ((QueryConsumerOffsetResponseHeader) rpcResponse.getHeader()).getOffset();
                         break;
-                    } else if (rpcResponse.getCode() == ResponseCode.QUERY_NOT_FOUND){
+                    } else if (rpcResponse.getCode() == ResponseCode.QUERY_NOT_FOUND) {
                         continue;
                     } else {
                         //this should not happen
@@ -234,7 +233,7 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
             if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) {
                 response.setCode(ResponseCode.QUERY_NOT_FOUND);
                 response.setRemark("Not found, do not set to zero, maybe this group boot first");
-            }else if (minOffset <= 0
+            } else if (minOffset <= 0
                 && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                 requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
                 responseHeader.setOffset(0L);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 0029318..9bb09bd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -16,19 +16,19 @@
  */
 package org.apache.rocketmq.broker.slave;
 
-import java.io.IOException;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
-import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
-import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 
+import java.io.IOException;
+
 public class SlaveSynchronize {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
index 9518de0..0533668 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java
@@ -104,7 +104,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
         boolean changed = false;
         long start = System.currentTimeMillis();
         try {
-            for(String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
+            for (String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
                 try {
                     if (isStopped()) {
                         break;
@@ -123,7 +123,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
                         if (items.size() <= 1) {
                             continue;
                         }
-                        if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
+                        if (!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
                             continue;
                         }
                         LogicQueueMappingItem earlistItem = items.get(0);
@@ -154,7 +154,7 @@ public class TopicQueueMappingCleanService extends ServiceThread {
                         if (items.size() <= 1) {
                             continue;
                         }
-                        if(!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
+                        if (!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
                             continue;
                         }
                         LogicQueueMappingItem earlistItem = items.get(0);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index 1633440..56fc792 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -21,18 +21,12 @@ import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
-import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
-import org.apache.rocketmq.common.rpc.RpcRequest;
-import org.apache.rocketmq.common.rpc.RpcResponse;
 import org.apache.rocketmq.common.rpc.TopicQueueRequestHeader;
 import org.apache.rocketmq.common.rpc.TopicRequestHeader;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
-import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingContext;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
@@ -40,12 +34,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -98,7 +87,7 @@ public class TopicQueueMappingManager extends ConfigManager {
             }
             if (force) {
                 //back up the old items
-                oldDetail.getHostedQueues().forEach( (queueId, items) -> {
+                oldDetail.getHostedQueues().forEach((queueId, items) -> {
                     newDetail.getHostedQueues().putIfAbsent(queueId, items);
                 });
                 topicQueueMappingTable.put(newDetail.getTopic(), newDetail);
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 91d12a0..fc0b29c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -16,13 +16,6 @@
  */
 package org.apache.rocketmq.client.consumer.store;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.exception.OffsetNotFoundException;
@@ -31,13 +24,20 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Remote storage implementation
  */
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java b/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java
index c3d275f..e73bbbf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/OffsetNotFoundException.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.client.exception;
 
 public class OffsetNotFoundException extends MQBrokerException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 1de4fdb..0c15cff 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1235,7 +1235,7 @@ public class MQClientAPIImpl {
                     (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
                 return responseHeader.getOffset();
             }
-            case ResponseCode.PULL_NOT_FOUND:{
+            case ResponseCode.PULL_NOT_FOUND: {
                 throw new OffsetNotFoundException(response.getCode(), response.getRemark(), addr);
             }
             default:
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index 273add4..30e8439 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -16,17 +16,7 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.alibaba.fastjson.JSON;
 import org.apache.rocketmq.client.consumer.PopCallback;
-import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
@@ -41,18 +31,27 @@ import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class PullAPIWrapper {
     private final InternalLogger log = ClientLogger.getLog();
     private final MQClientInstance mQClientFactory;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 77add20..793189e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -16,28 +16,6 @@
  */
 package org.apache.rocketmq.client.impl.factory;
 
-import java.io.UnsupportedEncodingException;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import com.alibaba.fastjson.JSON;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
@@ -66,11 +44,10 @@ import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.filter.ExpressionType;
-import org.apache.rocketmq.common.message.MessageQueueAssignment;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -82,12 +59,34 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+import java.io.UnsupportedEncodingException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import static org.apache.rocketmq.common.rpc.ClientMetadata.topicRouteData2EndpointsForStaticTopic;
 
 public class MQClientInstance {
@@ -169,7 +168,7 @@ public class MQClientInstance {
 
     public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
         TopicPublishInfo info = new TopicPublishInfo();
-        // TODO should check the usage of raw route, it is better to remove such field
+        // TO DO should check the usage of raw route, it is better to remove such field
         info.setTopicRouteData(route);
         if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
             String[] brokers = route.getOrderTopicConf().split(";");
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
index b701df6..bc7586a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetTopicConfigRequestHeader.java
@@ -17,9 +17,7 @@
 
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.common.rpc.RpcRequestHeader;
 import org.apache.rocketmq.common.rpc.TopicRequestHeader;
-import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
index 4554557..28a5e64 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/ClientMetadata.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.common.rpc;
 
 import org.apache.rocketmq.common.MixAll;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
index f9478e4..9fec087 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.common.rpc;
 
 import org.apache.rocketmq.common.message.MessageQueue;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java
index e4de3e0..7876fdf 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClient.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.common.rpc;
 
 import org.apache.rocketmq.common.message.MessageQueue;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java
index ca0f2d4..e3430b5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientHook.java
@@ -1,7 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.common.rpc;
 
-import java.util.concurrent.Future;
-
 public abstract class RpcClientHook {
 
     //if the return is not null, return it
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 97879d1..62e6ec1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -1,8 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.common.rpc;
 
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import io.netty.util.concurrent.Promise;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
@@ -10,16 +25,12 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
-import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
@@ -190,7 +201,7 @@ public class RpcClientImpl implements RpcClient {
                 rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
                 break;
             }
-            default:{
+            default: {
                 rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
             }
         }
@@ -216,7 +227,7 @@ public class RpcClientImpl implements RpcClient {
                 rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), null, null));
                 break;
             }
-            default:{
+            default: {
                 rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
             }
         }
@@ -233,7 +244,7 @@ public class RpcClientImpl implements RpcClient {
                 rpcResponsePromise.setSuccess(new RpcResponse(ResponseCode.SUCCESS, null, RemotingSerializable.decode(responseCommand.getBody(), bodyClass)));
                 break;
             }
-            default:{
+            default: {
                 rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
             }
         }
@@ -254,7 +265,7 @@ public class RpcClientImpl implements RpcClient {
                 rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
                 break;
             }
-            default:{
+            default: {
                 rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
             }
         }
@@ -275,7 +286,7 @@ public class RpcClientImpl implements RpcClient {
                 rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
                 break;
             }
-            default:{
+            default: {
                 rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
             }
         }
@@ -296,7 +307,7 @@ public class RpcClientImpl implements RpcClient {
                 rpcResponsePromise.setSuccess(new RpcResponse(responseCommand.getCode(), responseHeader, responseCommand.getBody()));
                 break;
             }
-            default:{
+            default: {
                 rpcResponsePromise.setSuccess(new RpcResponse(new RpcException(responseCommand.getCode(), "unknown remote error")));
             }
         }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
index 61dce64..40c6eef 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.common.rpc;
 
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java
index fc096df..36fc056 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcException.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.common.rpc;
 
 import org.apache.rocketmq.remoting.exception.RemotingException;
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
index 76e7406..3c217f5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/LogicQueueMappingItem.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.common.statictopic;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
index e9cf6f7..784247d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingInfo.java
@@ -15,9 +15,22 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.common.statictopic;
-
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
index e7e7817..75f0a4f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicQueueMappingUtils.java
@@ -224,7 +224,7 @@ public class TopicQueueMappingUtils {
             if (newItem.getGen() < oldItem.getGen()) {
                 //the earliest item may have been deleted concurrently
                 inew++;
-            } else if (oldItem.getGen() < newItem.getGen()){
+            } else if (oldItem.getGen() < newItem.getGen()) {
                 //in the following cases, the new item-list has less items than old item-list
                 //1. the queue is mapped back to a broker which hold the logic queue before
                 //2. The earliest item is deleted by  TopicQueueMappingCleanService
@@ -260,7 +260,7 @@ public class TopicQueueMappingUtils {
         }
         int lastGen = -1;
         long lastOffset = -1;
-        for (int i = items.size() - 1; i >=0 ; i--) {
+        for (int i = items.size() - 1; i >= 0 ; i--) {
             LogicQueueMappingItem item = items.get(i);
             if (item.getStartOffset() < 0
                     || item.getGen() < 0
@@ -414,9 +414,9 @@ public class TopicQueueMappingUtils {
     }
 
     public static long blockSeqRoundUp(long offset, long blockSeqSize) {
-        long num = offset/blockSeqSize;
+        long num = offset / blockSeqSize;
         long left = offset % blockSeqSize;
-        if (left < blockSeqSize/2) {
+        if (left < blockSeqSize / 2) {
             return (num + 1) * blockSeqSize;
         } else {
             return (num + 2) * blockSeqSize;
@@ -535,7 +535,7 @@ public class TopicQueueMappingUtils {
         }
         Map<String, Integer> brokerNumMapBeforeRemapping = new HashMap<String, Integer>();
         for (TopicQueueMappingOne mappingOne: globalIdMap.values()) {
-            if(brokerNumMapBeforeRemapping.containsKey(mappingOne.bname)) {
+            if (brokerNumMapBeforeRemapping.containsKey(mappingOne.bname)) {
                 brokerNumMapBeforeRemapping.put(mappingOne.bname, brokerNumMapBeforeRemapping.get(mappingOne.bname) + 1);
             } else {
                 brokerNumMapBeforeRemapping.put(mappingOne.bname, 1);
diff --git a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
index e865b6b..ce02a93 100644
--- a/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/statictopic/TopicRemappingDetailWrapper.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.common.statictopic;
 
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 0e32226..03b1640 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -28,11 +28,9 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
index 4f5d38e..001db95 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
@@ -152,9 +152,6 @@ public class RMQNormalProducer extends AbstractMQProducer {
             sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
             sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
             msgBodys.addData(new String(msg.getBody()));
-            if (originMsgs.getAllData().contains(msg)) {
-                System.out.println("Hash collision");
-            }
             originMsgs.addData(msg);
             originMsgIndex.put(new String(msg.getBody()), metaqResult);
         } catch (Exception e) {
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index 6784b76..8287d81 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -17,18 +17,10 @@
 
 package org.apache.rocketmq.test.util;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ForkJoinPool;
-import java.util.stream.Collectors;
-
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.log4j.Logger;
-import org.apache.rocketmq.acl.common.AclUtils;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
@@ -42,27 +34,29 @@ import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
-import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
-import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.admin.MQAdminUtils;
 import org.apache.rocketmq.tools.command.CommandUtil;
-import org.apache.rocketmq.tools.command.MQAdminStartup;
 import org.apache.rocketmq.tools.command.topic.RemappingStaticTopicSubCommand;
 import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ForkJoinPool;
+
 public class MQAdminTestUtils {
     private static Logger log = Logger.getLogger(MQAdminTestUtils.class);
 
     public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
-        int queueNum) {
+                                      int queueNum) {
         int defaultWaitTime = 5;
         return createTopic(nameSrvAddr, clusterName, topic, queueNum, defaultWaitTime);
     }
 
     public static boolean createTopic(String nameSrvAddr, String clusterName, String topic,
-        int queueNum, int waitTimeSec) {
+                                      int queueNum, int waitTimeSec) {
         boolean createResult = false;
         DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
         mqAdminExt.setInstanceName(UUID.randomUUID().toString());
@@ -108,12 +102,12 @@ public class MQAdminTestUtils {
         try {
             mqAdminExt.start();
             Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt,
-                clusterName);
+                    clusterName);
             for (String addr : masterSet) {
                 try {
                     mqAdminExt.createAndUpdateSubscriptionGroupConfig(addr, config);
                     log.info(String.format("create subscription group %s to %s success.\n", consumerId,
-                        addr));
+                            addr));
                 } catch (Exception e) {
                     e.printStackTrace();
                     Thread.sleep(1000 * 1);
@@ -159,37 +153,10 @@ public class MQAdminTestUtils {
         return false;
     }
 
-    public void getSubConnection(String nameSrvAddr, String clusterName, String consumerId) {
-        boolean createResult = true;
-        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
-        mqAdminExt.setNamesrvAddr(nameSrvAddr);
-        SubscriptionGroupConfig config = new SubscriptionGroupConfig();
-        config.setGroupName(consumerId);
-        try {
-            mqAdminExt.start();
-            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdminExt,
-                clusterName);
-            for (String addr : masterSet) {
-                try {
-
-                    System.out.printf("create subscription group %s to %s success.\n", consumerId,
-                        addr);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    Thread.sleep(1000 * 1);
-                }
-            }
-        } catch (Exception e) {
-            createResult = false;
-            e.printStackTrace();
-        }
-        ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
-    }
-
     //should only be test, if some middle operation failed, it dose not backup the brokerConfigMap
     public static Map<String, TopicConfigAndQueueMapping> createStaticTopic(String topic, int queueNum, Set<String> targetBrokers, DefaultMQAdminExt defaultMQAdminExt) throws Exception {
         Map<String, TopicConfigAndQueueMapping> brokerConfigMap = MQAdminUtils.examineTopicConfigAll(topic, defaultMQAdminExt);
-        assert  brokerConfigMap.isEmpty();
+        assert brokerConfigMap.isEmpty();
         TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, targetBrokers, brokerConfigMap);
         MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);
         MQAdminUtils.updateTopicConfigMappingAll(brokerConfigMap, defaultMQAdminExt, false);
@@ -221,20 +188,20 @@ public class MQAdminTestUtils {
         MQAdminUtils.checkIfMasterAlive(brokerConfigMap.keySet(), defaultMQAdminExt, clientMetadata);
         // now do the remapping
         //Step1: let the new leader can be write without the logicOffset
-        for (String broker: brokersToMapIn) {
+        for (String broker : brokersToMapIn) {
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
             defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
         }
         //Step2: forbid the write of old leader
-        for (String broker: brokersToMapOut) {
+        for (String broker : brokersToMapOut) {
             String addr = clientMetadata.findMasterBrokerAddr(broker);
             TopicConfigAndQueueMapping configMapping = brokerConfigMap.get(broker);
             defaultMQAdminExt.createStaticTopic(addr, defaultMQAdminExt.getCreateTopicKey(), configMapping, configMapping.getMappingDetail(), force);
         }
 
         //Step5: write the non-target brokers
-        for (String broker: brokerConfigMap.keySet()) {
+        for (String broker : brokerConfigMap.keySet()) {
             if (brokersToMapIn.contains(broker) || brokersToMapOut.contains(broker)) {
                 continue;
             }
@@ -244,25 +211,24 @@ public class MQAdminTestUtils {
         }
     }
 
-
-    public static void createStaticTopicWithCommand(String topic, int queueNum, Set<String> brokers, String cluster,  String nameservers) throws Exception {
+    public static void createStaticTopicWithCommand(String topic, int queueNum, Set<String> brokers, String cluster, String nameservers) throws Exception {
         UpdateStaticTopicSubCommand cmd = new UpdateStaticTopicSubCommand();
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         String[] args;
         if (cluster != null) {
-            args = new String[] {
-                    "-c", cluster,
-                    "-t", topic,
-                    "-qn", String.valueOf(queueNum),
-                    "-n", nameservers
+            args = new String[]{
+                "-c", cluster,
+                "-t", topic,
+                "-qn", String.valueOf(queueNum),
+                "-n", nameservers
             };
         } else {
             String brokerStr = String.join(",", brokers);
-            args = new String[] {
-                    "-b", brokerStr,
-                    "-t", topic,
-                    "-qn", String.valueOf(queueNum),
-                    "-n", nameservers
+            args = new String[]{
+                "-b", brokerStr,
+                "-t", topic,
+                "-qn", String.valueOf(queueNum),
+                "-n", nameservers
             };
         }
         final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser());
@@ -276,23 +242,22 @@ public class MQAdminTestUtils {
         cmd.execute(commandLine, options, null);
     }
 
-
     public static void remappingStaticTopicWithCommand(String topic, Set<String> brokers, String cluster, String nameservers) throws Exception {
         RemappingStaticTopicSubCommand cmd = new RemappingStaticTopicSubCommand();
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         String[] args;
         if (cluster != null) {
-            args = new String[] {
-                    "-c", cluster,
-                    "-t", topic,
-                    "-n", nameservers
+            args = new String[]{
+                "-c", cluster,
+                "-t", topic,
+                "-n", nameservers
             };
         } else {
             String brokerStr = String.join(",", brokers);
-            args = new String[] {
-                    "-b", brokerStr,
-                    "-t", topic,
-                    "-n", nameservers
+            args = new String[]{
+                "-b", brokerStr,
+                "-t", topic,
+                "-n", nameservers
             };
         }
         final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser());
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 5c80e86..6f551d3 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -45,8 +45,6 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.rpc.ClientMetadata;
-import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.topic.TopicValidator;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index d1f0fcd..fce4318 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -64,11 +64,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.rpc.ClientMetadata;
-import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
-import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
-import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index c4838e3..35efe96 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -44,8 +44,6 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.rpc.ClientMetadata;
-import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingDetail;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
index 0fda471..a5aab4d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminUtils.java
@@ -1,15 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.tools.admin;
 
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.admin.TopicOffset;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.statictopic.LogicQueueMappingItem;
@@ -160,7 +174,7 @@ public class MQAdminUtils {
                 if (topicOffset == null) {
                     throw new RuntimeException("Cannot get the max offset for old leader " + oldLeader);
                 }
-                //TODO check the max offset, will it return -1?
+                //TO DO check the max offset, will it return -1?
                 if (topicOffset.getMaxOffset() < oldLeader.getStartOffset()) {
                     throw new RuntimeException("The max offset is smaller then the start offset " + oldLeader + " " + topicOffset.getMaxOffset());
                 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
index a3b757a..ba4e54e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/RemappingStaticTopicSubCommand.java
@@ -107,7 +107,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
             }
             MQAdminUtils.remappingStaticTopic(topic, wrapper.getBrokerToMapIn(), wrapper.getBrokerToMapOut(), wrapper.getBrokerConfigMap(), 10000, force, defaultMQAdminExt);
             return;
-        }catch (Exception e) {
+        } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
         } finally {
             defaultMQAdminExt.shutdown();
@@ -138,7 +138,7 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
 
         try {
             defaultMQAdminExt.start();
-            if ((!commandLine.hasOption("b") && !commandLine.hasOption('c'))) {
+            if (!commandLine.hasOption("b") && !commandLine.hasOption('c')) {
                 ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
                 return;
             }
@@ -184,14 +184,14 @@ public class RemappingStaticTopicSubCommand implements SubCommand {
             {
                 TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>());
                 String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
-                System.out.println("The old mapping data is written to file " + oldMappingDataFile);
+                System.out.printf("The old mapping data is written to file " + oldMappingDataFile + "\n");
             }
 
             TopicRemappingDetailWrapper newWrapper = TopicQueueMappingUtils.remappingStaticTopic(topic, brokerConfigMap, targetBrokers);
 
             {
                 String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
-                System.out.println("The old mapping data is written to file " + newMappingDataFile);
+                System.out.printf("The new mapping data is written to file " + newMappingDataFile + "\n");
             }
 
             MQAdminUtils.completeNoTargetBrokers(newWrapper.getBrokerConfigMap(), defaultMQAdminExt);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
index 410381e..d67b8fb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/UpdateStaticTopicSubCommand.java
@@ -21,10 +21,9 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.statictopic.TopicConfigAndQueueMapping;
 import org.apache.rocketmq.common.statictopic.TopicQueueMappingUtils;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.rpc.ClientMetadata;
 import org.apache.rocketmq.common.statictopic.TopicRemappingDetailWrapper;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.srvutil.ServerUtil;
@@ -110,7 +109,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             MQAdminUtils.completeNoTargetBrokers(wrapper.getBrokerConfigMap(), defaultMQAdminExt);
             MQAdminUtils.updateTopicConfigMappingAll(wrapper.getBrokerConfigMap(), defaultMQAdminExt, false);
             return;
-        }catch (Exception e) {
+        } catch (Exception e) {
             throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
         } finally {
             defaultMQAdminExt.shutdown();
@@ -185,7 +184,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
             {
                 TopicRemappingDetailWrapper oldWrapper = new TopicRemappingDetailWrapper(topic, TopicRemappingDetailWrapper.TYPE_CREATE_OR_UPDATE, maxEpochAndNum.getKey(), brokerConfigMap, new HashSet<String>(), new HashSet<String>());
                 String oldMappingDataFile = TopicQueueMappingUtils.writeToTemp(oldWrapper, false);
-                System.out.println("The old mapping data is written to file " + oldMappingDataFile);
+                System.out.printf("The old mapping data is written to file " + oldMappingDataFile + "\n");
             }
             //add the existed brokers to target brokers
             targetBrokers.addAll(brokerConfigMap.keySet());
@@ -195,7 +194,7 @@ public class UpdateStaticTopicSubCommand implements SubCommand {
 
             {
                 String newMappingDataFile = TopicQueueMappingUtils.writeToTemp(newWrapper, true);
-                System.out.println("The new mapping data is written to file " + newMappingDataFile);
+                System.out.printf("The new mapping data is written to file " + newMappingDataFile + "\n");
             }
 
             MQAdminUtils.completeNoTargetBrokers(brokerConfigMap, defaultMQAdminExt);