You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/02/20 09:59:28 UTC
[incubator-inlong] branch master updated: [INLONG-2609][TubeMQ] Fix Javadoc related errors (#2611)
This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1662c12 [INLONG-2609][TubeMQ] Fix Javadoc related errors (#2611)
1662c12 is described below
commit 1662c123f7dc6b3f183a6261420161aab726814b
Author: gosonzhang <46...@qq.com>
AuthorDate: Sun Feb 20 17:59:23 2022 +0800
[INLONG-2609][TubeMQ] Fix Javadoc related errors (#2611)
---
.../consumer/SimpleClientBalanceConsumer.java | 16 +++++-
.../client/factory/TubeBaseSessionFactory.java | 6 +++
.../tubemq/client/producer/ProducerManager.java | 19 ++++---
.../client/producer/SimpleMessageProducer.java | 7 +++
.../qltystats/DefaultBrokerRcvQltyStats.java | 14 +++--
.../connectors/tubemq/TubemqSourceFunction.java | 20 +++++--
.../flink/connectors/tubemq/TubemqTableSource.java | 28 +++++++---
.../tubemq/corebase/utils/ServiceStatusHolder.java | 42 +++++++++------
.../inlong/tubemq/corerpc/ResponseWrapper.java | 33 ++++++++++++
.../inlong/tubemq/corerpc/RpcServiceFactory.java | 62 ++++++++++++++--------
.../benchemark/RcpService4BenchmarkClient.java | 15 +++++-
.../inlong/tubemq/corerpc/codec/PbEnDecoder.java | 24 ++++-----
.../corerpc/netty/ByteBufferInputStream.java | 11 +++-
.../corerpc/netty/ByteBufferOutputStream.java | 6 +++
.../inlong/tubemq/corerpc/netty/NettyClient.java | 46 ++++++++++------
.../tubemq/corerpc/netty/NettyClientFactory.java | 14 ++---
.../inlong/tubemq/corerpc/utils/MixUtils.java | 26 +++++++--
.../tubemq/corerpc/utils/TSSLEngineUtil.java | 39 +++++++++++---
.../tubemq/server/broker/BrokerServiceServer.java | 2 +-
.../server/broker/metadata/BrokerDefMetadata.java | 9 ++--
.../server/broker/nodeinfo/ConsumerNodeInfo.java | 42 ++++++++++++---
.../common/offsetstorage/OffsetStorageInfo.java | 31 ++++++++---
.../common/offsetstorage/ZkOffsetStorage.java | 13 +++--
.../offsetstorage/zookeeper/ZooKeeperWatcher.java | 2 +
.../inlong/tubemq/server/common/utils/Bytes.java | 2 +
.../tubemq/server/common/utils/ClientSyncInfo.java | 6 +++
.../tubemq/server/common/utils/FileUtil.java | 8 ++-
.../tubemq/server/common/utils/HttpUtils.java | 31 ++++++-----
.../inlong/tubemq/server/common/utils/RowLock.java | 7 ++-
.../inlong/tubemq/server/common/utils/Sleeper.java | 9 ++--
.../inlong/tubemq/server/master/TMaster.java | 8 ++-
.../bdbstore/bdbentitys/BdbBrokerConfEntity.java | 26 ++++++++-
.../bdbentitys/BdbClusterSettingEntity.java | 26 ++++++++-
.../bdbstore/bdbentitys/BdbTopicConfEntity.java | 22 +++++++-
.../server/master/metamanage/MetaDataManager.java | 13 +++++
.../nodemanage/nodeconsumer/ConsumeGroupInfo.java | 8 +++
.../nodemanage/nodeconsumer/ConsumerInfo.java | 29 ++++++++++
.../server/master/web/common/BaseResult.java | 40 +-------------
.../master/web/simplemvc/RequestDispatcher.java | 12 +++++
39 files changed, 584 insertions(+), 190 deletions(-)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
index 8ac51fa..eaf379d 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
@@ -67,6 +67,12 @@ import org.apache.inlong.tubemq.corerpc.service.MasterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * SimpleClientBalanceConsumer, This type of consumer supports the client for
+ * independent partition allocation and consumption.
+ * Compared with the server-side allocation scheme, this type of client manages the partition by
+ * itself and is not affected by the server-side allocation cycle
+ */
public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
private static final Logger logger =
LoggerFactory.getLogger(SimpleClientBalanceConsumer.class);
@@ -94,9 +100,9 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
private final RpcConfig rpcConfig = new RpcConfig();
private final AtomicLong visitToken =
new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
- private AtomicReference<String> authAuthorizedTokenRef =
+ private final AtomicReference<String> authAuthorizedTokenRef =
new AtomicReference<>("");
- private ClientAuthenticateHandler authenticateHandler =
+ private final ClientAuthenticateHandler authenticateHandler =
new SimpleClientAuthenticateHandler();
private final ScheduledExecutorService heartService2Master;
private final AtomicInteger metaReqStatusId = new AtomicInteger(0);
@@ -110,6 +116,12 @@ public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
new ConcurrentHashMap<>();
protected final ClientStatsInfo clientStatsInfo;
+ /**
+ * Initial a client-balance consumer object
+ * @param messageSessionFactory the session factory
+ * @param consumerConfig the consumer configure
+ * @throws TubeClientException the exception while creating object.
+ */
public SimpleClientBalanceConsumer(final InnerSessionFactory messageSessionFactory,
final ConsumerConfig consumerConfig) throws TubeClientException {
java.security.Security.setProperty("networkaddress.cache.ttl", "3");
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
index 357b4b4..3e74256 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
@@ -53,6 +53,12 @@ public class TubeBaseSessionFactory implements InnerSessionFactory {
private final DefaultBrokerRcvQltyStats brokerRcvQltyStats;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
+ /**
+ * Initial Session factory
+ *
+ * @param clientFactory the client factory
+ * @param tubeClientConfig the tube client configure
+ */
public TubeBaseSessionFactory(final ClientFactory clientFactory,
final TubeClientConfig tubeClientConfig) throws TubeClientException {
super();
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
index 4ad2a29..312ada1 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
@@ -87,7 +87,7 @@ public class ProducerManager {
new AtomicReference<>("");
private final ClientAuthenticateHandler authenticateHandler =
new SimpleClientAuthenticateHandler();
- private MasterService masterService;
+ private final MasterService masterService;
private Map<Integer, BrokerInfo> brokersMap = new ConcurrentHashMap<>();
private long brokerInfoCheckSum = -1L;
private long lastBrokerUpdatedTime = System.currentTimeMillis();
@@ -104,6 +104,13 @@ public class ProducerManager {
new AtomicBoolean(false);
private final ClientStatsInfo clientStatsInfo;
+ /**
+ * Initial a producer manager
+ *
+ * @param sessionFactory the session factory
+ * @param tubeClientConfig the client configure
+ * @throws TubeClientException the exception while creating object
+ */
public ProducerManager(final InnerSessionFactory sessionFactory,
final TubeClientConfig tubeClientConfig) throws TubeClientException {
java.security.Security.setProperty("networkaddress.cache.ttl", "3");
@@ -163,7 +170,7 @@ public class ProducerManager {
/**
* Start the producer manager.
*
- * @throws Throwable
+ * @throws Throwable the exception
*/
public void start() throws Throwable {
if (nodeStatus.get() <= 0) {
@@ -178,7 +185,7 @@ public class ProducerManager {
* Publish a topic.
*
* @param topic topic name
- * @throws TubeClientException
+ * @throws TubeClientException the exception at publish
*/
public void publish(final String topic) throws TubeClientException {
checkServiceStatus();
@@ -223,7 +230,7 @@ public class ProducerManager {
*
* @param topicSet a set of topic names
* @return a set of successful published topic names
- * @throws TubeClientException
+ * @throws TubeClientException the exception at publish
*/
public Set<String> publish(Set<String> topicSet) throws TubeClientException {
checkServiceStatus();
@@ -282,7 +289,7 @@ public class ProducerManager {
/**
* Shutdown the produce manager.
*
- * @throws Throwable
+ * @throws Throwable the exception at shutdown
*/
public void shutdown() throws Throwable {
StringBuilder strBuff = new StringBuilder(512);
@@ -373,7 +380,7 @@ public class ProducerManager {
/**
* Remove published topics. We will ignore null topics or non-published topics.
*
- * @param topicSet
+ * @param topicSet the topic set need to delete
*/
public void removeTopic(Set<String> topicSet) {
for (String topic : topicSet) {
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
index c4699fd..6f71b44 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/SimpleMessageProducer.java
@@ -65,6 +65,13 @@ public class SimpleMessageProducer implements MessageProducer {
private final RpcConfig rpcConfig = new RpcConfig();
private final AtomicBoolean isShutDown = new AtomicBoolean(false);
+ /**
+ * Initial a producer object
+ *
+ * @param sessionFactory the session factory
+ * @param tubeClientConfig the client configure
+ * @throws TubeClientException the exception while creating object
+ */
public SimpleMessageProducer(final InnerSessionFactory sessionFactory,
TubeClientConfig tubeClientConfig) throws TubeClientException {
java.security.Security.setProperty("networkaddress.cache.ttl", "3");
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/qltystats/DefaultBrokerRcvQltyStats.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/qltystats/DefaultBrokerRcvQltyStats.java
index 160d770..a8dabbc 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/qltystats/DefaultBrokerRcvQltyStats.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/qltystats/DefaultBrokerRcvQltyStats.java
@@ -61,10 +61,10 @@ public class DefaultBrokerRcvQltyStats implements BrokerRcvQltyStats {
// -1: Uninitialized
// 0: Running
// 1:Stopped
- private AtomicInteger statusId = new AtomicInteger(-1);
+ private final AtomicInteger statusId = new AtomicInteger(-1);
private long lastPrintTime = System.currentTimeMillis();
// Total sent request number.
- private AtomicLong curTotalSentRequestNum = new AtomicLong(0);
+ private final AtomicLong curTotalSentRequestNum = new AtomicLong(0);
// The time of last link quality statistic.
private long lastLinkStatisticTime = System.currentTimeMillis();
// Analyze the broker quality based on the request response. We calculate the quality metric by
@@ -77,6 +77,12 @@ public class DefaultBrokerRcvQltyStats implements BrokerRcvQltyStats {
private long lastQualityStatisticTime = System.currentTimeMillis();
private long printCount = 0;
+ /**
+ * Initial a broker receive status statistics ojbect
+ *
+ * @param rpcServiceFactory the session factory
+ * @param producerConfig the producer configure
+ */
public DefaultBrokerRcvQltyStats(final RpcServiceFactory rpcServiceFactory,
final TubeClientConfig producerConfig) {
this.clientConfig = producerConfig;
@@ -120,7 +126,7 @@ public class DefaultBrokerRcvQltyStats implements BrokerRcvQltyStats {
*
* @param brokerPartList broker partition mapping
* @return partition list
- * @throws TubeClientException
+ * @throws TubeClientException the exception while query
*/
@Override
public List<Partition> getAllowedBrokerPartitions(
@@ -212,7 +218,7 @@ public class DefaultBrokerRcvQltyStats implements BrokerRcvQltyStats {
/**
* Remove a registered broker from the statistic list.
*
- * @param registeredBrokerIdList
+ * @param registeredBrokerIdList the broker id need to delete
*/
@Override
public void removeUnRegisteredBroker(List<Integer> registeredBrokerIdList) {
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
index 368af08..7f3e2e4 100644
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
+++ b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * The Flink tubemq Consumer.
+ * The Flink TubeMQ Consumer.
*
* @param <T> The type of records produced by this data source
*/
@@ -67,7 +67,7 @@ public class TubemqSourceFunction<T>
private static final String SPLIT_COLON = ":";
/**
- * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081.
+ * The address of TubeMQ master, format eg: 127.0.0.1:8080,127.0.0.2:8081.
*/
private final String masterAddress;
@@ -92,7 +92,7 @@ public class TubemqSourceFunction<T>
private final DeserializationSchema<T> deserializationSchema;
/**
- * The random key for tubemq consumer group when startup.
+ * The random key for TubeMQ consumer group when startup.
*/
private final String sessionKey;
@@ -131,15 +131,25 @@ public class TubemqSourceFunction<T>
private transient Map<String, Long> currentOffsets;
/**
- * The tubemq session factory.
+ * The TubeMQ session factory.
*/
private transient TubeSingleSessionFactory messageSessionFactory;
/**
- * The tubemq pull consumer.
+ * The TubeMQ pull consumer.
*/
private transient PullMessageConsumer messagePullConsumer;
+ /**
+ * Build a TubeMQ source function
+ *
+ * @param masterAddress the master address of TubeMQ
+ * @param topic the topic name
+ * @param tidSet the topic's filter condition items
+ * @param consumerGroup the consumer group name
+ * @param deserializationSchema the deserialize schema
+ * @param configuration the configure
+ */
public TubemqSourceFunction(
String masterAddress,
String topic,
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
index 8ba4b2d..ea1e000 100644
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
+++ b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
@@ -41,7 +41,7 @@ import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
/**
- * Tubemq {@link StreamTableSource}.
+ * TubeMQ {@link StreamTableSource}.
*/
public class TubemqTableSource implements
StreamTableSource<Row>,
@@ -50,7 +50,7 @@ public class TubemqTableSource implements
DefinedFieldMapping {
/**
- * Deserialization schema for records from tubemq.
+ * Deserialization schema for records from TubeMQ.
*/
private final DeserializationSchema<Row> deserializationSchema;
@@ -77,30 +77,44 @@ public class TubemqTableSource implements
private final Map<String, String> fieldMapping;
/**
- * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
+ * The address of TubeMQ master, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
*/
private final String masterAddress;
/**
- * The tubemq topic name.
+ * The TubeMQ topic name.
*/
private final String topic;
/**
- * The tubemq tid filter collection.
+ * The TubeMQ tid filter collection.
*/
private final TreeSet<String> tidSet;
/**
- * The tubemq consumer group name.
+ * The TubeMQ consumer group name.
*/
private final String consumerGroup;
/**
- * The parameters collection for tubemq consumer.
+ * The parameters collection for TubeMQ consumer.
*/
private final Configuration configuration;
+ /**
+ * Build TubeMQ table source
+ *
+ * @param deserializationSchema the deserialize schema
+ * @param schema the data schema
+ * @param proctimeAttribute the proc time
+ * @param rowtimeAttributeDescriptors the row time attribute descriptor
+ * @param fieldMapping the field map information
+ * @param masterAddress the master address
+ * @param topic the topic name
+ * @param tidSet the topic's filter condition items
+ * @param consumerGroup the consumer group
+ * @param configuration the configure
+ */
public TubemqTableSource(
DeserializationSchema<Row> deserializationSchema,
TableSchema schema,
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
index 1f0126e..fa229bc 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/utils/ServiceStatusHolder.java
@@ -22,32 +22,35 @@ import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Node Read and Write Service status holder.
+ */
public class ServiceStatusHolder {
private static final Logger logger =
LoggerFactory.getLogger(ServiceStatusHolder.class);
- private static AtomicBoolean isServiceStopped = new AtomicBoolean(false);
- private static AtomicBoolean isReadStopped = new AtomicBoolean(false);
- private static AtomicBoolean isWriteStopped = new AtomicBoolean(false);
+ private static final AtomicBoolean isServiceStopped = new AtomicBoolean(false);
+ private static final AtomicBoolean isReadStopped = new AtomicBoolean(false);
+ private static final AtomicBoolean isWriteStopped = new AtomicBoolean(false);
private static int allowedReadIOExcptCnt = 10;
private static int allowedWriteIOExcptCnt = 10;
- private static long statisDurationMs = 120000;
- private static AtomicLong curReadIOExcptCnt = new AtomicLong(0);
- private static AtomicLong lastReadStatsTime =
+ private static long statsDurationMs = 120000;
+ private static final AtomicLong curReadIOExcptCnt = new AtomicLong(0);
+ private static final AtomicLong lastReadStatsTime =
new AtomicLong(System.currentTimeMillis());
- private static AtomicBoolean isPauseRead = new AtomicBoolean(false);
+ private static final AtomicBoolean isPauseRead = new AtomicBoolean(false);
- private static AtomicLong curWriteIOExcptCnt = new AtomicLong(0);
- private static AtomicLong lastWriteStatsTime =
+ private static final AtomicLong curWriteIOExcptCnt = new AtomicLong(0);
+ private static final AtomicLong lastWriteStatsTime =
new AtomicLong(System.currentTimeMillis());
- private static AtomicBoolean isPauseWrite = new AtomicBoolean(false);
+ private static final AtomicBoolean isPauseWrite = new AtomicBoolean(false);
- public static void setStatisParameters(int paraAllowedReadIOExcptCnt,
- int paraAllowedWriteIOExcptCnt,
- long paraStatisDurationMs) {
+ public static void setStatsParameters(int paraAllowedReadIOExcptCnt,
+ int paraAllowedWriteIOExcptCnt,
+ long paraStatsDurationMs) {
allowedReadIOExcptCnt = paraAllowedReadIOExcptCnt;
allowedWriteIOExcptCnt = paraAllowedWriteIOExcptCnt;
- statisDurationMs = paraStatisDurationMs;
+ statsDurationMs = paraStatsDurationMs;
}
public static boolean isServiceStopped() {
@@ -67,7 +70,7 @@ public class ServiceStatusHolder {
public static boolean addWriteIOErrCnt() {
long curTime = lastWriteStatsTime.get();
- if (System.currentTimeMillis() - curTime > statisDurationMs) {
+ if (System.currentTimeMillis() - curTime > statsDurationMs) {
if (lastWriteStatsTime.compareAndSet(curTime, System.currentTimeMillis())) {
curWriteIOExcptCnt.getAndSet(0);
if (isPauseWrite.get()) {
@@ -92,7 +95,7 @@ public class ServiceStatusHolder {
public static boolean addReadIOErrCnt() {
long curTime = lastReadStatsTime.get();
- if (System.currentTimeMillis() - curTime > statisDurationMs) {
+ if (System.currentTimeMillis() - curTime > statsDurationMs) {
if (lastReadStatsTime.compareAndSet(curTime, System.currentTimeMillis())) {
curReadIOExcptCnt.getAndSet(0);
if (isPauseRead.get()) {
@@ -115,6 +118,13 @@ public class ServiceStatusHolder {
return getCurServiceStatus(isPauseRead.get(), isReadStopped.get());
}
+ /**
+ * Set the read and write service status
+ *
+ * @param isReadStop whether stop read service
+ * @param isWriteStop whether stop write service
+ * @param caller the caller
+ */
public static void setReadWriteServiceStatus(boolean isReadStop,
boolean isWriteStop,
String caller) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/ResponseWrapper.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/ResponseWrapper.java
index feb3215..3f389c9 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/ResponseWrapper.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/ResponseWrapper.java
@@ -21,6 +21,9 @@ import java.io.Serializable;
import org.apache.inlong.tubemq.corerpc.exception.StandbyException;
import org.apache.inlong.tubemq.corerpc.utils.MixUtils;
+/**
+ * Response message wrapper class.
+ */
public class ResponseWrapper implements Serializable {
private static final long serialVersionUID = -3852197088007144687L;
@@ -35,6 +38,16 @@ public class ResponseWrapper implements Serializable {
private String errMsg;
private String stackTrace;
+ /**
+ * Initial a response wrapper object
+ *
+ * @param flagId the flag id
+ * @param serialNo the serial no.
+ * @param serviceType the service type
+ * @param locVersion the local protocol version
+ * @param methodId the method id
+ * @param responseData the response data
+ */
public ResponseWrapper(int flagId, int serialNo,
int serviceType, int locVersion,
int methodId, Object responseData) {
@@ -47,6 +60,16 @@ public class ResponseWrapper implements Serializable {
this.success = true;
}
+ /**
+ * Initial a response wrapper object
+ *
+ * @param flagId the flag id
+ * @param serialNo the serial no.
+ * @param serviceType the service type
+ * @param rmtVersion the remote protocol version
+ * @param locVersion the local protocol version
+ * @param exception the exception
+ */
public ResponseWrapper(int flagId, int serialNo,
int serviceType, int rmtVersion,
int locVersion, Throwable exception) {
@@ -75,6 +98,16 @@ public class ResponseWrapper implements Serializable {
}
}
+ /**
+ * Initial a response wrapper object
+ *
+ * @param flagId the flag id
+ * @param serialNo the serial no.
+ * @param serviceType the service type
+ * @param locVersion the local protocol version
+ * @param errorMsg the text error message
+ * @param stackTrace the stack trace information
+ */
public ResponseWrapper(int flagId, int serialNo,
int serviceType, int locVersion,
String errorMsg, String stackTrace) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcServiceFactory.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcServiceFactory.java
index 0748b5b..a05853e 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcServiceFactory.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/RpcServiceFactory.java
@@ -48,7 +48,7 @@ public class RpcServiceFactory {
private static final Logger logger =
LoggerFactory.getLogger(RpcServiceFactory.class);
private static final int DEFAULT_IDLE_TIME = 10 * 60 * 1000;
- private static AtomicInteger threadIdGen = new AtomicInteger(0);
+ private static final AtomicInteger threadIdGen = new AtomicInteger(0);
private final ClientFactory clientFactory;
private final ConcurrentHashMap<Integer, ServiceRpcServer> servers =
new ConcurrentHashMap<>();
@@ -68,8 +68,8 @@ public class RpcServiceFactory {
new ConcurrentHashMap<>();
private long unAvailableFbdDurationMs =
RpcConstants.CFG_UNAVAILABLE_FORBIDDEN_DURATION_MS;
- private AtomicLong lastLogPrintTime = new AtomicLong(0);
- private AtomicLong lastCheckTime = new AtomicLong(0);
+ private final AtomicLong lastLogPrintTime = new AtomicLong(0);
+ private final AtomicLong lastCheckTime = new AtomicLong(0);
private long linkStatsDurationMs =
RpcConstants.CFG_LQ_STATS_DURATION_MS;
private long linkStatsForbiddenDurMs =
@@ -89,6 +89,8 @@ public class RpcServiceFactory {
/**
* initial with an tube clientFactory
+ *
+ * @param clientFactory the client factory
*/
public RpcServiceFactory(final ClientFactory clientFactory) {
this.clientFactory = clientFactory;
@@ -97,6 +99,9 @@ public class RpcServiceFactory {
/**
* initial with an tube clientFactory and rpc config
+ *
+ * @param clientFactory the client factory
+ * @param config the configure information
*/
public RpcServiceFactory(final ClientFactory clientFactory, final RpcConfig config) {
this.clientFactory = clientFactory;
@@ -125,8 +130,8 @@ public class RpcServiceFactory {
/**
* check if the remote address is forbidden or not
*
- * @param remoteAddr
- * @return
+ * @param remoteAddr the remote address
+ * @return whether is forbidden
*/
public boolean isRemoteAddrForbidden(String remoteAddr) {
Long forbiddenTime = forbiddenAddrMap.get(remoteAddr);
@@ -144,7 +149,7 @@ public class RpcServiceFactory {
/**
* get all Link abnormal Forbidden Address
*
- * @return
+ * @return the forbidden address map
*/
public ConcurrentHashMap<String, Long> getForbiddenAddrMap() {
return forbiddenAddrMap;
@@ -153,14 +158,16 @@ public class RpcServiceFactory {
/**
* get all service abnormal Forbidden brokerIds
*
- * @return
+ * @return the unavailable broker map
*/
public ConcurrentHashMap<Integer, Long> getUnavailableBrokerMap() {
return brokerUnavailableMap;
}
/**
- * @param remoteAddr
+ * Remove the remote address from forbidden address map
+ *
+ * @param remoteAddr the remote address need to removed
*/
public void resetRmtAddrErrCount(String remoteAddr) {
forbiddenAddrMap.remove(remoteAddr);
@@ -178,7 +185,9 @@ public class RpcServiceFactory {
}
/**
- * @param remoteAddr
+ * Accumulate a error count for the remote address
+ *
+ * @param remoteAddr the remote address
*/
public void addRmtAddrErrCount(String remoteAddr) {
RemoteConErrStats rmtConErrStats = remoteAddrMap.get(remoteAddr);
@@ -220,7 +229,7 @@ public class RpcServiceFactory {
}
int needForbiddenCount =
(int) Math.rint(remoteAddrMap.size() * linkStatsMaxAllowedForbiddenRate);
- needForbiddenCount = (needForbiddenCount > 30) ? 30 : needForbiddenCount;
+ needForbiddenCount = Math.min(needForbiddenCount, 30);
if (needForbiddenCount > totalCount) {
forbiddenAddrMap.put(remoteAddr, System.currentTimeMillis());
isAdded = true;
@@ -241,6 +250,10 @@ public class RpcServiceFactory {
}
}
+ /**
+ * Remove expired records
+ * All forbidden records will be removed after the specified time
+ */
public void rmvAllExpiredRecords() {
long curTime = System.currentTimeMillis();
Set<String> expiredAddrs = new HashSet<>();
@@ -291,6 +304,10 @@ public class RpcServiceFactory {
brokerUnavailableMap.put(brokerId, System.currentTimeMillis());
}
+ /**
+ * Remove unavailable records
+ * All unavailable records will be removed after the specified time
+ */
public void rmvExpiredUnavailableBrokers() {
long curTime = System.currentTimeMillis();
Set<Integer> expiredBrokers = new HashSet<>();
@@ -316,10 +333,12 @@ public class RpcServiceFactory {
}
/**
- * @param clazz
- * @param brokerInfo
- * @param config
- * @return
+ * Get broker's service
+ *
+ * @param clazz the class object
+ * @param brokerInfo the broker object
+ * @param config the configure
+ * @return the service instance for the broker
*/
public synchronized <T> T getService(Class<T> clazz,
BrokerInfo brokerInfo,
@@ -343,7 +362,7 @@ public class RpcServiceFactory {
/**
* check is service empty
*
- * @return
+ * @return whether is empty
*/
public boolean isServiceEmpty() {
return servicesCache.isEmpty();
@@ -405,12 +424,12 @@ public class RpcServiceFactory {
/**
* start an tube netty server
*
- * @param clazz
- * @param serviceInstance
- * @param listenPort
- * @param threadPool
- * @param config
- * @throws Exception
+ * @param clazz the class object
+ * @param serviceInstance the service instance
+ * @param listenPort the listen port
+ * @param threadPool the thread pool
+ * @param config the configure
+ * @throws Exception the excepition while processing
*/
public synchronized void publishService(Class clazz, Object serviceInstance,
int listenPort, ExecutorService threadPool,
@@ -588,5 +607,4 @@ public class RpcServiceFactory {
}
}
}
-
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/benchemark/RcpService4BenchmarkClient.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/benchemark/RcpService4BenchmarkClient.java
index bd5b736..344748f 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/benchemark/RcpService4BenchmarkClient.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/benchemark/RcpService4BenchmarkClient.java
@@ -33,10 +33,18 @@ public class RcpService4BenchmarkClient {
private final int targetPort;
private final RpcServiceFactory rpcServiceFactory;
private final NettyClientFactory clientFactory = new NettyClientFactory();
- private SimpleService simpleService;
+ private final SimpleService simpleService;
private int threadNum = 10;
private int invokeTimes = 1000000;
+ /**
+ * Initial a benchmark client
+ *
+ * @param targetHost the target host
+ * @param targetPort the target port
+ * @param threadNum the thread count
+ * @param invokeTimes the invoke count
+ */
public RcpService4BenchmarkClient(String targetHost, int targetPort, int threadNum,
int invokeTimes) {
this.targetHost = targetHost;
@@ -59,6 +67,11 @@ public class RcpService4BenchmarkClient {
new RcpService4BenchmarkClient("127.0.0.1", 8088, 10, 100000).start();
}
+ /**
+ * Start benchmark test
+ *
+ * @throws Exception the exception
+ */
public void start() throws Exception {
for (int i = 0; i < threadNum; i++) {
executorService.submit(new Runnable() {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java
index 9e394d3..980a36c 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/codec/PbEnDecoder.java
@@ -75,13 +75,13 @@ public class PbEnDecoder {
}
/**
- * lizard forgives
+ * Decode pb content
*
- * @param isRequest
- * @param methodId
- * @param bytes
- * @return
- * @throws Exception
+ * @param isRequest whether a request message
+ * @param methodId the method id
+ * @param bytes the message content
+ * @return the message's object
+ * @throws Exception the exception while decoding messsage
*/
public static Object pbDecode(boolean isRequest, int methodId, byte[] bytes) throws Exception {
// #lizard forgives
@@ -236,13 +236,13 @@ public class PbEnDecoder {
}
/**
- * lizard forgives
+ * Valid service type and method parameters
*
- * @param serviceId
- * @param methodId
- * @param sBuilder
- * @return
- * @throws Exception
+ * @param serviceId the service id
+ * @param methodId the method id
+ * @param sBuilder the string buffer
+ * @return whether is valid content.
+ * @throws Exception the exception while processing.
*/
public static boolean isValidServiceTypeAndMethod(int serviceId,
int methodId,
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferInputStream.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferInputStream.java
index 12c83d7..7541428 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferInputStream.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferInputStream.java
@@ -28,7 +28,7 @@ import java.util.List;
* Copied from <a href="http://avro.apache.org">Apache Avro Project</a>
*/
public class ByteBufferInputStream extends InputStream {
- private List<ByteBuffer> buffers;
+ private final List<ByteBuffer> buffers;
private int current;
public ByteBufferInputStream(List<ByteBuffer> buffers) {
@@ -36,6 +36,9 @@ public class ByteBufferInputStream extends InputStream {
}
/**
+ * Read a byte at this buffer
+ *
+ * @return the read value
* @throws java.io.EOFException if EOF is reached.
* @see java.io.InputStream#read()
*/
@@ -45,6 +48,11 @@ public class ByteBufferInputStream extends InputStream {
}
/**
+ * Read content of specified length
+ *
+ * @param b the content buffer
+ * @param off the offset position
+ * @param len the content length
* @throws java.io.EOFException if EOF is reached before reading all the bytes.
* @see java.io.InputStream#read(byte[], int, int)
*/
@@ -67,6 +75,7 @@ public class ByteBufferInputStream extends InputStream {
/**
* Read a buffer from the input without copying, if possible.
*
+ * @param length the need read data length
* @throws java.io.EOFException if EOF is reached before reading all the bytes.
*/
public ByteBuffer readBuffer(int length) throws IOException {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferOutputStream.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferOutputStream.java
index b51b1ad..346e55d 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferOutputStream.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/ByteBufferOutputStream.java
@@ -50,6 +50,8 @@ public class ByteBufferOutputStream extends OutputStream {
/**
* Prepend a list of ByteBuffers to this stream.
+ *
+ * @param lists need to prepended content
*/
public void prepend(List<ByteBuffer> lists) {
for (ByteBuffer buffer : lists) {
@@ -60,6 +62,8 @@ public class ByteBufferOutputStream extends OutputStream {
/**
* Append a list of ByteBuffers to this stream.
+ *
+ * @param lists need to appended content
*/
public void append(List<ByteBuffer> lists) {
for (ByteBuffer buffer : lists) {
@@ -104,6 +108,8 @@ public class ByteBufferOutputStream extends OutputStream {
/**
* Add a buffer to the output without copying, if possible.
+ *
+ * @param buffer the content need to written
*/
public void writeBuffer(ByteBuffer buffer) throws IOException {
if (buffer.remaining() < RpcConstants.RPC_MAX_BUFFER_SIZE) {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
index 868fc65..6a14446 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClient.java
@@ -70,16 +70,18 @@ public class NettyClient implements Client {
new ConcurrentHashMap<>();
private final AtomicInteger serialNoGenerator =
new AtomicInteger(0);
- private AtomicBoolean released = new AtomicBoolean(false);
+ private final AtomicBoolean released = new AtomicBoolean(false);
private NodeAddrInfo addressInfo;
- private ClientFactory clientFactory;
+ private final ClientFactory clientFactory;
private Channel channel;
- private long connectTimeout;
- private volatile AtomicBoolean closed = new AtomicBoolean(true);
+ private final long connectTimeout;
+ private final AtomicBoolean closed = new AtomicBoolean(true);
/**
- * @param clientFactory
- * @param connectTimeout
+ * Initial a netty client object
+ *
+ * @param clientFactory the client factory
+ * @param connectTimeout the connection timeout
*/
public NettyClient(ClientFactory clientFactory, long connectTimeout) {
this.clientFactory = clientFactory;
@@ -94,8 +96,10 @@ public class NettyClient implements Client {
}
/**
- * @param channel
- * @param addressInfo
+ * Set a channel
+ *
+ * @param channel the channel
+ * @param addressInfo the address of the channel
*/
public void setChannel(Channel channel, final NodeAddrInfo addressInfo) {
this.channel = channel;
@@ -226,6 +230,8 @@ public class NettyClient implements Client {
* remove clientFactory cache
* handler unfinished callbacks
* and close the channel
+ *
+ * @param removeParent whether remove the object from client factory
*/
@Override
public void close(boolean removeParent) {
@@ -275,11 +281,13 @@ public class NettyClient implements Client {
*/
public class NettyClientHandler extends SimpleChannelUpstreamHandler {
- @Override
/**
- * Invoked when a message object (e.g: {@link ChannelBuffer}) was received
- * from a remote peer.
+ * Invoked when a message object was received from a remote peer.
+ *
+ * @param ctx the channel handler context
+ * @param e the message event
*/
+ @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (e.getMessage() instanceof RpcDataPack) {
RpcDataPack dataPack = (RpcDataPack) e.getMessage();
@@ -366,8 +374,10 @@ public class NettyClient implements Client {
}
/**
- * Invoked when an exception was raised by an I/O thread or a
- * {@link ChannelHandler}.
+ * Invoked when an exception was raised by an I/O thread or a {@link ChannelHandler}.
+ *
+ * @param ctx the channel handler context
+ * @param e the exception object
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
@@ -386,11 +396,13 @@ public class NettyClient implements Client {
}
}
- @Override
/**
- * Invoked when a {@link Channel} was closed and all its related resources
- * were released.
+ * Invoked when a {@link Channel} was closed and all its related resources were released.
+ *
+ * @param ctx the channel handler context
+ * @param e the channel state event
*/
+ @Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
NettyClient.this.close();
}
@@ -401,7 +413,7 @@ public class NettyClient implements Client {
*/
public class TimeoutTask implements TimerTask {
- private int serialNo;
+ private final int serialNo;
public TimeoutTask(int serialNo) {
this.serialNo = serialNo;
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
index 8c33506..9204419 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/netty/NettyClientFactory.java
@@ -83,8 +83,8 @@ public class NettyClientFactory implements ClientFactory {
/**
* initial the network by rpc config object
*
- * @param conf
- * @throws IllegalArgumentException
+ * @param conf the configure information
+ * @throws IllegalArgumentException the exception while configuring object
*/
public void configure(final RpcConfig conf) throws IllegalArgumentException {
if (this.init.compareAndSet(false, true)) {
@@ -239,11 +239,11 @@ public class NettyClientFactory implements ClientFactory {
/**
* create a netty client
*
- * @param addressInfo
- * @param connectTimeout
- * @param conf
- * @return
- * @throws Exception
+ * @param addressInfo the remote address information
+ * @param connectTimeout the connection timeout
+ * @param conf the configure information
+ * @return the client object
+ * @throws Exception the exception while creating object.
*/
private Client createClient(final NodeAddrInfo addressInfo,
int connectTimeout, final RpcConfig conf) throws Exception {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/MixUtils.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/MixUtils.java
index cbe126e..59af704 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/MixUtils.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/MixUtils.java
@@ -25,17 +25,31 @@ import org.apache.inlong.tubemq.corerpc.protocol.RpcProtocol;
public class MixUtils {
+ /**
+ * Substitute class name prefix
+ *
+ * After TubeMQ was donated to Apache, the package prefix name was not migrated to
+ * the name of apache for a long time, and some errors on the server side were returned
+ * to the client by throwing exceptions.
+ * In order to maintain the compatibility between the previous and previous versions,
+ * the the class name in exception information is replaced through this function
+ *
+ * @param className the class name
+ * @param toOldVersion whether the client is old version
+ * @param toProtocolVer the client's protocol version
+ * @return the translated class name
+ */
public static String replaceClassNamePrefix(String className,
boolean toOldVersion,
- int toPotocolVer) {
+ int toProtocolVer) {
- if (toPotocolVer == RpcProtocol.RPC_PROTOCOL_VERSION_OLD_1) {
+ if (toProtocolVer == RpcProtocol.RPC_PROTOCOL_VERSION_OLD_1) {
if (toOldVersion) {
return className.replace("org.apache.inlong.tubemq.", "com.tencent.tubemq.");
} else {
return className.replace("com.tencent.tubemq.", "org.apache.inlong.tubemq.");
}
- } else if (toPotocolVer == RpcProtocol.RPC_PROTOCOL_VERSION_TUBEMQ) {
+ } else if (toProtocolVer == RpcProtocol.RPC_PROTOCOL_VERSION_TUBEMQ) {
if (toOldVersion) {
return className.replace("org.apache.inlong.tubemq.", "com.apache.tubemq.");
} else {
@@ -46,6 +60,12 @@ public class MixUtils {
}
}
+ /**
+ * Construct the corresponding exception object according to the exception text
+ *
+ * @param exceptionMsg the exception text
+ * @return the exception object
+ */
public static Throwable unwrapException(String exceptionMsg) {
// Perform string to exception conversion processing
try {
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/TSSLEngineUtil.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/TSSLEngineUtil.java
index c163b16..5f0f227 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/TSSLEngineUtil.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corerpc/utils/TSSLEngineUtil.java
@@ -27,19 +27,34 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
+/**
+ * SSL-related general function classes
+ */
public class TSSLEngineUtil {
+ /**
+ * Create a SSL engine
+ *
+ * @param keyStoreStream the key-store stream
+ * @param keyStorePassword the key-sored password
+ * @param trustStoreStream the trust-store stream
+ * @param trustStorePassword the trust-store password
+ * @param isClientMode whether client mode
+ * @param needTwoWayAuth Whether require two-way authentication
+ * @return the SSL engine
+ * @throws Exception the exception information while creating
+ */
public static SSLEngine createSSLEngine(InputStream keyStoreStream,
String keyStorePassword,
InputStream trustStoreStream,
String trustStorePassword,
boolean isClientMode,
- boolean needTwyWayAuth)
+ boolean needTwoWayAuth)
throws Exception {
KeyManagerFactory kmf = null;
TrustManagerFactory tmf = null;
- if (isClientMode || needTwyWayAuth) {
+ if (isClientMode || needTwoWayAuth) {
KeyStore ts = KeyStore.getInstance("JKS");
try {
ts.load(trustStoreStream, trustStorePassword.toCharArray());
@@ -52,7 +67,7 @@ public class TSSLEngineUtil {
}
}
- if (!isClientMode || needTwyWayAuth) {
+ if (!isClientMode || needTwoWayAuth) {
KeyStore ks = KeyStore.getInstance("JKS");
try {
ks.load(keyStoreStream, keyStorePassword.toCharArray());
@@ -70,20 +85,32 @@ public class TSSLEngineUtil {
tmf == null ? null : tmf.getTrustManagers(), null);
SSLEngine sslEngine = serverContext.createSSLEngine();
sslEngine.setUseClientMode(isClientMode);
- sslEngine.setNeedClientAuth(needTwyWayAuth);
+ sslEngine.setNeedClientAuth(needTwoWayAuth);
return sslEngine;
}
+ /**
+ * Create a SSL engine
+ *
+ * @param keyStorePath the key-store file path
+ * @param trustStorePath the trust-store file path
+ * @param keyStorePassword the key-store password
+ * @param trustStorePassword the trust-store password
+ * @param isClientMode whether client mode
+ * @param needTwoWayAuth Whether require two-way authentication
+ * @return the SSL engine
+ * @throws Exception the exception information while creating
+ */
public static SSLEngine createSSLEngine(String keyStorePath, String trustStorePath,
String keyStorePassword, String trustStorePassword,
- boolean isClientMode, boolean needTwyWayAuth)
+ boolean isClientMode, boolean needTwoWayAuth)
throws Exception {
InputStream keyStoreStream = new FileInputStream(new File(keyStorePath));
InputStream trustStoreStream = new FileInputStream(new File(trustStorePath));
return createSSLEngine(keyStoreStream, keyStorePassword, trustStoreStream,
- trustStorePassword, isClientMode, needTwyWayAuth);
+ trustStorePassword, isClientMode, needTwoWayAuth);
}
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index f9f195a..16da439 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -125,7 +125,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
this.storeManager = tubeBroker.getStoreManager();
this.offsetManager = tubeBroker.getOffsetManager();
this.serverAuthHandler = tubeBroker.getServerAuthHandler();
- ServiceStatusHolder.setStatisParameters(tubeConfig.getAllowedReadIOExcptCnt(),
+ ServiceStatusHolder.setStatsParameters(tubeConfig.getAllowedReadIOExcptCnt(),
tubeConfig.getAllowedWriteIOExcptCnt(), tubeConfig.getIoExcptStatsDurationMs());
this.putCounterGroup = new TrafficStatsService("PutCounterGroup", "Producer", 60 * 1000);
this.getCounterGroup = new TrafficStatsService("GetCounterGroup", "Consumer", 60 * 1000);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
index 95e4430..6e6806f 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
@@ -46,7 +46,7 @@ public class BrokerDefMetadata {
// expire policy.
private String deletePolicy = "delete,168h";
// the max cache size for topic.
- private int memCacheMsgSize = 1 * 1024 * 1024;
+ private int memCacheMsgSize = 1024 * 1024;
// the max cache message count for topic.
private int memCacheMsgCnt = 5 * 1024;
// the max interval(milliseconds) that topic's memory cache will flush to disk.
@@ -56,9 +56,10 @@ public class BrokerDefMetadata {
}
- /*
- * Build BrokerDefMetadata from brokerDefMetaConfInfo, each segment is separated by ':'.
- * brokerDefMetaConfInfo is often generated by Master service. Only used for inner communication.
+ /**
+ * Initial broker meta-data object by configure info
+ *
+ * @param brokerDefMetaConfInfo the broker configure information.
*/
public BrokerDefMetadata(String brokerDefMetaConfInfo) {
if (TStringUtils.isBlank(brokerDefMetaConfInfo)) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index 3444daf..487b95e 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
@@ -61,6 +61,16 @@ public class ConsumerNodeInfo {
new AtomicInteger(TBaseConstants.META_VALUE_UNDEFINED);
private long createTime = System.currentTimeMillis();
+ /**
+ * Initial consumer node information
+ *
+ * @param storeManager the store manager
+ * @param consumerId the consumer id
+ * @param filterCodes the filter condition items
+ * @param sessionKey the session key
+ * @param sessionTime the session create time
+ * @param partStr the partition information
+ */
public ConsumerNodeInfo(final MessageStoreManager storeManager,
final String consumerId, Set<String> filterCodes,
final String sessionKey, long sessionTime, final String partStr) {
@@ -68,6 +78,18 @@ public class ConsumerNodeInfo {
filterCodes, sessionKey, sessionTime, false, partStr);
}
+ /**
+ * Initial consumer node information
+ *
+ * @param storeManager the store manager
+ * @param qryPriorityId the query priority id
+ * @param consumerId the consumer id
+ * @param filterCodes the filter condition items
+ * @param sessionKey the session key
+ * @param sessionTime the session create time
+ * @param isSupportLimit whether to support limited consumption function
+ * @param partStr the partition information
+ */
public ConsumerNodeInfo(final MessageStoreManager storeManager,
final int qryPriorityId, final String consumerId,
Set<String> filterCodes, final String sessionKey,
@@ -92,7 +114,16 @@ public class ConsumerNodeInfo {
this.isSupportLimit = isSupportLimit;
}
- // #lizard forgives
+ /**
+ * Query the current allowed maximum consumption size
+ *
+ * @param storeKey the store block key
+ * @param flowCtrlRuleHandler the flow-control rule handler
+ * @param currMaxDataOffset the current max data offset
+ * @param maxMsgTransferSize the max message transfer size
+ * @param isEscFlowCtrl whether need escape flow-control process
+ * @return the allowed consumption size
+ */
public int getCurrentAllowedSize(final String storeKey,
final FlowCtrlRuleHandler flowCtrlRuleHandler,
final long currMaxDataOffset, int maxMsgTransferSize,
@@ -229,10 +260,10 @@ public class ConsumerNodeInfo {
/**
* Recalculate message limit value.
*
- * @param curDataDlt
- * @param currTime
- * @param maxMsgTransferSize
- * @param flowCtrlRuleHandler
+ * @param curDataDlt current data lag
+ * @param currTime current time
+ * @param maxMsgTransferSize the max message transfer size
+ * @param flowCtrlRuleHandler the flow-control rule handler
*/
private void recalcMsgLimitValue(long curDataDlt, long currTime, int maxMsgTransferSize,
final FlowCtrlRuleHandler flowCtrlRuleHandler) {
@@ -257,5 +288,4 @@ public class ConsumerNodeInfo {
currTime + TBaseConstants.CFG_FC_MAX_SAMPLING_PERIOD;
}
}
-
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/OffsetStorageInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/OffsetStorageInfo.java
index adb0959..310b97c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/OffsetStorageInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/OffsetStorageInfo.java
@@ -25,19 +25,38 @@ import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
public class OffsetStorageInfo implements Serializable {
private static final long serialVersionUID = -4232003748320500757L;
- private String topic;
- private int brokerId;
- private int partitionId;
- private AtomicLong offset = new AtomicLong(0);
+ private final String topic;
+ private final int brokerId;
+ private final int partitionId;
+ private final AtomicLong offset = new AtomicLong(0);
private long messageId;
private boolean firstCreate = false;
private boolean modified = false;
+ /**
+ * Initial offset storage information
+ *
+ * @param topic the topic name
+ * @param brokerId the broker id
+ * @param partitionId the partition id
+ * @param offset the offset
+ * @param messageId the message id
+ */
public OffsetStorageInfo(String topic, int brokerId, int partitionId,
long offset, long messageId) {
this(topic, brokerId, partitionId, offset, messageId, true);
}
+ /**
+ * Initial offset storage information
+ *
+ * @param topic the topic name
+ * @param brokerId the broker id
+ * @param partitionId the partition id
+ * @param offset the offset
+ * @param messageId the message id
+ * @param firstCreate whether is the first record creation
+ */
public OffsetStorageInfo(String topic, int brokerId, int partitionId,
long offset, long messageId, boolean firstCreate) {
this.topic = topic;
@@ -126,7 +145,7 @@ public class OffsetStorageInfo implements Serializable {
if (!topic.equals(that.topic)) {
return false;
}
- return offset != null ? offset.equals(that.offset) : that.offset == null;
+ return (offset.get() == that.offset.get());
}
@@ -135,7 +154,7 @@ public class OffsetStorageInfo implements Serializable {
int result = topic.hashCode();
result = 31 * result + brokerId;
result = 31 * result + partitionId;
- result = 31 * result + (offset != null ? offset.hashCode() : 0);
+ result = 31 * result + offset.hashCode();
result = 31 * result + (int) (messageId ^ (messageId >>> 32));
result = 31 * result + (firstCreate ? 1 : 0);
result = 31 * result + (modified ? 1 : 0);
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
index c19b99e..70aaa6c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/ZkOffsetStorage.java
@@ -65,9 +65,16 @@ public class ZkOffsetStorage implements OffsetStorage {
private final boolean isBroker;
private final int brokerId;
private final String strBrokerId;
- private ZKConfig zkConfig;
+ private final ZKConfig zkConfig;
private ZooKeeperWatcher zkw;
+ /**
+ * Initial ZooKeeper offset storage ojbect
+ *
+ * @param zkConfig the ZooKeeper configure
+ * @param isBroker whether used in broker node
+ * @param brokerId the broker id
+ */
public ZkOffsetStorage(final ZKConfig zkConfig, boolean isBroker, int brokerId) {
this.zkConfig = zkConfig;
this.isBroker = isBroker;
@@ -302,9 +309,8 @@ public class ZkOffsetStorage implements OffsetStorage {
/**
* Get offset stored in zookeeper, if not found or error, set null
- * <p/>
*
- * @return partitionId--offset map info
+ * @param groupTopicPartMap the group topic-partition map
*/
@Override
public void deleteGroupOffsetInfo(
@@ -371,5 +377,4 @@ public class ZkOffsetStorage implements OffsetStorage {
return root;
}
}
-
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java
index 49b3eb3..8ea1de1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/offsetstorage/zookeeper/ZooKeeperWatcher.java
@@ -431,6 +431,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
}
/**
+ * Get the active Zookeeper address
+ *
* @return Path to the currently active master.
*/
public String getMasterAddressZNode() {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Bytes.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Bytes.java
index 51e3627..ae467e5 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Bytes.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Bytes.java
@@ -39,6 +39,8 @@ public class Bytes {
private static final Logger logger = LoggerFactory.getLogger(Bytes.class);
/**
+ * Compare two arrays content.
+ *
* @param left left operand
* @param right right operand
* @return 0 if equal, < 0 if left is less than right, etc.
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java
index 72f0773..8819418 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/ClientSyncInfo.java
@@ -40,6 +40,12 @@ public class ClientSyncInfo {
}
+ /**
+ * Update the client reported subscription information
+ *
+ * @param brokerRunManager the broker run-manager
+ * @param clientSubRepInfo the client reported subscription information
+ */
public void updSubRepInfo(BrokerRunManager brokerRunManager,
ClientMaster.ClientSubRepInfo clientSubRepInfo) {
if (clientSubRepInfo == null) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/FileUtil.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/FileUtil.java
index fe5f4c5..c9dc001 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/FileUtil.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/FileUtil.java
@@ -29,6 +29,13 @@ public class FileUtil {
return dir.delete();
}
+ /**
+ * Delete the contents of files and subdirectories in the specified directory
+ *
+ * @param dir the specified directory
+ * @return the deleted count
+ * @throws IOException the exception while deleting contents
+ */
public static boolean fullyDeleteContents(File dir) throws IOException {
boolean deletionSucceeded = true;
File[] contents = dir.listFiles();
@@ -67,5 +74,4 @@ public class FileUtil {
.append(dir.getAbsolutePath()).toString());
}
}
-
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/HttpUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/HttpUtils.java
index 0d9afc2..8716fe1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/HttpUtils.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/HttpUtils.java
@@ -47,7 +47,12 @@ public class HttpUtils {
private static final Logger logger =
LoggerFactory.getLogger(HttpUtils.class);
- /* Send request to target server. */
+ /**
+ * Send request to target server.
+ *
+ * @param url the target url
+ * @param inParamMap the parameter map
+ */
public static JsonObject requestWebService(String url,
Map<String, String> inParamMap) throws Exception {
if (url == null) {
@@ -120,17 +125,20 @@ public class HttpUtils {
return jsonRes;
}
+ /**
+ * Test scenario:
+ * simulate where there are multiple Master nodes in the cluster,
+ * and there are nodes that do not take effect
+ * Call url:
+ * http://127.0.0.1:8080/webapi.htm?method=admin_query_topic_info
+ * Request parameters:
+ * topicName=test_1, brokerId=170399798
+ * Master nodes:
+ * 10.54.55.32:8080(invalid node),127.0.0.1:8080(valid node)
+ *
+ * @param args the call arguments
+ */
public static void main(String[] args) {
- /** Test scenario:
- * simulate where there are multiple Master nodes in the cluster,
- * and there are nodes that do not take effect
- * Call url:
- * http://127.0.0.1:8080/webapi.htm?method=admin_query_topic_info
- * Request parameters:
- * topicName=test_1, brokerId=170399798
- * Master nodes:
- * 10.54.55.32:8080(invalid node),127.0.0.1:8080(valid node)
- */
Map<String, String> inParamMap = new HashMap<>();
inParamMap.put("topicName", "test_1");
inParamMap.put("brokerId", "170399798");
@@ -157,5 +165,4 @@ public class HttpUtils {
System.out.println("query result is " + jsonRes.toString());
}
}
-
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/RowLock.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/RowLock.java
index ce15415..6b32ab6 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/RowLock.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/RowLock.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
*/
public class RowLock {
private static final Logger logger = LoggerFactory.getLogger(RowLock.class);
- private static Random rand = new Random();
+ private static final Random rand = new Random();
private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
@@ -103,6 +103,11 @@ public class RowLock {
}
}
+ /**
+ * Release row lock
+ *
+ * @param lockId the lock id
+ */
public void releaseRowLock(final Integer lockId) {
if (lockId == null) {
return;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Sleeper.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Sleeper.java
index 3658857..5679b28 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Sleeper.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/Sleeper.java
@@ -34,9 +34,11 @@ public class Sleeper {
private final int period;
private final Stoppable stopper;
private final Object sleepLock = new Object();
- private AtomicBoolean triggerWake = new AtomicBoolean(false);
+ private final AtomicBoolean triggerWake = new AtomicBoolean(false);
/**
+ * Initial a Sleeper object
+ *
* @param sleep sleep time in milliseconds
* @param stopper When {@link Stoppable#isStopped()} is true, this thread will cleanup and exit
* cleanly.
@@ -65,10 +67,9 @@ public class Sleeper {
}
/**
- * Sleep for period adjusted by passed <code>startTime<code>
+ * Sleep for period adjusted by passed startTime
*
- * @param startTime Time some task started previous to now. Time to sleep will be docked current
- * time minus passed <code>startTime<code>.
+ * @param startTime Time some task started previous to now.
*/
public void sleep(final long startTime) {
if (this.stopper.isStopped()) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index bbc7831..da6a876 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -1863,7 +1863,13 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
return result;
}
- // process unReset group balance
+ /**
+ * process unReset group balance
+ *
+ * @param rebalanceId the re-balance id
+ * @param isFirstReb whether is first re-balance
+ * @param groups the need re-balance group set
+ */
public void processRebalance(long rebalanceId, boolean isFirstReb, List<String> groups) {
// #lizard forgives
Map<String, Map<String, List<Partition>>> finalSubInfoMap = null;
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
index 95a2aef..2d6be25 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbBrokerConfEntity.java
@@ -64,7 +64,28 @@ public class BdbBrokerConfEntity implements Serializable {
public BdbBrokerConfEntity() {
}
- //Constructor
+ /**
+ * Build a broker configure entity
+ *
+ * @param brokerId the broker id
+ * @param brokerIp the broker ip
+ * @param brokerPort the broker port
+ * @param numPartitions the number of partition
+ * @param unflushThreshold the un-flushed message count
+ * @param unflushInterval the un-flushed time delta
+ * @param deleteWhen the delete time
+ * @param deletePolicy the delete policy
+ * @param manageStatus the manage status
+ * @param acceptPublish whether accept publish
+ * @param acceptSubscribe whether accept subscribe
+ * @param attributes the attribute information
+ * @param isConfDataUpdated whether the configure is updated
+ * @param isBrokerLoaded whether the broker has loaded
+ * @param createUser the creator
+ * @param createDate the create date
+ * @param modifyUser the modifier
+ * @param modifyDate the modify date
+ */
public BdbBrokerConfEntity(final int brokerId, final String brokerIp,
final int brokerPort, final int numPartitions,
final int unflushThreshold, final int unflushInterval,
@@ -97,6 +118,9 @@ public class BdbBrokerConfEntity implements Serializable {
/**
* Serialize config field to json format
+ *
+ * @param sb string buffer
+ * @return the content in json format
*/
public StringBuilder toJsonString(final StringBuilder sb) {
return sb.append("{\"type\":\"BdbBrokerConfEntity\",")
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
index 5fddada..ded8cb1 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbClusterSettingEntity.java
@@ -73,7 +73,31 @@ public class BdbClusterSettingEntity implements Serializable {
public BdbClusterSettingEntity() {
}
- //Constructor
+ /**
+ * Build cluster setting entity
+ *
+ * @param recordKey the record key
+ * @param configId the configure id
+ * @param brokerPort the broker port
+ * @param brokerTLSPort the broker TLS port
+ * @param brokerWebPort the broker web port
+ * @param numTopicStores the number of topic store
+ * @param numPartitions the number of partition
+ * @param unflushThreshold the un-flushed message count
+ * @param unflushInterval the un-flushed time delta
+ * @param unflushDataHold the un-flushed data size
+ * @param memCacheMsgCntInK the memory cached message count
+ * @param memCacheFlushIntvl the memory cached time delta
+ * @param memCacheMsgSizeInMB the memory cached message size
+ * @param acceptPublish whether accept publish
+ * @param acceptSubscribe whether accept subscribe
+ * @param deletePolicy the delete policy
+ * @param qryPriorityId the query priority id
+ * @param maxMsgSizeInB the default message max size
+ * @param attributes the attribute information
+ * @param modifyUser the modifier
+ * @param modifyDate the modify date
+ */
public BdbClusterSettingEntity(String recordKey, long configId, int brokerPort,
int brokerTLSPort, int brokerWebPort,
int numTopicStores, int numPartitions,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
index 90b9ac1..f435497 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/bdbstore/bdbentitys/BdbTopicConfEntity.java
@@ -60,7 +60,27 @@ public class BdbTopicConfEntity implements Serializable {
public BdbTopicConfEntity() {
}
- //Constructor
+ /**
+ * Build topic configure entity
+ *
+ * @param brokerId the broker id
+ * @param brokerIp the broker ip
+ * @param brokerPort the broker port
+ * @param topicName the topic name
+ * @param numPartitions the number of partition
+ * @param unflushThreshold the un-flushed message count
+ * @param unflushInterval the un-flushed time delta
+ * @param deleteWhen the delete time
+ * @param deletePolicy the delete policy
+ * @param acceptPublish whether accept publish
+ * @param acceptSubscribe whether accept subscribe
+ * @param numTopicStores the number of topic store
+ * @param attributes the attribute information
+ * @param createUser the creator
+ * @param createDate the create date
+ * @param modifyUser the modifier
+ * @param modifyDate the modify date
+ */
public BdbTopicConfEntity(final int brokerId, final String brokerIp,
final int brokerPort, final String topicName,
final int numPartitions, final int unflushThreshold,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
index b7c8934..f7f16f4 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/MetaDataManager.java
@@ -922,6 +922,19 @@ public class MetaDataManager implements Server {
// ////////////////////////////////////////////////////////////////////////////
+ /**
+ * Add or update topic deploy information
+ *
+ * @param isAddOp whether add operation
+ * @param opEntity the operation information
+ * @param brokerId the broker id
+ * @param topicName the topic name
+ * @param deployStatus the deploy status
+ * @param topicPropInfo the topic property set
+ * @param sBuffer the string buffer
+ * @param result the process result
+ * @return true if success otherwise false
+ */
public TopicProcessResult addOrUpdTopicDeployInfo(boolean isAddOp, BaseEntity opEntity,
int brokerId, String topicName,
TopicStatus deployStatus,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
index b9823a6..65b8281 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
@@ -95,6 +95,11 @@ public class ConsumeGroupInfo {
private final AtomicLong lastMetaInfoFreshTime =
new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ /**
+ * Initial a Consume group node information.
+ *
+ * @param consumer the consumer of consume group.
+ */
public ConsumeGroupInfo(ConsumerInfo consumer) {
this.groupName = consumer.getGroupName();
this.consumeType = consumer.getConsumeType();
@@ -107,6 +112,9 @@ public class ConsumeGroupInfo {
* Add consumer to consume group
*
* @param inConsumer consumer object
+ * @param sBuffer the string buffer
+ * @param result the process result
+ * @return whether the addition is successful
*/
public boolean addConsumer(ConsumerInfo inConsumer,
StringBuilder sBuffer,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
index 4fbcedf..4828e98 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfo.java
@@ -50,6 +50,21 @@ public class ConsumerInfo implements Comparable<ConsumerInfo>, Serializable {
private long lstAssignedTime = TBaseConstants.META_VALUE_UNDEFINED;
private long usedTopicMetaInfoId = TBaseConstants.META_VALUE_UNDEFINED;
+ /**
+ * Initial Consumer node information
+ *
+ * @param consumerId the consumer id
+ * @param overTLS whether to communicate via TLS
+ * @param group the group name of the consumer
+ * @param topicSet the topic set subscribed
+ * @param topicConditions the topic filter condition set
+ * @param consumeType the consume type
+ * @param sessionKey the session key
+ * @param startTime the start time
+ * @param sourceCount the minimum consumer count of consume group
+ * @param selectedBig whether to choose a larger value if there is a conflict
+ * @param requiredPartition the required partitions
+ */
public ConsumerInfo(String consumerId, boolean overTLS, String group,
Set<String> topicSet, Map<String, TreeSet<String>> topicConditions,
ConsumeType consumeType, String sessionKey, long startTime,
@@ -73,6 +88,20 @@ public class ConsumerInfo implements Comparable<ConsumerInfo>, Serializable {
this.consumerViewInfo = toString();
}
+ /**
+ * Initial Consumer node information
+ *
+ * @param consumerId the consumer id
+ * @param overTLS whether to communicate via TLS
+ * @param group the group name of the consumer
+ * @param consumeType the consume type
+ * @param sourceCount the minimum consumer count of consume group
+ * @param nodeId the node id
+ * @param topicSet the topic set subscribed
+ * @param topicConditions the topic filter condition set
+ * @param curCsmCtrlId the node's consume control id
+ * @param syncInfo the consumer report information
+ */
public ConsumerInfo(String consumerId, boolean overTLS, String group,
ConsumeType consumeType, int sourceCount, int nodeId,
Set<String> topicSet, Map<String, TreeSet<String>> topicConditions,
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/common/BaseResult.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/common/BaseResult.java
index 024098b..a34fa96 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/common/BaseResult.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/common/BaseResult.java
@@ -20,10 +20,8 @@ package org.apache.inlong.tubemq.server.master.web.common;
import java.io.Serializable;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
@@ -476,42 +474,6 @@ public class BaseResult implements Serializable {
return new String[0];
}
- public List getTaobaoSlider() {
- List l = new ArrayList(10);
- int leftStart = 1;
- int leftEnd = 2;
- int mStart = this.getCurrentPage().intValue() - 2;
- int mEnd = this.getCurrentPage().intValue() + 2;
- int rStart = this.getTotalPage() - 1;
- int rEnd = this.getTotalPage();
- if (mStart <= leftEnd) {
- leftStart = 0;
- leftEnd = 0;
- mStart = 1;
- }
- if (mEnd >= rStart) {
- rStart = 0;
- rEnd = 0;
- mEnd = this.getTotalPage();
- }
- if (leftEnd > leftStart) {
- for (int i = leftStart; i <= leftEnd; ++i) {
- l.add(String.valueOf(i));
- }
- l.add("...");
- }
- for (int i = mStart; i <= mEnd; ++i) {
- l.add(String.valueOf(i));
- }
- if (rEnd > rStart) {
- l.add("...");
- for (int i = rStart; i <= rEnd; ++i) {
- l.add(String.valueOf(i));
- }
- }
- return l;
- }
-
/**
* Get ajax prefix value
*
@@ -645,6 +607,8 @@ public class BaseResult implements Serializable {
}
/**
+ * Set escape js status
+ *
* @param jsEscape The jsEscape to set.
*/
public final BaseResult setJsEscape(boolean jsEscape) {
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/simplemvc/RequestDispatcher.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/simplemvc/RequestDispatcher.java
index fa387af..523733c 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/simplemvc/RequestDispatcher.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/simplemvc/RequestDispatcher.java
@@ -89,6 +89,13 @@ public class RequestDispatcher {
executeTarget(context, context.getTarget(), TYPE_LAYOUT);
}
+ /**
+ * Execute required method service
+ * @param context the context
+ * @param target the target information
+ * @param type the operation type
+ * @throws Exception the exception
+ */
public void executeTarget(RequestContext context,
String target, String type) throws Exception {
String targetKey = getActionKey(type, target);
@@ -162,6 +169,11 @@ public class RequestDispatcher {
.append("/").append(target).append(".vm").toString();
}
+ /**
+ * Get layout information
+ * @param target the target information
+ * @return the layout information
+ */
public String getLayout(String target) {
String layout = null;
String[] targetPaths = target.split("/");