You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/02/20 09:59:28 UTC

[incubator-inlong] branch master updated: [INLONG-2609][TubeMQ] Fix Javadoc related errors (#2611)

This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1662c12  [INLONG-2609][TubeMQ] Fix Javadoc related errors (#2611)
1662c12 is described below

commit 1662c123f7dc6b3f183a6261420161aab726814b
Author: gosonzhang <46...@qq.com>
AuthorDate: Sun Feb 20 17:59:23 2022 +0800

    [INLONG-2609][TubeMQ] Fix Javadoc related errors (#2611)
---
 .../consumer/SimpleClientBalanceConsumer.java      | 16 +++++-
 .../client/factory/TubeBaseSessionFactory.java     |  6 +++
 .../tubemq/client/producer/ProducerManager.java    | 19 ++++---
 .../client/producer/SimpleMessageProducer.java     |  7 +++
 .../qltystats/DefaultBrokerRcvQltyStats.java       | 14 +++--
 .../connectors/tubemq/TubemqSourceFunction.java    | 20 +++++--
 .../flink/connectors/tubemq/TubemqTableSource.java | 28 +++++++---
 .../tubemq/corebase/utils/ServiceStatusHolder.java | 42 +++++++++------
 .../inlong/tubemq/corerpc/ResponseWrapper.java     | 33 ++++++++++++
 .../inlong/tubemq/corerpc/RpcServiceFactory.java   | 62 ++++++++++++++--------
 .../benchemark/RcpService4BenchmarkClient.java     | 15 +++++-
 .../inlong/tubemq/corerpc/codec/PbEnDecoder.java   | 24 ++++-----
 .../corerpc/netty/ByteBufferInputStream.java       | 11 +++-
 .../corerpc/netty/ByteBufferOutputStream.java      |  6 +++
 .../inlong/tubemq/corerpc/netty/NettyClient.java   | 46 ++++++++++------
 .../tubemq/corerpc/netty/NettyClientFactory.java   | 14 ++---
 .../inlong/tubemq/corerpc/utils/MixUtils.java      | 26 +++++++--
 .../tubemq/corerpc/utils/TSSLEngineUtil.java       | 39 +++++++++++---
 .../tubemq/server/broker/BrokerServiceServer.java  |  2 +-
 .../server/broker/metadata/BrokerDefMetadata.java  |  9 ++--
 .../server/broker/nodeinfo/ConsumerNodeInfo.java   | 42 ++++++++++++---
 .../common/offsetstorage/OffsetStorageInfo.java    | 31 ++++++++---
 .../common/offsetstorage/ZkOffsetStorage.java      | 13 +++--
 .../offsetstorage/zookeeper/ZooKeeperWatcher.java  |  2 +
 .../inlong/tubemq/server/common/utils/Bytes.java   |  2 +
 .../tubemq/server/common/utils/ClientSyncInfo.java |  6 +++
 .../tubemq/server/common/utils/FileUtil.java       |  8 ++-
 .../tubemq/server/common/utils/HttpUtils.java      | 31 ++++++-----
 .../inlong/tubemq/server/common/utils/RowLock.java |  7 ++-
 .../inlong/tubemq/server/common/utils/Sleeper.java |  9 ++--
 .../inlong/tubemq/server/master/TMaster.java       |  8 ++-
 .../bdbstore/bdbentitys/BdbBrokerConfEntity.java   | 26 ++++++++-
 .../bdbentitys/BdbClusterSettingEntity.java        | 26 ++++++++-
 .../bdbstore/bdbentitys/BdbTopicConfEntity.java    | 22 +++++++-
 .../server/master/metamanage/MetaDataManager.java  | 13 +++++
 .../nodemanage/nodeconsumer/ConsumeGroupInfo.java  |  8 +++
 .../nodemanage/nodeconsumer/ConsumerInfo.java      | 29 ++++++++++
 .../server/master/web/common/BaseResult.java       | 40 +-------------
 .../master/web/simplemvc/RequestDispatcher.java    | 12 +++++
 39 files changed, 584 insertions(+), 190 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
index 8ac51fa..eaf379d 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
@@ -67,6 +67,12 @@ import org.apache.inlong.tubemq.corerpc.service.MasterService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * SimpleClientBalanceConsumer, This type of consumer supports the client for
+ * independent partition allocation and consumption.
+ * Compared with the server-side allocation scheme, this type of client manages the partition by
+ * itself and is not affected by the server-side allocation cycle
+ */
 public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
     private static final Logger logger =
             LoggerFactory.getLogger(SimpleClientBalanceConsumer.class);
@@ -94,9 +100,9 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
     private final RpcConfig rpcConfig = new RpcConfig();
     private final AtomicLong visitToken =
             new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
-    private AtomicReference<String> authAuthorizedTokenRef =
+    private final AtomicReference<String> authAuthorizedTokenRef =
             new AtomicReference<>("");
-    private ClientAuthenticateHandler authenticateHandler =
+    private final ClientAuthenticateHandler authenticateHandler =
             new SimpleClientAuthenticateHandler();
     private final ScheduledExecutorService heartService2Master;
     private final AtomicInteger metaReqStatusId = new AtomicInteger(0);
@@ -110,6 +116,12 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
             new ConcurrentHashMap<>();
     protected final ClientStatsInfo clientStatsInfo;
 
+    /**
+     * Initial a client-balance consumer object
+     * @param messageSessionFactory   the session factory
+     * @param consumerConfig          the consumer configure
+     * @throws TubeClientException    the exception while creating object.
+     */
     public SimpleClientBalanceConsumer(final InnerSessionFactory messageSessionFactory,
                                        final ConsumerConfig consumerConfig) throws TubeClientException {
         java.security.Security.setProperty("networkaddress.cache.ttl", "3");
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
index 357b4b4..3e74256 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
@@ -53,6 +53,12 @@ public class TubeBaseSessionFactory implements InnerSessionFactory {
     private final DefaultBrokerRcvQltyStats brokerRcvQltyStats;
     private final AtomicBoolean shutdown = new AtomicBoolean(false);
 
+    /**
+     * Initial Session factory
+     *
+     * @param clientFactory      the client factory
+     * @param tubeClientConfig   the tube client configure
+     */
     public TubeBaseSessionFactory(final ClientFactory clientFactory,
                                   final TubeClientConfig tubeClientConfig) throws TubeClientException {
         super();
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
index 4ad2a29..312ada1 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
@@ -87,7 +87,7 @@ public class ProducerManager {
             new AtomicReference<>("");
     private final ClientAuthenticateHandler authenticateHandler =
             new SimpleClientAuthenticateHandler();
-    private MasterService masterService;
+    private final MasterService masterService;
     private Map<Integer, BrokerInfo> brokersMap = new ConcurrentHashMap<>();
     private long brokerInfoCheckSum = -1L;
     private long lastBrokerUpdatedTime = System.currentTimeMillis();
@@ -104,6 +104,13 @@ public class ProducerManager {
             new AtomicBoolean(false);
     private final ClientStatsInfo clientStatsInfo;
 
+    /**
+     * Initial a producer manager
+     *
+     * @param sessionFactory         the session factory
+     * @param tubeClientConfig       the client configure
+     * @throws TubeClientException   the exception while creating object
+     */
     public ProducerManager(final InnerSessionFactory sessionFactory,
                            final TubeClientConfig tubeClientConfig) throws TubeClientException {
         java.security.Security.setProperty("networkaddress.cache.ttl", "3");
@@ -163,7 +170,7 @@ public class ProducerManager {
     /**
      * Start the producer manager.
      *
-     * @throws Throwable
+     * @throws Throwable  the exception
      */
     public void start() throws Throwable {
         if (nodeStatus.get() <= 0) {
@@ -178,7 +185,7 @@ public class ProducerManager {
      * Publish a topic.
      *
      * @param topic topic name
-     * @throws TubeClientException
+     * @throws TubeClientException  the exception at publish
      */
     public void publish(final String topic) throws TubeClientException {
         checkServiceStatus();
@@ -223,7 +230,7 @@ public class ProducerManager {
      *
      * @param topicSet a set of topic names
      * @return a set of successful published topic names
-     * @throws TubeClientException
+     * @throws TubeClientException   the exception at publish
      */
     public Set<String> publish(Set<String> topicSet) throws TubeClientException {
         checkServiceStatus();
@@ -282,7 +289,7 @@ public class ProducerManager {
     /**
      * Shutdown the produce manager.
      *
-     * @throws Throwable
+     * @throws Throwable   the exception at shutdown
      */
     public void shutdown() throws Throwable {
         StringBuilder strBuff = new StringBuilder(512);
@@ -373,7 +380,7 @@ public class ProducerManager {
     /**
      * Remove published topics. We will ignore null topics or non-published topics.
      *
-     * @param topicSet
+     * @param topicSet   the topic set need to delete
      */
     public void removeTopic(Set<String> topicSet) {
         for (String topic : topicSet) {
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
index c4699fd..6f71b44 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
@@ -65,6 +65,13 @@ public class SimpleMessageProducer implements MessageProducer {
     private final RpcConfig rpcConfig = new RpcConfig();
     private final AtomicBoolean isShutDown = new AtomicBoolean(false);
 
+    /**
+     * Initial a producer object
+     *
+     * @param sessionFactory        the session factory
+     * @param tubeClientConfig      the client configure
+     * @throws TubeClientException  the exception while creating object
+     */
     public SimpleMessageProducer(final InnerSessionFactory sessionFactory,
                                  TubeClientConfig tubeClientConfig) throws TubeClientException {
         java.security.Security.setProperty("networkaddress.cache.ttl", "3");
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/qltystats/DefaultBrokerRcvQltyStats.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/qltystats/DefaultBrokerRcvQltyStats.java
index 160d770..a8dabbc 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/qltystats/DefaultBrokerRcvQltyStats.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/qltystats/DefaultBrokerRcvQltyStats.java
@@ -61,10 +61,10 @@ public class DefaultBrokerRcvQltyStats implements BrokerRcvQltyStats {
     // -1: Uninitialized
     // 0: Running
     // 1:Stopped
-    private AtomicInteger statusId = new AtomicInteger(-1);
+    private final AtomicInteger statusId = new AtomicInteger(-1);
     private long lastPrintTime = System.currentTimeMillis();
     // Total sent request number.
-    private AtomicLong curTotalSentRequestNum = new AtomicLong(0);
+    private final AtomicLong curTotalSentRequestNum = new AtomicLong(0);
     // The time of last link quality statistic.
     private long lastLinkStatisticTime = System.currentTimeMillis();
     // Analyze the broker quality based on the request response. We calculate the quality metric by
@@ -77,6 +77,12 @@ public class DefaultBrokerRcvQltyStats implements BrokerRcvQltyStats {
     private long lastQualityStatisticTime = System.currentTimeMillis();
     private long printCount = 0;
 
+    /**
+     * Initial a broker receive status statistics ojbect
+     *
+     * @param rpcServiceFactory  the session factory
+     * @param producerConfig     the producer configure
+     */
     public DefaultBrokerRcvQltyStats(final RpcServiceFactory rpcServiceFactory,
                                      final TubeClientConfig producerConfig) {
         this.clientConfig = producerConfig;
@@ -120,7 +126,7 @@ public class DefaultBrokerRcvQltyStats implements BrokerRcvQltyStats {
      *
      * @param brokerPartList broker partition mapping
      * @return partition list
-     * @throws TubeClientException
+     * @throws TubeClientException  the exception while query
      */
     @Override
     public List<Partition> getAllowedBrokerPartitions(
@@ -212,7 +218,7 @@ public class DefaultBrokerRcvQltyStats implements BrokerRcvQltyStats {
     /**
      * Remove a registered broker from the statistic list.
      *
-     * @param registeredBrokerIdList
+     * @param registeredBrokerIdList   the broker id need to delete
      */
     @Override
     public void removeUnRegisteredBroker(List<Integer> registeredBrokerIdList) {
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
index 368af08..7f3e2e4 100644
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
+++ b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The Flink tubemq Consumer.
+ * The Flink TubeMQ Consumer.
  *
  * @param <T> The type of records produced by this data source
  */
@@ -67,7 +67,7 @@ public class TubemqSourceFunction<T>
     private static final String SPLIT_COLON = ":";
 
     /**
-     * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081.
+     * The address of TubeMQ master, format eg: 127.0.0.1:8080,127.0.0.2:8081.
      */
     private final String masterAddress;
 
@@ -92,7 +92,7 @@ public class TubemqSourceFunction<T>
     private final DeserializationSchema<T> deserializationSchema;
 
     /**
-     * The random key for tubemq consumer group when startup.
+     * The random key for TubeMQ consumer group when startup.
      */
     private final String sessionKey;
 
@@ -131,15 +131,25 @@ public class TubemqSourceFunction<T>
     private transient Map<String, Long> currentOffsets;
 
     /**
-     * The tubemq session factory.
+     * The TubeMQ session factory.
      */
     private transient TubeSingleSessionFactory messageSessionFactory;
 
     /**
-     * The tubemq pull consumer.
+     * The TubeMQ pull consumer.
      */
     private transient PullMessageConsumer messagePullConsumer;
 
+    /**
+     * Build a TubeMQ source function
+     *
+     * @param masterAddress            the master address of TubeMQ
+     * @param topic                    the topic name
+     * @param tidSet                   the  topic's filter condition items
+     * @param consumerGroup            the consumer group name
+     * @param deserializationSchema    the deserialize schema
+     * @param configuration            the configure
+     */
     public TubemqSourceFunction(
         String masterAddress,
         String topic,
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
index 8ba4b2d..ea1e000 100644
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
+++ b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
@@ -41,7 +41,7 @@ import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 
 /**
- * Tubemq {@link StreamTableSource}.
+ * TubeMQ {@link StreamTableSource}.
  */
 public class TubemqTableSource implements
     StreamTableSource<Row>,
@@ -50,7 +50,7 @@ public class TubemqTableSource implements
     DefinedFieldMapping {
 
     /**
-     * Deserialization schema for records from tubemq.
+     * Deserialization schema for records from TubeMQ.
      */
     private final DeserializationSchema<Row> deserializationSchema;
 
@@ -77,30 +77,44 @@ public class TubemqTableSource implements
     private final Map<String, String> fieldMapping;
 
     /**
-     * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
+     * The address of TubeMQ master, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
      */
     private final String masterAddress;
 
     /**
-     * The tubemq topic name.
+     * The TubeMQ topic name.
      */
     private final String topic;
 
     /**
-     * The tubemq tid filter collection.
+     * The TubeMQ tid filter collection.
      */
     private final TreeSet<String> tidSet;
 
     /**
-     * The tubemq consumer group name.
+     * The TubeMQ consumer group name.
      */
     private final String consumerGroup;
 
     /**
-     * The parameters collection for tubemq consumer.
+     * The parameters collection for TubeMQ consumer.
      */
     private final Configuration configuration;
 
+    /**
+     * Build TubeMQ table source
+     *
+     * @param deserializationSchema   the deserialize schema
+     * @param schema             the data schema
+     * @param proctimeAttribute              the proc time
+     * @param rowtimeAttributeDescriptors    the row time attribute descriptor
+     * @param fieldMapping        the field map information
+     * @param masterAddress       the master address
+     * @param topic               the topic name
+     * @param tidSet              the topic's filter condition items
+     * @param consumerGroup       the consumer group
+     * @param configuration       the configure
+     */
     public TubemqTableSource(
         DeserializationSchema<Row> deserializationSchema,
         TableSchema schema,
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
index 1f0126e..fa229bc 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
@@ -22,32 +22,35 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Node Read and Write Service status holder.
+ */
 public class ServiceStatusHolder {
     private static final Logger logger =
             LoggerFactory.getLogger(ServiceStatusHolder.class);
-    private static AtomicBoolean isServiceStopped = new AtomicBoolean(false);
-    private static AtomicBoolean isReadStopped = new AtomicBoolean(false);
-    private static AtomicBoolean isWriteStopped = new AtomicBoolean(false);
+    private static final AtomicBoolean isServiceStopped = new AtomicBoolean(false);
+    private static final AtomicBoolean isReadStopped = new AtomicBoolean(false);
+    private static final AtomicBoolean isWriteStopped = new AtomicBoolean(false);
 
     private static int allowedReadIOExcptCnt = 10;
     private static int allowedWriteIOExcptCnt = 10;
-    private static long statisDurationMs = 120000;
-    private static AtomicLong curReadIOExcptCnt = new AtomicLong(0);
-    private static AtomicLong lastReadStatsTime =
+    private static long statsDurationMs = 120000;
+    private static final AtomicLong curReadIOExcptCnt = new AtomicLong(0);
+    private static final AtomicLong lastReadStatsTime =
             new AtomicLong(System.currentTimeMillis());
-    private static AtomicBoolean isPauseRead = new AtomicBoolean(false);
+    private static final AtomicBoolean isPauseRead = new AtomicBoolean(false);
 
-    private static AtomicLong curWriteIOExcptCnt = new AtomicLong(0);
-    private static AtomicLong lastWriteStatsTime =
+    private static final AtomicLong curWriteIOExcptCnt = new AtomicLong(0);
+    private static final AtomicLong lastWriteStatsTime =
             new AtomicLong(System.currentTimeMillis());
-    private static AtomicBoolean isPauseWrite = new AtomicBoolean(false);
+    private static final AtomicBoolean isPauseWrite = new AtomicBoolean(false);
 
-    public static void setStatisParameters(int paraAllowedReadIOExcptCnt,
-                                           int paraAllowedWriteIOExcptCnt,
-                                           long paraStatisDurationMs) {
+    public static void setStatsParameters(int paraAllowedReadIOExcptCnt,
+                                          int paraAllowedWriteIOExcptCnt,
+                                          long paraStatsDurationMs) {
         allowedReadIOExcptCnt = paraAllowedReadIOExcptCnt;
         allowedWriteIOExcptCnt = paraAllowedWriteIOExcptCnt;
-        statisDurationMs = paraStatisDurationMs;
+        statsDurationMs = paraStatsDurationMs;
     }
 
     public static boolean isServiceStopped() {
@@ -67,7 +70,7 @@ public class ServiceStatusHolder {
 
     public static boolean addWriteIOErrCnt() {
         long curTime = lastWriteStatsTime.get();
-        if (System.currentTimeMillis() - curTime > statisDurationMs) {
+        if (System.currentTimeMillis() - curTime > statsDurationMs) {
             if (lastWriteStatsTime.compareAndSet(curTime, System.currentTimeMillis())) {
                 curWriteIOExcptCnt.getAndSet(0);
                 if (isPauseWrite.get()) {
@@ -92,7 +95,7 @@ public class ServiceStatusHolder {
 
     public static boolean addReadIOErrCnt() {
         long curTime = lastReadStatsTime.get();
-        if (System.currentTimeMillis() - curTime > statisDurationMs) {
+        if (System.currentTimeMillis() - curTime > statsDurationMs) {
             if (lastReadStatsTime.compareAndSet(curTime, System.currentTimeMillis())) {
                 curReadIOExcptCnt.getAndSet(0);
                 if (isPauseRead.get()) {
@@ -115,6 +118,13 @@ public class ServiceStatusHolder {
         return getCurServiceStatus(isPauseRead.get(), isReadStopped.get());
     }
 
+    /**
+     * Set the read and write service status
+     *
+     * @param isReadStop      whether stop read service
+     * @param isWriteStop     whether stop write service
+     * @param caller          the caller
+     */
     public static void setReadWriteServiceStatus(boolean isReadStop,
                                                  boolean isWriteStop,
                                                  String caller) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/ResponseWrapper.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/ResponseWrapper.java
index feb3215..3f389c9 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/ResponseWrapper.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/ResponseWrapper.java
@@ -21,6 +21,9 @@ import java.io.Serializable;
 import org.apache.inlong.tubemq.corerpc.exception.StandbyException;
 import org.apache.inlong.tubemq.corerpc.utils.MixUtils;
 
+/**
+ *  Response message wrapper class.
+ */
 public class ResponseWrapper implements Serializable {
 
     private static final long serialVersionUID = -3852197088007144687L;
@@ -35,6 +38,16 @@ public class ResponseWrapper implements Serializable {
     private String errMsg;
     private String stackTrace;
 
+    /**
+     *  Initial a response wrapper object
+     *
+     * @param flagId         the flag id
+     * @param serialNo       the serial no.
+     * @param serviceType    the service type
+     * @param locVersion     the local protocol version
+     * @param methodId       the method id
+     * @param responseData   the response data
+     */
     public ResponseWrapper(int flagId, int serialNo,
                            int serviceType, int locVersion,
                            int methodId, Object responseData) {
@@ -47,6 +60,16 @@ public class ResponseWrapper implements Serializable {
         this.success = true;
     }
 
+    /**
+     *  Initial a response wrapper object
+     *
+     * @param flagId         the flag id
+     * @param serialNo       the serial no.
+     * @param serviceType    the service type
+     * @param rmtVersion     the remote protocol version
+     * @param locVersion     the local protocol version
+     * @param exception      the exception
+     */
     public ResponseWrapper(int flagId, int serialNo,
                            int serviceType, int rmtVersion,
                            int locVersion, Throwable exception) {
@@ -75,6 +98,16 @@ public class ResponseWrapper implements Serializable {
         }
     }
 
+    /**
+     *  Initial a response wrapper object
+     *
+     * @param flagId         the flag id
+     * @param serialNo       the serial no.
+     * @param serviceType    the service type
+     * @param locVersion     the local protocol version
+     * @param errorMsg       the text error message
+     * @param stackTrace     the stack trace information
+     */
     public ResponseWrapper(int flagId, int serialNo,
                            int serviceType, int locVersion,
                            String errorMsg, String stackTrace) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcServiceFactory.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcServiceFactory.java
index 0748b5b..a05853e 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcServiceFactory.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcServiceFactory.java
@@ -48,7 +48,7 @@ public class RpcServiceFactory {
     private static final Logger logger =
             LoggerFactory.getLogger(RpcServiceFactory.class);
     private static final int DEFAULT_IDLE_TIME = 10 * 60 * 1000;
-    private static AtomicInteger threadIdGen = new AtomicInteger(0);
+    private static final AtomicInteger threadIdGen = new AtomicInteger(0);
     private final ClientFactory clientFactory;
     private final ConcurrentHashMap<Integer, ServiceRpcServer> servers =
             new ConcurrentHashMap<>();
@@ -68,8 +68,8 @@ public class RpcServiceFactory {
             new ConcurrentHashMap<>();
     private long unAvailableFbdDurationMs =
         RpcConstants.CFG_UNAVAILABLE_FORBIDDEN_DURATION_MS;
-    private AtomicLong lastLogPrintTime = new AtomicLong(0);
-    private AtomicLong lastCheckTime = new AtomicLong(0);
+    private final AtomicLong lastLogPrintTime = new AtomicLong(0);
+    private final AtomicLong lastCheckTime = new AtomicLong(0);
     private long linkStatsDurationMs =
             RpcConstants.CFG_LQ_STATS_DURATION_MS;
     private long linkStatsForbiddenDurMs =
@@ -89,6 +89,8 @@ public class RpcServiceFactory {
 
     /**
      * initial with an tube clientFactory
+     *
+     * @param clientFactory    the client factory
      */
     public RpcServiceFactory(final ClientFactory clientFactory) {
         this.clientFactory = clientFactory;
@@ -97,6 +99,9 @@ public class RpcServiceFactory {
 
     /**
      * initial with an tube clientFactory and rpc config
+     *
+     * @param clientFactory  the client factory
+     * @param config         the configure information
      */
     public RpcServiceFactory(final ClientFactory clientFactory, final RpcConfig config) {
         this.clientFactory = clientFactory;
@@ -125,8 +130,8 @@ public class RpcServiceFactory {
     /**
      * check if the remote address is forbidden or not
      *
-     * @param remoteAddr
-     * @return
+     * @param remoteAddr   the remote address
+     * @return             whether is forbidden
      */
     public boolean isRemoteAddrForbidden(String remoteAddr) {
         Long forbiddenTime = forbiddenAddrMap.get(remoteAddr);
@@ -144,7 +149,7 @@ public class RpcServiceFactory {
     /**
      * get all Link abnormal Forbidden Address
      *
-     * @return
+     * @return  the forbidden address map
      */
     public ConcurrentHashMap<String, Long> getForbiddenAddrMap() {
         return forbiddenAddrMap;
@@ -153,14 +158,16 @@ public class RpcServiceFactory {
     /**
      * get all service abnormal Forbidden brokerIds
      *
-     * @return
+     * @return   the unavailable broker map
      */
     public ConcurrentHashMap<Integer, Long> getUnavailableBrokerMap() {
         return brokerUnavailableMap;
     }
 
     /**
-     * @param remoteAddr
+     * Remove the remote address from forbidden address map
+     *
+     * @param remoteAddr   the remote address need to removed
      */
     public void resetRmtAddrErrCount(String remoteAddr) {
         forbiddenAddrMap.remove(remoteAddr);
@@ -178,7 +185,9 @@ public class RpcServiceFactory {
     }
 
     /**
-     * @param remoteAddr
+     * Accumulate a error count for the remote address
+     *
+     * @param remoteAddr    the remote address
      */
     public void addRmtAddrErrCount(String remoteAddr) {
         RemoteConErrStats rmtConErrStats = remoteAddrMap.get(remoteAddr);
@@ -220,7 +229,7 @@ public class RpcServiceFactory {
                 }
                 int needForbiddenCount =
                         (int) Math.rint(remoteAddrMap.size() * linkStatsMaxAllowedForbiddenRate);
-                needForbiddenCount = (needForbiddenCount > 30) ? 30 : needForbiddenCount;
+                needForbiddenCount = Math.min(needForbiddenCount, 30);
                 if (needForbiddenCount > totalCount) {
                     forbiddenAddrMap.put(remoteAddr, System.currentTimeMillis());
                     isAdded = true;
@@ -241,6 +250,10 @@ public class RpcServiceFactory {
         }
     }
 
+    /**
+     * Remove expired records
+     * All forbidden records will be removed after the specified time
+     */
     public void rmvAllExpiredRecords() {
         long curTime = System.currentTimeMillis();
         Set<String> expiredAddrs = new HashSet<>();
@@ -291,6 +304,10 @@ public class RpcServiceFactory {
         brokerUnavailableMap.put(brokerId, System.currentTimeMillis());
     }
 
+    /**
+     * Remove unavailable records
+     * All unavailable records will be removed after the specified time
+     */
     public void rmvExpiredUnavailableBrokers() {
         long curTime = System.currentTimeMillis();
         Set<Integer> expiredBrokers = new HashSet<>();
@@ -316,10 +333,12 @@ public class RpcServiceFactory {
     }
 
     /**
-     * @param clazz
-     * @param brokerInfo
-     * @param config
-     * @return
+     * Get broker's service
+     *
+     * @param clazz        the class object
+     * @param brokerInfo   the broker object
+     * @param config       the configure
+     * @return             the service instance for the broker
      */
     public synchronized <T> T getService(Class<T> clazz,
                                          BrokerInfo brokerInfo,
@@ -343,7 +362,7 @@ public class RpcServiceFactory {
     /**
      * check is service empty
      *
-     * @return
+     * @return whether is empty
      */
     public boolean isServiceEmpty() {
         return servicesCache.isEmpty();
@@ -405,12 +424,12 @@ public class RpcServiceFactory {
     /**
      * start an tube netty server
      *
-     * @param clazz
-     * @param serviceInstance
-     * @param listenPort
-     * @param threadPool
-     * @param config
-     * @throws Exception
+     * @param clazz             the class object
+     * @param serviceInstance   the service instance
+     * @param listenPort        the listen port
+     * @param threadPool        the thread pool
+     * @param config            the configure
+     * @throws Exception        the excepition while processing
      */
     public synchronized void publishService(Class clazz, Object serviceInstance,
                                             int listenPort, ExecutorService threadPool,
@@ -588,5 +607,4 @@ public class RpcServiceFactory {
             }
         }
     }
-
 }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/benchemark/RcpService4BenchmarkClient.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/benchemark/RcpService4BenchmarkClient.java
index bd5b736..344748f 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/benchemark/RcpService4BenchmarkClient.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/benchemark/RcpService4BenchmarkClient.java
@@ -33,10 +33,18 @@ public class RcpService4BenchmarkClient {
     private final int targetPort;
     private final RpcServiceFactory rpcServiceFactory;
     private final NettyClientFactory clientFactory = new NettyClientFactory();
-    private SimpleService simpleService;
+    private final SimpleService simpleService;
     private int threadNum = 10;
     private int invokeTimes = 1000000;
 
+    /**
+     * Initial a benchmark client
+     *
+     * @param targetHost    the target host
+     * @param targetPort    the target port
+     * @param threadNum     the thread count
+     * @param invokeTimes   the invoke count
+     */
     public RcpService4BenchmarkClient(String targetHost, int targetPort, int threadNum,
                                       int invokeTimes) {
         this.targetHost = targetHost;
@@ -59,6 +67,11 @@ public class RcpService4BenchmarkClient {
         new RcpService4BenchmarkClient("127.0.0.1", 8088, 10, 100000).start();
     }
 
+    /**
+     * Start benchmark test
+     *
+     * @throws Exception the exception
+     */
     public void start() throws Exception {
         for (int i = 0; i < threadNum; i++) {
             executorService.submit(new Runnable() {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java
index 9e394d3..980a36c 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java
@@ -75,13 +75,13 @@ public class PbEnDecoder {
     }
 
     /**
-     * lizard forgives
+     * Decode pb content
      *
-     * @param isRequest
-     * @param methodId
-     * @param bytes
-     * @return
-     * @throws Exception
+     * @param isRequest     whether a request message
+     * @param methodId      the method id
+     * @param bytes         the message content
+     * @return              the message's object
+     * @throws Exception    the exception while decoding messsage
      */
     public static Object pbDecode(boolean isRequest, int methodId, byte[] bytes) throws Exception {
         // #lizard forgives
@@ -236,13 +236,13 @@ public class PbEnDecoder {
     }
 
     /**
-     * lizard forgives
+     * Valid service type and method parameters
      *
-     * @param serviceId
-     * @param methodId
-     * @param sBuilder
-     * @return
-     * @throws Exception
+     * @param serviceId     the service id
+     * @param methodId      the method id
+     * @param sBuilder      the string buffer
+     * @return              whether is valid content.
+     * @throws Exception    the exception while processing.
      */
     public static boolean isValidServiceTypeAndMethod(int serviceId,
                                                       int methodId,
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferInputStream.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferInputStream.java
index 12c83d7..7541428 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferInputStream.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferInputStream.java
@@ -28,7 +28,7 @@ import java.util.List;
  * Copied from <a href="http://avro.apache.org">Apache Avro Project</a>
  */
 public class ByteBufferInputStream extends InputStream {
-    private List<ByteBuffer> buffers;
+    private final List<ByteBuffer> buffers;
     private int current;
 
     public ByteBufferInputStream(List<ByteBuffer> buffers) {
@@ -36,6 +36,9 @@ public class ByteBufferInputStream extends InputStream {
     }
 
     /**
+     * Read a byte at this buffer
+     *
+     * @return the read value
      * @throws java.io.EOFException if EOF is reached.
      * @see java.io.InputStream#read()
      */
@@ -45,6 +48,11 @@ public class ByteBufferInputStream extends InputStream {
     }
 
     /**
+     * Read content of specified length
+     *
+     * @param b     the content buffer
+     * @param off   the offset position
+     * @param len   the content length
      * @throws java.io.EOFException if EOF is reached before reading all the bytes.
      * @see java.io.InputStream#read(byte[], int, int)
      */
@@ -67,6 +75,7 @@ public class ByteBufferInputStream extends InputStream {
     /**
      * Read a buffer from the input without copying, if possible.
      *
+     * @param length    the need read data length
      * @throws java.io.EOFException if EOF is reached before reading all the bytes.
      */
     public ByteBuffer readBuffer(int length) throws IOException {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferOutputStream.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferOutputStream.java
index b51b1ad..346e55d 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferOutputStream.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferOutputStream.java
@@ -50,6 +50,8 @@ public class ByteBufferOutputStream extends OutputStream {
 
     /**
      * Prepend a list of ByteBuffers to this stream.
+     *
+     * @param lists    need to prepended content
      */
     public void prepend(List<ByteBuffer> lists) {
         for (ByteBuffer buffer : lists) {
@@ -60,6 +62,8 @@ public class ByteBufferOutputStream extends OutputStream {
 
     /**
      * Append a list of ByteBuffers to this stream.
+     *
+     * @param lists    need to appended content
      */
     public void append(List<ByteBuffer> lists) {
         for (ByteBuffer buffer : lists) {
@@ -104,6 +108,8 @@ public class ByteBufferOutputStream extends OutputStream {
 
     /**
      * Add a buffer to the output without copying, if possible.
+     *
+     * @param buffer   the content need to written
      */
     public void writeBuffer(ByteBuffer buffer) throws IOException {
         if (buffer.remaining() < RpcConstants.RPC_MAX_BUFFER_SIZE) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
index 868fc65..6a14446 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
@@ -70,16 +70,18 @@ public class NettyClient implements Client {
             new ConcurrentHashMap<>();
     private final AtomicInteger serialNoGenerator =
             new AtomicInteger(0);
-    private AtomicBoolean released = new AtomicBoolean(false);
+    private final AtomicBoolean released = new AtomicBoolean(false);
     private NodeAddrInfo addressInfo;
-    private ClientFactory clientFactory;
+    private final ClientFactory clientFactory;
     private Channel channel;
-    private long connectTimeout;
-    private volatile AtomicBoolean closed = new AtomicBoolean(true);
+    private final long connectTimeout;
+    private final AtomicBoolean closed = new AtomicBoolean(true);
 
     /**
-     * @param clientFactory
-     * @param connectTimeout
+     * Initial a netty client object
+     *
+     * @param clientFactory    the client factory
+     * @param connectTimeout   the connection timeout
      */
     public NettyClient(ClientFactory clientFactory, long connectTimeout) {
         this.clientFactory = clientFactory;
@@ -94,8 +96,10 @@ public class NettyClient implements Client {
     }
 
     /**
-     * @param channel
-     * @param addressInfo
+     * Set a channel
+     *
+     * @param channel      the channel
+     * @param addressInfo   the address of the channel
      */
     public void setChannel(Channel channel, final NodeAddrInfo addressInfo) {
         this.channel = channel;
@@ -226,6 +230,8 @@ public class NettyClient implements Client {
      * remove clientFactory cache
      * handler unfinished callbacks
      * and close the channel
+     *
+     * @param removeParent    whether remove the object from client factory
      */
     @Override
     public void close(boolean removeParent) {
@@ -275,11 +281,13 @@ public class NettyClient implements Client {
      */
     public class NettyClientHandler extends SimpleChannelUpstreamHandler {
 
-        @Override
         /**
-         * Invoked when a message object (e.g: {@link ChannelBuffer}) was received
-         * from a remote peer.
+         * Invoked when a message object was received from a remote peer.
+         *
+         * @param ctx     the channel handler context
+         * @param e       the message event
          */
+        @Override
         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
             if (e.getMessage() instanceof RpcDataPack) {
                 RpcDataPack dataPack = (RpcDataPack) e.getMessage();
@@ -366,8 +374,10 @@ public class NettyClient implements Client {
         }
 
         /**
-         * Invoked when an exception was raised by an I/O thread or a
-         * {@link ChannelHandler}.
+         * Invoked when an exception was raised by an I/O thread or a {@link ChannelHandler}.
+         *
+         * @param ctx   the channel handler context
+         * @param e     the exception object
          */
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
@@ -386,11 +396,13 @@ public class NettyClient implements Client {
             }
         }
 
-        @Override
         /**
-         * Invoked when a {@link Channel} was closed and all its related resources
-         * were released.
+         * Invoked when a {@link Channel} was closed and all its related resources were released.
+         *
+         * @param ctx   the channel handler context
+         * @param e     the channel state event
          */
+        @Override
         public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
             NettyClient.this.close();
         }
@@ -401,7 +413,7 @@ public class NettyClient implements Client {
      */
     public class TimeoutTask implements TimerTask {
 
-        private int serialNo;
+        private final int serialNo;
 
         public TimeoutTask(int serialNo) {
             this.serialNo = serialNo;
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
index 8c33506..9204419 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
@@ -83,8 +83,8 @@ public class NettyClientFactory implements ClientFactory {
     /**
      * initial the network by rpc config object
      *
-     * @param conf
-     * @throws IllegalArgumentException
+     * @param conf      the configure information
+     * @throws IllegalArgumentException  the exception while configuring object
      */
     public void configure(final RpcConfig conf) throws IllegalArgumentException {
         if (this.init.compareAndSet(false, true)) {
@@ -239,11 +239,11 @@ public class NettyClientFactory implements ClientFactory {
     /**
      * create a netty client
      *
-     * @param addressInfo
-     * @param connectTimeout
-     * @param conf
-     * @return
-     * @throws Exception
+     * @param addressInfo        the remote address information
+     * @param connectTimeout     the connection timeout
+     * @param conf               the configure information
+     * @return                   the client object
+     * @throws Exception         the exception while creating object.
      */
     private Client createClient(final NodeAddrInfo addressInfo,
                                 int connectTimeout, final RpcConfig conf) throws Exception {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/MixUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/MixUtils.java
index cbe126e..59af704 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/MixUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/MixUtils.java
@@ -25,17 +25,31 @@ import org.apache.inlong.tubemq.corerpc.protocol.RpcProtocol;
 
 public class MixUtils {
 
+    /**
+     * Substitute class name prefix
+     *
+     * After TubeMQ was donated to Apache, the package prefix name was not migrated to
+     * the name of apache for a long time, and some errors on the server side were returned
+     * to the client by throwing exceptions.
+     * In order to maintain the compatibility between the previous and previous versions,
+     * the the class name in exception information is replaced through this function
+     *
+     * @param className        the class name
+     * @param toOldVersion     whether the client is old version
+     * @param toProtocolVer     the client's protocol version
+     * @return                 the translated class name
+     */
     public static String replaceClassNamePrefix(String className,
                                                 boolean toOldVersion,
-                                                int toPotocolVer) {
+                                                int toProtocolVer) {
 
-        if (toPotocolVer == RpcProtocol.RPC_PROTOCOL_VERSION_OLD_1) {
+        if (toProtocolVer == RpcProtocol.RPC_PROTOCOL_VERSION_OLD_1) {
             if (toOldVersion) {
                 return className.replace("org.apache.inlong.tubemq.", "com.tencent.tubemq.");
             } else {
                 return className.replace("com.tencent.tubemq.", "org.apache.inlong.tubemq.");
             }
-        } else if (toPotocolVer == RpcProtocol.RPC_PROTOCOL_VERSION_TUBEMQ) {
+        } else if (toProtocolVer == RpcProtocol.RPC_PROTOCOL_VERSION_TUBEMQ) {
             if (toOldVersion) {
                 return className.replace("org.apache.inlong.tubemq.", "com.apache.tubemq.");
             } else {
@@ -46,6 +60,12 @@ public class MixUtils {
         }
     }
 
+    /**
+     * Construct the corresponding exception object according to the exception text
+     *
+     * @param exceptionMsg     the exception text
+     * @return                 the exception object
+     */
     public static Throwable unwrapException(String exceptionMsg) {
         // Perform string to exception conversion processing
         try {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/TSSLEngineUtil.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/TSSLEngineUtil.java
index c163b16..5f0f227 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/TSSLEngineUtil.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/TSSLEngineUtil.java
@@ -27,19 +27,34 @@ import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManagerFactory;
 
+/**
+ * SSL-related general function classes
+ */
 public class TSSLEngineUtil {
 
+    /**
+     * Create a SSL engine
+     *
+     * @param keyStoreStream        the key-store stream
+     * @param keyStorePassword      the key-sored password
+     * @param trustStoreStream      the trust-store stream
+     * @param trustStorePassword    the trust-store password
+     * @param isClientMode          whether client mode
+     * @param needTwoWayAuth        Whether require two-way authentication
+     * @return                      the SSL engine
+     * @throws Exception            the exception information while creating
+     */
     public static SSLEngine createSSLEngine(InputStream keyStoreStream,
                                             String keyStorePassword,
                                             InputStream trustStoreStream,
                                             String trustStorePassword,
                                             boolean isClientMode,
-                                            boolean needTwyWayAuth)
+                                            boolean needTwoWayAuth)
         throws Exception {
 
         KeyManagerFactory kmf = null;
         TrustManagerFactory tmf = null;
-        if (isClientMode || needTwyWayAuth) {
+        if (isClientMode || needTwoWayAuth) {
             KeyStore ts = KeyStore.getInstance("JKS");
             try {
                 ts.load(trustStoreStream, trustStorePassword.toCharArray());
@@ -52,7 +67,7 @@ public class TSSLEngineUtil {
             }
 
         }
-        if (!isClientMode || needTwyWayAuth) {
+        if (!isClientMode || needTwoWayAuth) {
             KeyStore ks = KeyStore.getInstance("JKS");
             try {
                 ks.load(keyStoreStream, keyStorePassword.toCharArray());
@@ -70,20 +85,32 @@ public class TSSLEngineUtil {
             tmf == null ? null : tmf.getTrustManagers(), null);
         SSLEngine sslEngine = serverContext.createSSLEngine();
         sslEngine.setUseClientMode(isClientMode);
-        sslEngine.setNeedClientAuth(needTwyWayAuth);
+        sslEngine.setNeedClientAuth(needTwoWayAuth);
 
         return sslEngine;
     }
 
+    /**
+     * Create a SSL engine
+     *
+     * @param keyStorePath           the key-store file path
+     * @param trustStorePath         the trust-store file path
+     * @param keyStorePassword       the key-store password
+     * @param trustStorePassword     the trust-store password
+     * @param isClientMode          whether client mode
+     * @param needTwoWayAuth        Whether require two-way authentication
+     * @return                      the SSL engine
+     * @throws Exception            the exception information while creating
+     */
     public static SSLEngine createSSLEngine(String keyStorePath, String trustStorePath,
                                             String keyStorePassword, String trustStorePassword,
-                                            boolean isClientMode, boolean needTwyWayAuth)
+                                            boolean isClientMode, boolean needTwoWayAuth)
         throws Exception {
 
         InputStream keyStoreStream = new FileInputStream(new File(keyStorePath));
         InputStream trustStoreStream = new FileInputStream(new File(trustStorePath));
 
         return createSSLEngine(keyStoreStream, keyStorePassword, trustStoreStream,
-            trustStorePassword, isClientMode, needTwyWayAuth);
+            trustStorePassword, isClientMode, needTwoWayAuth);
     }
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index f9f195a..16da439 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -125,7 +125,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
         this.storeManager = tubeBroker.getStoreManager();
         this.offsetManager = tubeBroker.getOffsetManager();
         this.serverAuthHandler = tubeBroker.getServerAuthHandler();
-        ServiceStatusHolder.setStatisParameters(tubeConfig.getAllowedReadIOExcptCnt(),
+        ServiceStatusHolder.setStatsParameters(tubeConfig.getAllowedReadIOExcptCnt(),
                 tubeConfig.getAllowedWriteIOExcptCnt(), tubeConfig.getIoExcptStatsDurationMs());
         this.putCounterGroup = new TrafficStatsService("PutCounterGroup", "Producer", 60 * 1000);
         this.getCounterGroup = new TrafficStatsService("GetCounterGroup", "Consumer", 60 * 1000);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
index 95e4430..6e6806f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
@@ -46,7 +46,7 @@ public class BrokerDefMetadata {
     // expire policy.
     private String deletePolicy = "delete,168h";
     // the max cache size for topic.
-    private int memCacheMsgSize = 1 * 1024 * 1024;
+    private int memCacheMsgSize = 1024 * 1024;
     // the max cache message count for topic.
     private int memCacheMsgCnt = 5 * 1024;
     // the max interval(milliseconds) that topic's memory cache will flush to disk.
@@ -56,9 +56,10 @@ public class BrokerDefMetadata {
 
     }
 
-    /*
-     * Build BrokerDefMetadata from brokerDefMetaConfInfo, each segment is separated by ':'.
-     * brokerDefMetaConfInfo is often generated by Master service. Only used for inner communication.
+    /**
+     * Initial broker meta-data object by configure info
+     *
+     * @param brokerDefMetaConfInfo      the broker configure information.
     */
     public BrokerDefMetadata(String brokerDefMetaConfInfo) {
         if (TStringUtils.isBlank(brokerDefMetaConfInfo)) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index 3444daf..487b95e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
@@ -61,6 +61,16 @@ public class ConsumerNodeInfo {
             new AtomicInteger(TBaseConstants.META_VALUE_UNDEFINED);
     private long createTime = System.currentTimeMillis();
 
+    /**
+     * Initial consumer node information
+     *
+     * @param storeManager    the store manager
+     * @param consumerId      the consumer id
+     * @param filterCodes     the filter condition items
+     * @param sessionKey      the session key
+     * @param sessionTime     the session create time
+     * @param partStr         the partition information
+     */
     public ConsumerNodeInfo(final MessageStoreManager storeManager,
                             final String consumerId, Set<String> filterCodes,
                             final String sessionKey, long sessionTime, final String partStr) {
@@ -68,6 +78,18 @@ public class ConsumerNodeInfo {
             filterCodes, sessionKey, sessionTime, false, partStr);
     }
 
+    /**
+     * Initial consumer node information
+     *
+     * @param storeManager       the store manager
+     * @param qryPriorityId      the query priority id
+     * @param consumerId         the consumer id
+     * @param filterCodes        the filter condition items
+     * @param sessionKey         the session key
+     * @param sessionTime        the session create time
+     * @param isSupportLimit     whether to support limited consumption function
+     * @param partStr            the partition information
+     */
     public ConsumerNodeInfo(final MessageStoreManager storeManager,
                             final int qryPriorityId, final String consumerId,
                             Set<String> filterCodes, final String sessionKey,
@@ -92,7 +114,16 @@ public class ConsumerNodeInfo {
         this.isSupportLimit = isSupportLimit;
     }
 
-    // #lizard forgives
+    /**
+     * Query the current allowed maximum consumption size
+     *
+     * @param storeKey              the store block key
+     * @param flowCtrlRuleHandler   the flow-control rule handler
+     * @param currMaxDataOffset     the current max data offset
+     * @param maxMsgTransferSize    the max message transfer size
+     * @param isEscFlowCtrl         whether need escape flow-control process
+     * @return                      the allowed consumption size
+     */
     public int getCurrentAllowedSize(final String storeKey,
                                      final FlowCtrlRuleHandler flowCtrlRuleHandler,
                                      final long currMaxDataOffset, int maxMsgTransferSize,
@@ -229,10 +260,10 @@ public class ConsumerNodeInfo {
     /**
      * Recalculate message limit value.
      *
-     * @param curDataDlt
-     * @param currTime
-     * @param maxMsgTransferSize
-     * @param flowCtrlRuleHandler
+     * @param curDataDlt              current data lag
+     * @param currTime                current time
+     * @param maxMsgTransferSize      the max message transfer size
+     * @param flowCtrlRuleHandler     the flow-control rule handler
      */
     private void recalcMsgLimitValue(long curDataDlt, long currTime, int maxMsgTransferSize,
                                      final FlowCtrlRuleHandler flowCtrlRuleHandler) {
@@ -257,5 +288,4 @@ public class ConsumerNodeInfo {
                     currTime + TBaseConstants.CFG_FC_MAX_SAMPLING_PERIOD;
         }
     }
-
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/OffsetStorageInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/OffsetStorageInfo.java
index adb0959..310b97c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/OffsetStorageInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/OffsetStorageInfo.java
@@ -25,19 +25,38 @@ import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
 public class OffsetStorageInfo implements Serializable {
 
     private static final long serialVersionUID = -4232003748320500757L;
-    private String topic;
-    private int brokerId;
-    private int partitionId;
-    private AtomicLong offset = new AtomicLong(0);
+    private final String topic;
+    private final int brokerId;
+    private final int partitionId;
+    private final AtomicLong offset = new AtomicLong(0);
     private long messageId;
     private boolean firstCreate = false;
     private boolean modified = false;
 
+    /**
+     * Initial offset storage information
+     *
+     * @param topic          the topic name
+     * @param brokerId       the broker id
+     * @param partitionId    the partition id
+     * @param offset         the offset
+     * @param messageId      the message id
+     */
     public OffsetStorageInfo(String topic, int brokerId, int partitionId,
                              long offset, long messageId) {
         this(topic, brokerId, partitionId, offset, messageId, true);
     }
 
+    /**
+     * Initial offset storage information
+     *
+     * @param topic          the topic name
+     * @param brokerId       the broker id
+     * @param partitionId    the partition id
+     * @param offset         the offset
+     * @param messageId      the message id
+     * @param firstCreate    whether is the first record creation
+     */
     public OffsetStorageInfo(String topic, int brokerId, int partitionId,
                              long offset, long messageId, boolean firstCreate) {
         this.topic = topic;
@@ -126,7 +145,7 @@ public class OffsetStorageInfo implements Serializable {
         if (!topic.equals(that.topic)) {
             return false;
         }
-        return offset != null ? offset.equals(that.offset) : that.offset == null;
+        return (offset.get() == that.offset.get());
 
     }
 
@@ -135,7 +154,7 @@ public class OffsetStorageInfo implements Serializable {
         int result = topic.hashCode();
         result = 31 * result + brokerId;
         result = 31 * result + partitionId;
-        result = 31 * result + (offset != null ? offset.hashCode() : 0);
+        result = 31 * result + offset.hashCode();
         result = 31 * result + (int) (messageId ^ (messageId >>> 32));
         result = 31 * result + (firstCreate ? 1 : 0);
         result = 31 * result + (modified ? 1 : 0);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
index c19b99e..70aaa6c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
@@ -65,9 +65,16 @@ public class ZkOffsetStorage implements OffsetStorage {
     private final boolean isBroker;
     private final int brokerId;
     private final String strBrokerId;
-    private ZKConfig zkConfig;
+    private final ZKConfig zkConfig;
     private ZooKeeperWatcher zkw;
 
+    /**
+     * Initial ZooKeeper offset storage ojbect
+     *
+     * @param zkConfig   the ZooKeeper configure
+     * @param isBroker   whether used in broker node
+     * @param brokerId   the broker id
+     */
     public ZkOffsetStorage(final ZKConfig zkConfig, boolean isBroker, int brokerId) {
         this.zkConfig = zkConfig;
         this.isBroker = isBroker;
@@ -302,9 +309,8 @@ public class ZkOffsetStorage implements OffsetStorage {
 
     /**
      * Get offset stored in zookeeper, if not found or error, set null
-     * <p/>
      *
-     * @return partitionId--offset map info
+     * @param groupTopicPartMap   the group topic-partition map
      */
     @Override
     public void deleteGroupOffsetInfo(
@@ -371,5 +377,4 @@ public class ZkOffsetStorage implements OffsetStorage {
             return root;
         }
     }
-
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java
index 49b3eb3..8ea1de1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java
@@ -431,6 +431,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
     }
 
     /**
+     * Get the active Zookeeper address
+     *
      * @return Path to the currently active master.
      */
     public String getMasterAddressZNode() {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Bytes.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Bytes.java
index 51e3627..ae467e5 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Bytes.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Bytes.java
@@ -39,6 +39,8 @@ public class Bytes {
     private static final Logger logger = LoggerFactory.getLogger(Bytes.class);
 
     /**
+     * Compare two arrays content.
+     *
      * @param left  left operand
      * @param right right operand
      * @return 0 if equal, < 0 if left is less than right, etc.
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java
index 72f0773..8819418 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java
@@ -40,6 +40,12 @@ public class ClientSyncInfo {
 
     }
 
+    /**
+     * Update the client reported subscription information
+     *
+     * @param brokerRunManager   the broker run-manager
+     * @param clientSubRepInfo   the client reported subscription information
+     */
     public void updSubRepInfo(BrokerRunManager brokerRunManager,
                               ClientMaster.ClientSubRepInfo clientSubRepInfo) {
         if (clientSubRepInfo == null) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/FileUtil.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/FileUtil.java
index fe5f4c5..c9dc001 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/FileUtil.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/FileUtil.java
@@ -29,6 +29,13 @@ public class FileUtil {
         return dir.delete();
     }
 
+    /**
+     * Delete the contents of files and subdirectories in the specified directory
+     *
+     * @param dir            the specified directory
+     * @return               the deleted count
+     * @throws IOException   the exception while deleting contents
+     */
     public static boolean fullyDeleteContents(File dir) throws IOException {
         boolean deletionSucceeded = true;
         File[] contents = dir.listFiles();
@@ -67,5 +74,4 @@ public class FileUtil {
                     .append(dir.getAbsolutePath()).toString());
         }
     }
-
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/HttpUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/HttpUtils.java
index 0d9afc2..8716fe1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/HttpUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/HttpUtils.java
@@ -47,7 +47,12 @@ public class HttpUtils {
     private static final Logger logger =
             LoggerFactory.getLogger(HttpUtils.class);
 
-    /* Send request to target server. */
+    /**
+     * Send request to target server.
+     *
+     * @param url            the target url
+     * @param inParamMap     the parameter map
+     */
     public static JsonObject requestWebService(String url,
                                                Map<String, String> inParamMap) throws Exception {
         if (url == null) {
@@ -120,17 +125,20 @@ public class HttpUtils {
         return jsonRes;
     }
 
+    /**
+     *  Test scenario:
+     *     simulate where there are multiple Master nodes in the cluster,
+     *      and there are nodes that do not take effect
+     * Call url:
+     *    http://127.0.0.1:8080/webapi.htm?method=admin_query_topic_info
+     * Request parameters:
+     *    topicName=test_1, brokerId=170399798
+     * Master nodes:
+     *    10.54.55.32:8080(invalid node),127.0.0.1:8080(valid node)
+     *
+     * @param args   the call arguments
+     */
     public static void main(String[] args) {
-        /** Test scenario:
-         *     simulate where there are multiple Master nodes in the cluster,
-         *      and there are nodes that do not take effect
-         * Call url:
-         *    http://127.0.0.1:8080/webapi.htm?method=admin_query_topic_info
-         * Request parameters:
-         *    topicName=test_1, brokerId=170399798
-         * Master nodes:
-         *    10.54.55.32:8080(invalid node),127.0.0.1:8080(valid node)
-         */
         Map<String, String> inParamMap = new HashMap<>();
         inParamMap.put("topicName", "test_1");
         inParamMap.put("brokerId", "170399798");
@@ -157,5 +165,4 @@ public class HttpUtils {
             System.out.println("query result is " + jsonRes.toString());
         }
     }
-
 }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/RowLock.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/RowLock.java
index ce15415..6b32ab6 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/RowLock.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/RowLock.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
  */
 public class RowLock {
     private static final Logger logger = LoggerFactory.getLogger(RowLock.class);
-    private static Random rand = new Random();
+    private static final Random rand = new Random();
     private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
             new ConcurrentHashMap<>();
     private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
@@ -103,6 +103,11 @@ public class RowLock {
         }
     }
 
+    /**
+     * Release row lock
+     *
+     * @param lockId the lock id
+     */
     public void releaseRowLock(final Integer lockId) {
         if (lockId == null) {
             return;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Sleeper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Sleeper.java
index 3658857..5679b28 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Sleeper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Sleeper.java
@@ -34,9 +34,11 @@ public class Sleeper {
     private final int period;
     private final Stoppable stopper;
     private final Object sleepLock = new Object();
-    private AtomicBoolean triggerWake = new AtomicBoolean(false);
+    private final AtomicBoolean triggerWake = new AtomicBoolean(false);
 
     /**
+     * Initial a Sleeper object
+     *
      * @param sleep   sleep time in milliseconds
      * @param stopper When {@link Stoppable#isStopped()} is true, this thread will cleanup and exit
      *                cleanly.
@@ -65,10 +67,9 @@ public class Sleeper {
     }
 
     /**
-     * Sleep for period adjusted by passed <code>startTime<code>
+     * Sleep for period adjusted by passed startTime
      *
-     * @param startTime Time some task started previous to now. Time to sleep will be docked current
-     *                  time minus passed <code>startTime<code>.
+     * @param startTime Time some task started previous to now.
      */
     public void sleep(final long startTime) {
         if (this.stopper.isStopped()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index bbc7831..da6a876 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -1863,7 +1863,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         return result;
     }
 
-    // process unReset group balance
+    /**
+     * process unReset group balance
+     *
+     * @param rebalanceId   the re-balance id
+     * @param isFirstReb    whether is first re-balance
+     * @param groups        the need re-balance group set
+     */
     public void processRebalance(long rebalanceId, boolean isFirstReb, List<String> groups) {
         // #lizard forgives
         Map<String, Map<String, List<Partition>>> finalSubInfoMap = null;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
index 95a2aef..2d6be25 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
@@ -64,7 +64,28 @@ public class BdbBrokerConfEntity implements Serializable {
     public BdbBrokerConfEntity() {
     }
 
-    //Constructor
+    /**
+     * Build a broker configure entity
+     *
+     * @param brokerId            the broker id
+     * @param brokerIp            the broker ip
+     * @param brokerPort          the broker port
+     * @param numPartitions       the number of partition
+     * @param unflushThreshold    the un-flushed message count
+     * @param unflushInterval     the un-flushed time delta
+     * @param deleteWhen          the delete time
+     * @param deletePolicy        the delete policy
+     * @param manageStatus        the manage status
+     * @param acceptPublish       whether accept publish
+     * @param acceptSubscribe     whether accept subscribe
+     * @param attributes          the attribute information
+     * @param isConfDataUpdated   whether the configure is updated
+     * @param isBrokerLoaded      whether the broker has loaded
+     * @param createUser          the creator
+     * @param createDate          the create date
+     * @param modifyUser          the modifier
+     * @param modifyDate          the modify date
+     */
     public BdbBrokerConfEntity(final int brokerId, final String brokerIp,
                                final int brokerPort, final int numPartitions,
                                final int unflushThreshold, final int unflushInterval,
@@ -97,6 +118,9 @@ public class BdbBrokerConfEntity implements Serializable {
 
     /**
      * Serialize config field to json format
+     *
+     * @param sb  string buffer
+     * @return  the content in json format
      */
     public StringBuilder toJsonString(final StringBuilder sb) {
         return sb.append("{\"type\":\"BdbBrokerConfEntity\",")
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
index 5fddada..ded8cb1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
@@ -73,7 +73,31 @@ public class BdbClusterSettingEntity implements Serializable {
     public BdbClusterSettingEntity() {
     }
 
-    //Constructor
+    /**
+     * Build cluster setting entity
+     *
+     * @param recordKey              the record key
+     * @param configId               the configure id
+     * @param brokerPort             the broker port
+     * @param brokerTLSPort          the broker TLS port
+     * @param brokerWebPort          the broker web port
+     * @param numTopicStores         the number of topic store
+     * @param numPartitions          the number of partition
+     * @param unflushThreshold       the un-flushed message count
+     * @param unflushInterval        the un-flushed time delta
+     * @param unflushDataHold        the un-flushed data size
+     * @param memCacheMsgCntInK      the memory cached message count
+     * @param memCacheFlushIntvl     the memory cached time delta
+     * @param memCacheMsgSizeInMB    the memory cached message size
+     * @param acceptPublish          whether accept publish
+     * @param acceptSubscribe        whether accept subscribe
+     * @param deletePolicy           the delete policy
+     * @param qryPriorityId          the query priority id
+     * @param maxMsgSizeInB          the default message max size
+     * @param attributes          the attribute information
+     * @param modifyUser          the modifier
+     * @param modifyDate          the modify date
+     */
     public BdbClusterSettingEntity(String recordKey, long configId, int brokerPort,
                                    int brokerTLSPort, int brokerWebPort,
                                    int numTopicStores, int numPartitions,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
index 90b9ac1..f435497 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
@@ -60,7 +60,27 @@ public class BdbTopicConfEntity implements Serializable {
     public BdbTopicConfEntity() {
     }
 
-    //Constructor
+    /**
+     * Build topic configure entity
+     *
+     * @param brokerId            the broker id
+     * @param brokerIp            the broker ip
+     * @param brokerPort          the broker port
+     * @param topicName           the topic name
+     * @param numPartitions       the number of partition
+     * @param unflushThreshold    the un-flushed message count
+     * @param unflushInterval     the un-flushed time delta
+     * @param deleteWhen          the delete time
+     * @param deletePolicy        the delete policy
+     * @param acceptPublish       whether accept publish
+     * @param acceptSubscribe     whether accept subscribe
+     * @param numTopicStores      the number of topic store
+     * @param attributes          the attribute information
+     * @param createUser          the creator
+     * @param createDate          the create date
+     * @param modifyUser          the modifier
+     * @param modifyDate          the modify date
+     */
     public BdbTopicConfEntity(final int brokerId, final String brokerIp,
                               final int brokerPort, final String topicName,
                               final int numPartitions, final int unflushThreshold,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
index b7c8934..f7f16f4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
@@ -922,6 +922,19 @@ public class MetaDataManager implements Server {
 
     // ////////////////////////////////////////////////////////////////////////////
 
+    /**
+     * Add or update topic deploy information
+     *
+     * @param isAddOp         whether add operation
+     * @param opEntity        the operation information
+     * @param brokerId        the broker id
+     * @param topicName       the topic name
+     * @param deployStatus    the deploy status
+     * @param topicPropInfo   the topic property set
+     * @param sBuffer         the string buffer
+     * @param result          the process result
+     * @return                true if success otherwise false
+     */
     public TopicProcessResult addOrUpdTopicDeployInfo(boolean isAddOp, BaseEntity opEntity,
                                                       int brokerId, String topicName,
                                                       TopicStatus deployStatus,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
index b9823a6..65b8281 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
@@ -95,6 +95,11 @@ public class ConsumeGroupInfo {
     private final AtomicLong lastMetaInfoFreshTime =
             new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
 
+    /**
+     *  Initial a Consume group node information.
+     *
+     * @param consumer   the consumer of consume group.
+     */
     public ConsumeGroupInfo(ConsumerInfo consumer) {
         this.groupName = consumer.getGroupName();
         this.consumeType = consumer.getConsumeType();
@@ -107,6 +112,9 @@ public class ConsumeGroupInfo {
      * Add consumer to consume group
      *
      * @param inConsumer consumer object
+     * @param sBuffer    the string buffer
+     * @param result     the process result
+     * @return           whether the addition is successful
      */
     public boolean addConsumer(ConsumerInfo inConsumer,
                                StringBuilder sBuffer,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
index 4fbcedf..4828e98 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
@@ -50,6 +50,21 @@ public class ConsumerInfo implements Comparable<ConsumerInfo>, Serializable {
     private long lstAssignedTime = TBaseConstants.META_VALUE_UNDEFINED;
     private long usedTopicMetaInfoId = TBaseConstants.META_VALUE_UNDEFINED;
 
+    /**
+     * Initial Consumer node information
+     *
+     * @param consumerId          the consumer id
+     * @param overTLS             whether to communicate via TLS
+     * @param group               the group name of the consumer
+     * @param topicSet            the topic set subscribed
+     * @param topicConditions     the topic filter condition set
+     * @param consumeType         the consume type
+     * @param sessionKey          the session key
+     * @param startTime           the start time
+     * @param sourceCount         the minimum consumer count of consume group
+     * @param selectedBig         whether to choose a larger value if there is a conflict
+     * @param requiredPartition   the required partitions
+     */
     public ConsumerInfo(String consumerId, boolean overTLS, String group,
                         Set<String> topicSet, Map<String, TreeSet<String>> topicConditions,
                         ConsumeType consumeType, String sessionKey, long startTime,
@@ -73,6 +88,20 @@ public class ConsumerInfo implements Comparable<ConsumerInfo>, Serializable {
         this.consumerViewInfo = toString();
     }
 
+    /**
+     * Initial Consumer node information
+     *
+     * @param consumerId          the consumer id
+     * @param overTLS             whether to communicate via TLS
+     * @param group               the group name of the consumer
+     * @param consumeType         the consume type
+     * @param sourceCount         the minimum consumer count of consume group
+     * @param nodeId              the node id
+     * @param topicSet            the topic set subscribed
+     * @param topicConditions     the topic filter condition set
+     * @param curCsmCtrlId        the node's consume control id
+     * @param syncInfo            the consumer report information
+     */
     public ConsumerInfo(String consumerId, boolean overTLS, String group,
                         ConsumeType consumeType, int sourceCount, int nodeId,
                         Set<String> topicSet, Map<String, TreeSet<String>> topicConditions,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/common/BaseResult.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/common/BaseResult.java
index 024098b..a34fa96 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/common/BaseResult.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/common/BaseResult.java
@@ -20,10 +20,8 @@ package org.apache.inlong.tubemq.server.master.web.common;
 import java.io.Serializable;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
@@ -476,42 +474,6 @@ public class BaseResult implements Serializable {
         return new String[0];
     }
 
-    public List getTaobaoSlider() {
-        List l = new ArrayList(10);
-        int leftStart = 1;
-        int leftEnd = 2;
-        int mStart = this.getCurrentPage().intValue() - 2;
-        int mEnd = this.getCurrentPage().intValue() + 2;
-        int rStart = this.getTotalPage() - 1;
-        int rEnd = this.getTotalPage();
-        if (mStart <= leftEnd) {
-            leftStart = 0;
-            leftEnd = 0;
-            mStart = 1;
-        }
-        if (mEnd >= rStart) {
-            rStart = 0;
-            rEnd = 0;
-            mEnd = this.getTotalPage();
-        }
-        if (leftEnd > leftStart) {
-            for (int i = leftStart; i <= leftEnd; ++i) {
-                l.add(String.valueOf(i));
-            }
-            l.add("...");
-        }
-        for (int i = mStart; i <= mEnd; ++i) {
-            l.add(String.valueOf(i));
-        }
-        if (rEnd > rStart) {
-            l.add("...");
-            for (int i = rStart; i <= rEnd; ++i) {
-                l.add(String.valueOf(i));
-            }
-        }
-        return l;
-    }
-
     /**
      * Get ajax prefix value
      *
@@ -645,6 +607,8 @@ public class BaseResult implements Serializable {
     }
 
     /**
+     * Set escape js status
+     *
      * @param jsEscape The jsEscape to set.
      */
     public final BaseResult setJsEscape(boolean jsEscape) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/simplemvc/RequestDispatcher.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/simplemvc/RequestDispatcher.java
index fa387af..523733c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/simplemvc/RequestDispatcher.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/simplemvc/RequestDispatcher.java
@@ -89,6 +89,13 @@ public class RequestDispatcher {
         executeTarget(context, context.getTarget(), TYPE_LAYOUT);
     }
 
+    /**
+     * Execute required method service
+     * @param context   the context
+     * @param target    the target information
+     * @param type      the operation type
+     * @throws Exception the exception
+     */
     public void executeTarget(RequestContext context,
                               String target, String type) throws Exception {
         String targetKey = getActionKey(type, target);
@@ -162,6 +169,11 @@ public class RequestDispatcher {
                 .append("/").append(target).append(".vm").toString();
     }
 
+    /**
+     * Get layout information
+     * @param target  the target information
+     * @return   the layout information
+     */
     public String getLayout(String target) {
         String layout = null;
         String[] targetPaths = target.split("/");