You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2021/11/18 02:04:11 UTC
[incubator-inlong] branch master updated: [INLONG-1805]Add ClientBalanceConsumer class implementation (#1806)
This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 20aa154 [INLONG-1805]Add ClientBalanceConsumer class implementation (#1806)
20aa154 is described below
commit 20aa154fa51c5e3acdba060a37af9fd9f0196cec
Author: gosonzhang <46...@qq.com>
AuthorDate: Thu Nov 18 10:04:03 2021 +0800
[INLONG-1805]Add ClientBalanceConsumer class implementation (#1806)
---
.../client/consumer/BaseMessageConsumer.java | 200 +--
.../tubemq/client/consumer/ClientSubInfo.java | 14 +
.../tubemq/client/consumer/RmtDataCache.java | 613 ++++++-
.../consumer/SimpleClientBalanceConsumer.java | 1712 ++++++++++++++++++++
.../client/factory/TubeBaseSessionFactory.java | 13 +-
.../client/factory/TubeMultiSessionFactory.java | 5 +-
.../client/factory/TubeSingleSessionFactory.java | 5 +-
.../tubemq/client/consumer/RmtDataCacheTest.java | 9 +-
.../example/ClientBalanceConsumerExample.java | 261 +++
9 files changed, 2645 insertions(+), 187 deletions(-)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index 4386004..f59e49c 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -53,7 +53,6 @@ import org.apache.inlong.tubemq.corebase.balance.EventType;
import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.corebase.cluster.SubscribeInfo;
-import org.apache.inlong.tubemq.corebase.policies.FlowCtrlRuleHandler;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
import org.apache.inlong.tubemq.corebase.utils.AddressUtils;
@@ -92,12 +91,6 @@ public class BaseMessageConsumer implements MessageConsumer {
private final BlockingQueue<ConsumerEvent> rebalanceResults =
new ArrayBlockingQueue<>(REBALANCE_QUEUE_SIZE);
// flowctrl
- private boolean isCurGroupCtrl = false;
- private AtomicLong lastCheckTime = new AtomicLong(0);
- private final FlowCtrlRuleHandler groupFlowCtrlRuleHandler =
- new FlowCtrlRuleHandler(false);
- private final FlowCtrlRuleHandler defFlowCtrlRuleHandler =
- new FlowCtrlRuleHandler(true);
private final ConsumerSamplePrint samplePrintCtrl =
new ConsumerSamplePrint();
private final RpcConfig rpcConfig = new RpcConfig();
@@ -121,9 +114,6 @@ public class BaseMessageConsumer implements MessageConsumer {
private int rebalanceRetryTimes = 0;
private long lastHeartbeatTime2Master = 0;
private long lastHeartbeatTime2Broker = 0;
- private AtomicBoolean nextWithAuthInfo2M = new AtomicBoolean(false);
- private ConcurrentHashMap<Integer, AtomicBoolean> nextWithAuthInfo2BMap
- = new ConcurrentHashMap<Integer, AtomicBoolean>();
/**
* Construct a BaseMessageConsumer object.
@@ -151,8 +141,7 @@ public class BaseMessageConsumer implements MessageConsumer {
throw new TubeClientException("Get consumer id failed!", e);
}
this.rmtDataCache =
- new RmtDataCache(this.defFlowCtrlRuleHandler,
- this.groupFlowCtrlRuleHandler, null);
+ new RmtDataCache(this.consumerConfig, null);
this.rpcServiceFactory =
this.sessionFactory.getRpcServiceFactory();
this.rpcConfig.put(RpcConstants.CONNECT_TIMEOUT, 3000);
@@ -714,11 +703,9 @@ public class BaseMessageConsumer implements MessageConsumer {
for (SubscribeInfo info : subscribeInfoList) {
BrokerInfo broker = new BrokerInfo(info.getBrokerId(), info.getHost(), info.getPort());
Partition partition = new Partition(broker, info.getTopic(), info.getPartitionId());
- List<Partition> curPartList = registerInfoMap.get(broker);
- if (curPartList == null) {
- curPartList = new ArrayList<>();
- registerInfoMap.put(broker, curPartList);
- }
+ List<Partition> curPartList =
+ registerInfoMap.computeIfAbsent(
+ broker, k -> new ArrayList<>());
if (!curPartList.contains(partition)) {
curPartList.add(partition);
}
@@ -779,7 +766,7 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setClientId(this.consumerId);
builder.setGroupName(this.consumerConfig.getConsumerGroup());
builder.setTopicName(partition.getTopic());
- builder.setEscFlowCtrl(isCurGroupCtrl());
+ builder.setEscFlowCtrl(rmtDataCache.isCurGroupInFlowCtrl());
builder.setPartitionId(partition.getPartitionId());
builder.setLastPackConsumed(isLastConsumed);
builder.setManualCommitOffset(false);
@@ -945,9 +932,9 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setGroupName(this.consumerConfig.getConsumerGroup());
builder.setSessionTime(this.consumeSubInfo.getSubscribedTime());
builder.addAllTopicList(this.consumeSubInfo.getSubscribedTopics());
- builder.setDefFlowCheckId(defFlowCtrlRuleHandler.getFlowCtrlId());
- builder.setGroupFlowCheckId(groupFlowCtrlRuleHandler.getFlowCtrlId());
- builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
+ builder.setDefFlowCheckId(rmtDataCache.getDefFlowCtrlId());
+ builder.setQryPriorityId(rmtDataCache.getQryPriorityId());
+ builder.setGroupFlowCheckId(rmtDataCache.getGroupFlowCtrlId());
List<SubscribeInfo> subInfoList =
this.rmtDataCache.getSubscribeInfoList(consumerId,
this.consumerConfig.getConsumerGroup());
@@ -962,9 +949,9 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setRequiredPartition(this.consumeSubInfo.getRequiredPartition());
builder.setNotAllocated(this.consumeSubInfo.getIsNotAllocated());
}
- ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = genMasterCertificateInfo(true);
- if (authInfoBuilder != null) {
- builder.setAuthInfo(authInfoBuilder.build());
+ ClientMaster.MasterCertificateInfo authInfo = genMasterCertificateInfo(true);
+ if (authInfo != null) {
+ builder.setAuthInfo(authInfo);
}
return builder.build();
}
@@ -1003,9 +990,9 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setClientId(this.consumerId);
builder.setGroupName(this.consumerConfig.getConsumerGroup());
builder.setReportSubscribeInfo(reportSubscribeInfo);
- builder.setDefFlowCheckId(defFlowCtrlRuleHandler.getFlowCtrlId());
- builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
- builder.setGroupFlowCheckId(groupFlowCtrlRuleHandler.getFlowCtrlId());
+ builder.setDefFlowCheckId(rmtDataCache.getDefFlowCtrlId());
+ builder.setQryPriorityId(rmtDataCache.getQryPriorityId());
+ builder.setGroupFlowCheckId(rmtDataCache.getGroupFlowCtrlId());
if (event != null) {
ClientMaster.EventProto.Builder eventProtoBuilder =
ClientMaster.EventProto.newBuilder();
@@ -1020,9 +1007,9 @@ public class BaseMessageConsumer implements MessageConsumer {
if (subInfoList != null) {
builder.addAllSubscribeInfo(DataConverterUtil.formatSubInfo(subInfoList));
}
- ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = genMasterCertificateInfo(false);
- if (authInfoBuilder != null) {
- builder.setAuthInfo(authInfoBuilder);
+ ClientMaster.MasterCertificateInfo authInfo = genMasterCertificateInfo(false);
+ if (authInfo != null) {
+ builder.setAuthInfo(authInfo);
}
return builder.build();
}
@@ -1032,9 +1019,9 @@ public class BaseMessageConsumer implements MessageConsumer {
ClientMaster.CloseRequestC2M.newBuilder();
builder.setClientId(this.consumerId);
builder.setGroupName(this.consumerConfig.getConsumerGroup());
- ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = genMasterCertificateInfo(true);
- if (authInfoBuilder != null) {
- builder.setAuthInfo(authInfoBuilder);
+ ClientMaster.MasterCertificateInfo authInfo = genMasterCertificateInfo(true);
+ if (authInfo != null) {
+ builder.setAuthInfo(authInfo);
}
return builder.build();
}
@@ -1047,7 +1034,7 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setOpType(RpcConstants.MSG_OPTYPE_REGISTER);
builder.setTopicName(partition.getTopic());
builder.setPartitionId(partition.getPartitionId());
- builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
+ builder.setQryPriorityId(rmtDataCache.getQryPriorityId());
builder.setReadStatus(getGroupInitReadStatus(rmtDataCache.bookPartition(partition.getPartitionKey())));
TopicProcessor topicProcessor =
this.consumeSubInfo.getTopicProcessor(partition.getTopic());
@@ -1062,11 +1049,7 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setCurrOffset(currOffset);
}
}
- ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
- genBrokerAuthenticInfo(partition.getBrokerId(), false);
- if (authInfoBuilder != null) {
- builder.setAuthInfo(authInfoBuilder);
- }
+ builder.setAuthInfo(genBrokerAuthenticInfo(partition.getBrokerId(), false));
return builder.build();
}
@@ -1084,11 +1067,7 @@ public class BaseMessageConsumer implements MessageConsumer {
} else {
builder.setReadStatus(1);
}
- ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
- genBrokerAuthenticInfo(partition.getBrokerId(), true);
- if (authInfoBuilder != null) {
- builder.setAuthInfo(authInfoBuilder);
- }
+ builder.setAuthInfo(genBrokerAuthenticInfo(partition.getBrokerId(), true));
return builder.build();
}
@@ -1099,13 +1078,9 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setClientId(consumerId);
builder.setGroupName(this.consumerConfig.getConsumerGroup());
builder.setReadStatus(getGroupInitReadStatus(false));
- builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
+ builder.setQryPriorityId(rmtDataCache.getQryPriorityId());
builder.addAllPartitionInfo(partitionList);
- ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
- genBrokerAuthenticInfo(brokerId, false);
- if (authInfoBuilder != null) {
- builder.setAuthInfo(authInfoBuilder);
- }
+ builder.setAuthInfo(genBrokerAuthenticInfo(brokerId, false));
return builder.build();
}
@@ -1113,29 +1088,7 @@ public class BaseMessageConsumer implements MessageConsumer {
if (response.hasNotAllocated() && !response.getNotAllocated()) {
consumeSubInfo.compareAndSetIsNotAllocated(true, false);
}
- if (response.hasGroupFlowCheckId()) {
- final int qryPriorityId = response.hasQryPriorityId()
- ? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
- if (response.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) {
- try {
- defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
- response.getDefFlowCheckId(), response.getDefFlowControlInfo());
- } catch (Exception e1) {
- logger.warn("[Register response] found parse default flowCtrl rules failure", e1);
- }
- }
- if (response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
- try {
- groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
- response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
- } catch (Exception e1) {
- logger.warn("[Register response] found parse group flowCtrl rules failure", e1);
- }
- }
- if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
- groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
- }
- }
+ rmtDataCache.updFlowCtrlInfoInfo(response);
}
private void processRegAuthorizedToken(ClientMaster.RegisterResponseM2C response) {
@@ -1148,87 +1101,39 @@ public class BaseMessageConsumer implements MessageConsumer {
if (response.hasNotAllocated() && !response.getNotAllocated()) {
consumeSubInfo.compareAndSetIsNotAllocated(true, false);
}
- if (response.hasGroupFlowCheckId()) {
- final int qryPriorityId = response.hasQryPriorityId()
- ? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
- if (response.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) {
- try {
- defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
- response.getDefFlowCheckId(), response.getDefFlowControlInfo());
- } catch (Exception e1) {
- logger.warn(
- "[Heartbeat response] found parse default flowCtrl rules failure", e1);
- }
- }
- if (response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
- try {
- groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
- response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
- } catch (Exception e1) {
- logger.warn(
- "[Heartbeat response] found parse group flowCtrl rules failure", e1);
- }
- }
- if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
- groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
- }
- }
+ rmtDataCache.updFlowCtrlInfoInfo(response);
}
- private ClientMaster.MasterCertificateInfo.Builder genMasterCertificateInfo(boolean force) {
- boolean needAdd = false;
+ private ClientMaster.MasterCertificateInfo genMasterCertificateInfo(boolean force) {
ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = null;
if (this.consumerConfig.isEnableUserAuthentic()) {
authInfoBuilder = ClientMaster.MasterCertificateInfo.newBuilder();
- if (force) {
- needAdd = true;
- nextWithAuthInfo2M.set(false);
- } else if (nextWithAuthInfo2M.get()) {
- if (nextWithAuthInfo2M.compareAndSet(true, false)) {
- needAdd = true;
- }
- }
- if (needAdd) {
+ if (rmtDataCache.markAndGetAuthStatus(force)) {
authInfoBuilder.setAuthInfo(authenticateHandler
- .genMasterAuthenticateToken(consumerConfig.getUsrName(),
- consumerConfig.getUsrPassWord()));
+ .genMasterAuthenticateToken(consumerConfig.getUsrName(),
+ consumerConfig.getUsrPassWord()));
} else {
authInfoBuilder.setAuthorizedToken(authAuthorizedTokenRef.get());
}
}
- return authInfoBuilder;
+ if (authInfoBuilder != null) {
+ return authInfoBuilder.build();
+ }
+ return null;
}
- private ClientBroker.AuthorizedInfo.Builder genBrokerAuthenticInfo(int brokerId, boolean force) {
+ private ClientBroker.AuthorizedInfo genBrokerAuthenticInfo(int brokerId, boolean force) {
ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
ClientBroker.AuthorizedInfo.newBuilder();
authInfoBuilder.setVisitAuthorizedToken(visitToken.get());
if (this.consumerConfig.isEnableUserAuthentic()) {
- boolean needAdd = false;
- AtomicBoolean authStatus = nextWithAuthInfo2BMap.get(brokerId);
- if (authStatus == null) {
- AtomicBoolean tmpAuthStatus = new AtomicBoolean(false);
- authStatus =
- nextWithAuthInfo2BMap.putIfAbsent(brokerId, tmpAuthStatus);
- if (authStatus == null) {
- authStatus = tmpAuthStatus;
- }
- }
- if (force) {
- needAdd = true;
- authStatus.set(false);
- } else if (authStatus.get()) {
- if (authStatus.compareAndSet(true, false)) {
- needAdd = true;
- }
- }
- if (needAdd) {
+ if (rmtDataCache.markAndGetBrokerAuthStatus(brokerId, force)) {
authInfoBuilder.setAuthAuthorizedToken(authenticateHandler
.genBrokerAuthenticateToken(consumerConfig.getUsrName(),
consumerConfig.getUsrPassWord()));
}
}
- return authInfoBuilder;
+ return authInfoBuilder.build();
}
private void processHeartBeatAuthorizedToken(ClientMaster.HeartResponseM2C response) {
@@ -1437,17 +1342,6 @@ public class BaseMessageConsumer implements MessageConsumer {
}
}
- private boolean isCurGroupCtrl() {
- long curCheckTime = this.lastCheckTime.get();
- if (System.currentTimeMillis() - curCheckTime >= 10000) {
- if (this.lastCheckTime.compareAndSet(curCheckTime, System.currentTimeMillis())) {
- this.isCurGroupCtrl =
- this.groupFlowCtrlRuleHandler.getCurDataLimit(Long.MAX_VALUE) != null;
- }
- }
- return this.isCurGroupCtrl;
- }
-
/**
* Stopped the message listeners.
*/
@@ -1590,7 +1484,7 @@ public class BaseMessageConsumer implements MessageConsumer {
// Check if master requires to check authorization next time. If so, set the flag
// and exchange the authorize information next time.
if (response.hasRequireAuth()) {
- nextWithAuthInfo2M.set(response.getRequireAuth());
+ rmtDataCache.storeMasterAuthRequire(response.getRequireAuth());
}
// Get the latest rebalance task
ClientMaster.EventProto eventProto = response.getEvent();
@@ -1681,20 +1575,8 @@ public class BaseMessageConsumer implements MessageConsumer {
&& heartBeatResponseV2.getSuccess()) {
// If the peer require authentication, set a flag.
// The following request will attach the auth information.
- if (heartBeatResponseV2.hasRequireAuth()) {
- AtomicBoolean authStatus =
- nextWithAuthInfo2BMap.get(brokerInfo.getBrokerId());
- if (authStatus == null) {
- AtomicBoolean tmpAuthStatus = new AtomicBoolean(false);
- authStatus =
- nextWithAuthInfo2BMap.putIfAbsent(
- brokerInfo.getBrokerId(), tmpAuthStatus);
- if (authStatus == null) {
- authStatus = tmpAuthStatus;
- }
- }
- authStatus.set(heartBeatResponseV2.getRequireAuth());
- }
+ rmtDataCache.bookBrokerRequireAuthInfo(
+ brokerInfo.getBrokerId(), heartBeatResponseV2);
// If the heartbeat response report failed partitions, release the
// corresponding local partition and log the operation
if (heartBeatResponseV2.getHasPartFailure()) {
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ClientSubInfo.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ClientSubInfo.java
index 31f8567..3348801 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ClientSubInfo.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ClientSubInfo.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.inlong.tubemq.corebase.TokenConstants;
@@ -64,6 +65,19 @@ public class ClientSubInfo {
return this.topicCondRegistry.get(topic);
}
+ public void storeConsumeTarget(Map<String, TreeSet<String>> consumeTarget) {
+ TopicProcessor topicProcessor;
+ for (Map.Entry<String, TreeSet<String>> entry : consumeTarget.entrySet()) {
+ topicProcessor = new TopicProcessor(null, entry.getValue());
+ this.topicCondRegistry.put(entry.getKey(), topicProcessor);
+ this.subscribedTopics.add(entry.getKey());
+ this.topicFilterMap.put(entry.getKey(),
+ (!(entry.getValue() == null || entry.getValue().isEmpty())));
+ }
+ this.requireBound = false;
+ this.subscribedTime = System.currentTimeMillis();
+ }
+
public TopicProcessor putIfAbsentTopicProcessor(String topic,
TopicProcessor topicProcessor) {
TopicProcessor topicProcessor1 =
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
index 219a139..b7d111c 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/RmtDataCache.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
@@ -30,13 +31,21 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.corebase.TBaseConstants;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corebase.TokenConstants;
import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
import org.apache.inlong.tubemq.corebase.cluster.SubscribeInfo;
import org.apache.inlong.tubemq.corebase.policies.FlowCtrlRuleHandler;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
+import org.apache.inlong.tubemq.corebase.utils.Tuple2;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
@@ -51,8 +60,43 @@ public class RmtDataCache implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(RmtDataCache.class);
private static final AtomicLong refCont = new AtomicLong(0);
private static Timer timer;
- private final FlowCtrlRuleHandler groupFlowCtrlRuleHandler;
- private final FlowCtrlRuleHandler defFlowCtrlRuleHandler;
+ private final ConsumerConfig consumerConfig;
+ // store flow control rules
+ private final AtomicLong lstRegMasterTime = new AtomicLong(0);
+ private final AtomicBoolean isCurGroupCtrl = new AtomicBoolean(false);
+ private final AtomicLong lastCheckTime = new AtomicLong(0);
+ private final FlowCtrlRuleHandler groupFlowCtrlRuleHandler =
+ new FlowCtrlRuleHandler(false);
+ private final FlowCtrlRuleHandler defFlowCtrlRuleHandler =
+ new FlowCtrlRuleHandler(true);
+ // store broker configure info
+ private long lastEmptyBrokerPrintTime = 0;
+ private long lastEmptyTopicPrintTime = 0;
+ private long lastBrokerUpdatedTime = System.currentTimeMillis();
+ private AtomicLong lstBrokerConfigId =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ private Map<Integer, BrokerInfo> brokersMap =
+ new ConcurrentHashMap<>();
+ // require Auth info
+ private final AtomicBoolean nextWithAuthInfo2M = new AtomicBoolean(false);
+ private final ConcurrentHashMap<Integer, AtomicBoolean> nextWithAuthInfo2BMap
+ = new ConcurrentHashMap<Integer, AtomicBoolean>();
+ // consume control info
+ private final AtomicLong reqMaxOffsetCsmId =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ private final AtomicBoolean csmFromMaxOffset =
+ new AtomicBoolean(false);
+ // meta query result
+ private final AtomicLong topicMetaInfoId =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ private final Set<String> metaInfoSet = new TreeSet<>();
+ private ConcurrentHashMap<String, Tuple2<Partition, Integer>> configuredPartInfoMap =
+ new ConcurrentHashMap<>();
+ private final AtomicLong topicMetaUpdatedTime =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ private boolean isFirstReport = true;
+ private long reportIntCount = 0;
+ // partition cache
private final AtomicInteger waitCont = new AtomicInteger(0);
private final ConcurrentHashMap<String, Timeout> timeouts =
new ConcurrentHashMap<>();
@@ -78,18 +122,14 @@ public class RmtDataCache implements Closeable {
/**
* Construct a remote data cache object.
*
- * @param defFlowCtrlRuleHandler default flow control rule
- * @param groupFlowCtrlRuleHandler group flow control rule
- * @param partitionList partition list
+ * @param consumerConfig consumer configure
+ * @param partitionList partition list
*/
- public RmtDataCache(final FlowCtrlRuleHandler defFlowCtrlRuleHandler,
- final FlowCtrlRuleHandler groupFlowCtrlRuleHandler,
- List<Partition> partitionList) {
+ public RmtDataCache(ConsumerConfig consumerConfig, List<Partition> partitionList) {
+ this.consumerConfig = consumerConfig;
if (refCont.incrementAndGet() == 1) {
timer = new HashedWheelTimer();
}
- this.defFlowCtrlRuleHandler = defFlowCtrlRuleHandler;
- this.groupFlowCtrlRuleHandler = groupFlowCtrlRuleHandler;
Map<Partition, ConsumeOffsetInfo> tmpPartOffsetMap = new HashMap<>();
if (partitionList != null) {
for (Partition partition : partitionList) {
@@ -102,6 +142,432 @@ public class RmtDataCache implements Closeable {
addPartitionsInfo(tmpPartOffsetMap);
}
+ public void bookBrokerRequireAuthInfo(int brokerId,
+ ClientBroker.HeartBeatResponseB2C heartBeatResponseV2) {
+ if (!heartBeatResponseV2.hasRequireAuth()) {
+ return;
+ }
+ AtomicBoolean authStatus = nextWithAuthInfo2BMap.get(brokerId);
+ if (authStatus == null) {
+ AtomicBoolean tmpAuthStatus = new AtomicBoolean(false);
+ authStatus =
+ nextWithAuthInfo2BMap.putIfAbsent(brokerId, tmpAuthStatus);
+ if (authStatus == null) {
+ authStatus = tmpAuthStatus;
+ }
+ }
+ authStatus.set(heartBeatResponseV2.getRequireAuth());
+ }
+
+ /**
+ * update ops task in cache
+ *
+ * @param opsTaskInfo ops task info
+ *
+ */
+ public void updOpsTaskInfo(ClientMaster.OpsTaskInfo opsTaskInfo) {
+ if (opsTaskInfo == null) {
+ return;
+ }
+ // update flowctrl info
+ if (opsTaskInfo.hasGroupFlowCheckId()) {
+ if (opsTaskInfo.getGroupFlowCheckId() >= 0
+ && opsTaskInfo.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ opsTaskInfo.getGroupFlowCheckId(), opsTaskInfo.getGroupFlowControlInfo());
+ } catch (Exception e1) {
+ logger.warn("[Remote Data Cache] found parse group flowCtrl rules failure", e1);
+ }
+ }
+ }
+ if (opsTaskInfo.hasDefFlowCheckId()) {
+ if (opsTaskInfo.getDefFlowCheckId() >= 0
+ && opsTaskInfo.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ opsTaskInfo.getDefFlowCheckId(), opsTaskInfo.getDefFlowControlInfo());
+ } catch (Exception e1) {
+ logger.warn("[Remote Data Cache] found parse default flowCtrl rules failure", e1);
+ }
+ }
+ }
+ // update priority id
+ int qryPriorityId = opsTaskInfo.hasQryPriorityId()
+ ? opsTaskInfo.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
+ if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
+ groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
+ }
+ // update consume control info
+ if (opsTaskInfo.hasCsmFrmMaxOffsetCtrlId()
+ && opsTaskInfo.getCsmFrmMaxOffsetCtrlId() >= 0) {
+ if (reqMaxOffsetCsmId.get() != opsTaskInfo.getCsmFrmMaxOffsetCtrlId()) {
+ reqMaxOffsetCsmId.set(opsTaskInfo.getCsmFrmMaxOffsetCtrlId());
+ if (opsTaskInfo.getCsmFrmMaxOffsetCtrlId() > lstRegMasterTime.get()) {
+ csmFromMaxOffset.set(true);
+ }
+ }
+ }
+ // update master require auth
+ if (opsTaskInfo.hasRequireAuth()) {
+ storeMasterAuthRequire(opsTaskInfo.getRequireAuth());
+ }
+ }
+
+ /**
+ * update ops task in cache
+ *
+ * @param response master register response
+ *
+ */
+ public void updFlowCtrlInfoInfo(ClientMaster.RegisterResponseM2C response) {
+ if (response == null) {
+ return;
+ }
+ // update flowctrl info
+ if (response.hasGroupFlowCheckId()) {
+ if (response.getGroupFlowCheckId() >= 0
+ && response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
+ } catch (Exception e1) {
+ logger.warn("[Remote Data Cache] found parse group flowCtrl rules failure", e1);
+ }
+ }
+ }
+ if (response.hasDefFlowCheckId()) {
+ if (response.getDefFlowCheckId() >= 0
+ && response.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ response.getDefFlowCheckId(), response.getDefFlowControlInfo());
+ } catch (Exception e1) {
+ logger.warn("[Remote Data Cache] found parse default flowCtrl rules failure", e1);
+ }
+ }
+ }
+ // update priority id
+ int qryPriorityId = response.hasQryPriorityId()
+ ? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
+ if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
+ groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
+ }
+ }
+
+ /**
+ * update ops task in cache
+ *
+ * @param response master register response
+ *
+ */
+ public void updFlowCtrlInfoInfo(ClientMaster.HeartResponseM2C response) {
+ if (response == null) {
+ return;
+ }
+ // update flowctrl info
+ if (response.hasGroupFlowCheckId()) {
+ if (response.getGroupFlowCheckId() >= 0
+ && response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ groupFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
+ } catch (Exception e1) {
+ logger.warn("[Remote Data Cache] found parse group flowCtrl rules failure", e1);
+ }
+ }
+ }
+ if (response.hasDefFlowCheckId()) {
+ if (response.getDefFlowCheckId() >= 0
+ && response.getDefFlowCheckId() != defFlowCtrlRuleHandler.getFlowCtrlId()) {
+ try {
+ defFlowCtrlRuleHandler.updateFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
+ response.getDefFlowCheckId(), response.getDefFlowControlInfo());
+ } catch (Exception e1) {
+ logger.warn("[Remote Data Cache] found parse default flowCtrl rules failure", e1);
+ }
+ }
+ }
+ // update priority id
+ int qryPriorityId = response.hasQryPriorityId()
+ ? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
+ if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
+ groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
+ }
+ }
+
+ public boolean isCsmFromMaxOffset() {
+ if (csmFromMaxOffset.get()) {
+ return csmFromMaxOffset.compareAndSet(true, false);
+ }
+ return false;
+ }
+
+ public int getQryPriorityId() {
+ return this.groupFlowCtrlRuleHandler.getQryPriorityId();
+ }
+
+ public long getDefFlowCtrlId() {
+ return this.defFlowCtrlRuleHandler.getFlowCtrlId();
+ }
+
+ public long getGroupFlowCtrlId() {
+ return this.groupFlowCtrlRuleHandler.getFlowCtrlId();
+ }
+
+ public void storeTopicMetaInfo(long curTopicMetaInfoId, List<String> curMetaInfoSet) {
+ if (curTopicMetaInfoId < 0
+ || curTopicMetaInfoId == this.topicMetaInfoId.get()) {
+ return;
+ }
+ if (curMetaInfoSet == null || curMetaInfoSet.isEmpty()) {
+ return;
+ }
+ ConcurrentHashMap<String, Tuple2<Partition, Integer>> curConfMetaInfoMap =
+ new ConcurrentHashMap<>();
+ for (String metaInfo : curMetaInfoSet) {
+ if (TStringUtils.isBlank(metaInfo)) {
+ continue;
+ }
+ String[] strInfo = metaInfo.split(TokenConstants.SEGMENT_SEP);
+ String[] strPartInfoSet = strInfo[1].split(TokenConstants.ARRAY_SEP);
+ for (String partStr : strPartInfoSet) {
+ String[] strPartInfo = partStr.split(TokenConstants.ATTR_SEP);
+ BrokerInfo brokerInfo = brokersMap.get(Integer.parseInt(strPartInfo[0]));
+ if (brokerInfo == null) {
+ continue;
+ }
+ int storeId = Integer.parseInt(strPartInfo[1]);
+ int partCnt = Integer.parseInt(strPartInfo[2]);
+ int statusId = Integer.parseInt(strPartInfo[3]);
+ for (int j = 0; j < storeId; j++) {
+ int baseValue = j * TBaseConstants.META_STORE_INS_BASE;
+ for (int i = 0; i < partCnt; i++) {
+ Partition partition =
+ new Partition(brokerInfo, strInfo[0], baseValue + i);
+ curConfMetaInfoMap.put(partition.getPartitionKey(),
+ new Tuple2<>(partition, statusId));
+ }
+ }
+ }
+ }
+ if (curConfMetaInfoMap.isEmpty()) {
+ return;
+ }
+ this.metaInfoSet.clear();
+ this.metaInfoSet.addAll(curMetaInfoSet);
+ this.configuredPartInfoMap = curConfMetaInfoMap;
+ this.topicMetaUpdatedTime.set(System.currentTimeMillis());
+ this.topicMetaInfoId.set(curTopicMetaInfoId);
+ }
+
+ public Map<String, Boolean> getConfPartMetaInfo() {
+ Map<String, Boolean> configMap = new HashMap<>();
+ for (Map.Entry<String, Tuple2<Partition, Integer>> entry
+ : configuredPartInfoMap.entrySet()) {
+ if (entry == null || entry.getKey() == null || entry.getValue() == null) {
+ continue;
+ }
+ configMap.put(entry.getKey(), (entry.getValue().getF1() == 1));
+ }
+ return configMap;
+ }
+
+ public boolean isPartSubscribable(String partitionKey) {
+ Tuple2<Partition, Integer> partConfig =
+ configuredPartInfoMap.get(partitionKey);
+ if (partConfig == null
+ || partConfig.getF0() == null
+ || partConfig.getF1() == null) {
+ return false;
+ }
+ return (partConfig.getF1() == 1);
+ }
+
+ public boolean getSubscribablePartition(String partitionKey,
+ ProcessResult result,
+ StringBuilder sBuffer) {
+ Tuple2<Partition, Integer> partStatusInfo =
+ configuredPartInfoMap.get(partitionKey);
+ if (partStatusInfo == null) {
+ result.setFailResult(TErrCodeConstants.NOT_FOUND,
+ sBuffer.append("PartitionKey ").append(partitionKey)
+ .append(" not found in partition-meta Information set!")
+ .toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ if (partStatusInfo.getF1() != 1) {
+ result.setFailResult(TErrCodeConstants.PARTITION_UNSUBSCRIBABLE,
+ sBuffer.append("PartitionKey ").append(partitionKey)
+ .append(" not available for subscription now!")
+ .toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ result.setSuccResult(partStatusInfo.getF0());
+ return true;
+ }
+
+ public void updateReg2MasterTime() {
+ this.lstRegMasterTime.set(System.currentTimeMillis());
+ }
+
+ public long getRegMasterTime() {
+ return this.lstRegMasterTime.get();
+ }
+
+ /**
+ * Update broker configure info
+ *
+ * @param pkgCheckSum checkSum Id of packaged information
+ * @param pkgBrokerInfos packaged broker info string list
+ * @param sBuilder string process buffer
+ */
+ public void updateBrokerInfoList(long pkgCheckSum,
+ List<String> pkgBrokerInfos,
+ StringBuilder sBuilder) {
+ if (pkgCheckSum != lstBrokerConfigId.get()) {
+ if (pkgBrokerInfos != null) {
+ brokersMap =
+ DataConverterUtil.convertBrokerInfo(pkgBrokerInfos);
+ lstBrokerConfigId.set(pkgCheckSum);
+ lastBrokerUpdatedTime = System.currentTimeMillis();
+ if (pkgBrokerInfos.isEmpty()) {
+ if (System.currentTimeMillis() - lastEmptyBrokerPrintTime > 60000) {
+ logger.warn(sBuilder
+ .append("[Meta Info] Found empty brokerList, changed checksum is ")
+ .append(lstBrokerConfigId).toString());
+ sBuilder.delete(0, sBuilder.length());
+ lastEmptyBrokerPrintTime = System.currentTimeMillis();
+ }
+ } else {
+ logger.info(sBuilder
+ .append("[Meta Info] Changed brokerList checksum is ")
+ .append(lstBrokerConfigId).toString());
+ sBuilder.delete(0, sBuilder.length());
+ }
+ }
+ }
+ }
+
+ public void storeMasterAuthRequire(boolean requireAuth) {
+ nextWithAuthInfo2M.set(requireAuth);
+ }
+
+ public boolean markAndGetAuthStatus(boolean isForce) {
+ boolean needAuth = false;
+ if (isForce) {
+ nextWithAuthInfo2M.set(false);
+ } else if (nextWithAuthInfo2M.get()) {
+ if (nextWithAuthInfo2M.compareAndSet(true, false)) {
+ needAuth = true;
+ }
+ }
+ return needAuth;
+ }
+
+ public boolean markAndGetBrokerAuthStatus(int brokerId, boolean isForce) {
+ boolean needAuth = false;
+ AtomicBoolean authStatus = nextWithAuthInfo2BMap.get(brokerId);
+ if (authStatus == null) {
+ AtomicBoolean tmpAuthStatus = new AtomicBoolean(false);
+ authStatus =
+ nextWithAuthInfo2BMap.putIfAbsent(brokerId, tmpAuthStatus);
+ if (authStatus == null) {
+ authStatus = tmpAuthStatus;
+ }
+ }
+ if (isForce) {
+ needAuth = true;
+ authStatus.set(false);
+ } else if (authStatus.get()) {
+ if (authStatus.compareAndSet(true, false)) {
+ needAuth = true;
+ }
+ }
+ return needAuth;
+ }
+
+ /**
+ * get client report ops task info id set
+ *
+ * @return ops task info id set
+ */
+ public ClientMaster.OpsTaskInfo buildOpsTaskInfo() {
+ boolean hasData = false;
+ ClientMaster.OpsTaskInfo.Builder builder =
+ ClientMaster.OpsTaskInfo.newBuilder();
+ if (defFlowCtrlRuleHandler.getFlowCtrlId() >= 0) {
+ builder.setDefFlowCheckId(defFlowCtrlRuleHandler.getFlowCtrlId());
+ hasData = true;
+ }
+ if (groupFlowCtrlRuleHandler.getFlowCtrlId() >= 0) {
+ builder.setGroupFlowCheckId(groupFlowCtrlRuleHandler.getFlowCtrlId());
+ hasData = true;
+ }
+ if (groupFlowCtrlRuleHandler.getQryPriorityId() >= 0) {
+ builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
+ hasData = true;
+ }
+ if (reqMaxOffsetCsmId.get() >= 0) {
+ builder.setCsmFrmMaxOffsetCtrlId(reqMaxOffsetCsmId.get());
+ hasData = true;
+ }
+ if (hasData) {
+ return builder.build();
+ } else {
+ return null;
+ }
+ }
+
+ public ClientMaster.ClientSubRepInfo buildClientSubRepInfo() {
+ ClientMaster.ClientSubRepInfo.Builder builder =
+ ClientMaster.ClientSubRepInfo.newBuilder();
+ builder.setBrokerConfigId(this.lstBrokerConfigId.get());
+ builder.setTopicMetaInfoId(this.topicMetaInfoId.get());
+ if (this.topicMetaUpdatedTime.get() >= 0) {
+ builder.setLstAssignedTime(this.topicMetaUpdatedTime.get());
+ }
+ builder.setReportSubInfo(false);
+ if (isFirstReport) {
+ if (!this.partitionMap.isEmpty()) {
+ isFirstReport = false;
+ builder.setReportSubInfo(true);
+ builder.addAllPartSubInfo(getSubscribedPartitionInfo());
+ }
+ } else if ((++this.reportIntCount)
+ % consumerConfig.getMaxSubInfoReportIntvlTimes() == 0) {
+ builder.setReportSubInfo(true);
+ builder.addAllPartSubInfo(getSubscribedPartitionInfo());
+ }
+ return builder.build();
+ }
+
+ public long getLastBrokerConfigId() {
+ return this.lstBrokerConfigId.get();
+ }
+
+ public long getlastTopicMetaInfoId() {
+ return this.topicMetaInfoId.get();
+ }
+
+ /**
+ * Judge whether the consumer group is in flow control management
+ *
+ * @return true in control, false not
+ */
+ public boolean isCurGroupInFlowCtrl() {
+ long curCheckTime = this.lastCheckTime.get();
+ if (System.currentTimeMillis() - curCheckTime >= 10000) {
+ if (this.lastCheckTime.compareAndSet(curCheckTime, System.currentTimeMillis())) {
+ this.isCurGroupCtrl.set(
+ groupFlowCtrlRuleHandler.getCurDataLimit(Long.MAX_VALUE) != null);
+ }
+ }
+ return this.isCurGroupCtrl.get();
+ }
+
/**
* Set partition context information.
*
@@ -318,6 +784,10 @@ public class RmtDataCache implements Closeable {
return false;
}
+ public boolean isPartitionInUse(String partitionKey) {
+ return (partitionMap.get(partitionKey) != null);
+ }
+
public Partition getPartitionByKey(String partitionKey) {
return partitionMap.get(partitionKey);
}
@@ -417,6 +887,13 @@ public class RmtDataCache implements Closeable {
}
}
+ public void updPartOffsetInfo(String partitionKey, long currOffset, long maxOffset) {
+ PartitionExt partitionExt = this.partitionMap.get(partitionKey);
+ if (partitionExt != null) {
+ updateOffsetCache(partitionKey, currOffset, maxOffset);
+ }
+ }
+
private void releaseIdlePartition(long waitDlt, String partitionKey) {
Long frozenTime = partitionFrozenMap.get(partitionKey);
if (frozenTime == null) {
@@ -490,6 +967,35 @@ public class RmtDataCache implements Closeable {
return subscribeInfoList;
}
+ /**
+ * Get the subscribe partitionKey set.
+ *
+ * @return subscribe information list
+ */
+ private List<String> getSubscribedPartitionInfo() {
+ List<String> strSubInfoList = new ArrayList<>();
+ Map<String, StringBuilder> tmpSubInfoMap = new HashMap<>();
+ for (Partition partition : partitionMap.values()) {
+ if (partition == null) {
+ continue;
+ }
+ StringBuilder sBuffer = tmpSubInfoMap.get(partition.getTopic());
+ if (sBuffer == null) {
+ sBuffer = new StringBuilder(512);
+ tmpSubInfoMap.put(partition.getTopic(), sBuffer);
+ sBuffer.append(partition.getTopic()).append(TokenConstants.SEGMENT_SEP);
+ } else {
+ sBuffer.append(TokenConstants.ARRAY_SEP);
+ }
+ sBuffer.append(partition.getBrokerId())
+ .append(TokenConstants.ATTR_SEP).append(partition.getPartitionId());
+ }
+ for (Map.Entry<String, StringBuilder> entry : tmpSubInfoMap.entrySet()) {
+ strSubInfoList.add(entry.getValue().toString());
+ }
+ return strSubInfoList;
+ }
+
public Map<BrokerInfo, List<PartitionSelectResult>> removeAndGetPartition(
Map<BrokerInfo, List<Partition>> unRegisterInfoMap,
List<String> partitionKeys, long inUseWaitPeriodMs,
@@ -541,11 +1047,8 @@ public class RmtDataCache implements Closeable {
new PartitionSelectResult(true, TErrCodeConstants.SUCCESS,
"Ok!", partition, 0, lastPackConsumed);
List<PartitionSelectResult> targetPartitionList =
- unNewRegisterInfoMap.get(entry.getKey());
- if (targetPartitionList == null) {
- targetPartitionList = new ArrayList<>();
- unNewRegisterInfoMap.put(entry.getKey(), targetPartitionList);
- }
+ unNewRegisterInfoMap.computeIfAbsent(
+ entry.getKey(), k -> new ArrayList<>());
targetPartitionList.add(partitionRet);
}
}
@@ -556,6 +1059,60 @@ public class RmtDataCache implements Closeable {
return unNewRegisterInfoMap;
}
+ public boolean removeAndGetPartition(String partitionKey, long inUseWaitPeriodMs,
+ boolean isWaitTimeoutRollBack, ProcessResult result,
+ StringBuilder sBuffer) {
+ boolean lastPackConsumed = false;
+ List<String> partitionKeys = new ArrayList<>();
+ partitionKeys.add(partitionKey);
+ pauseProcess();
+ try {
+ waitPartitions(partitionKeys, inUseWaitPeriodMs);
+ PartitionExt partitionExt =
+ partitionMap.remove(partitionKey);
+ if (partitionExt == null) {
+ result.setSuccResult(null);
+ return result.isSuccess();
+ }
+ lastPackConsumed = partitionExt.isLastPackConsumed();
+ if (!cancelTimeTask(partitionKey)
+ && !indexPartition.remove(partitionKey)) {
+ logger.info(sBuffer.append("[Process Interrupt] Partition : ")
+ .append(partitionExt.toString())
+ .append(", data in processing, canceled").toString());
+ sBuffer.delete(0, sBuffer.length());
+ if (lastPackConsumed) {
+ if (isWaitTimeoutRollBack) {
+ lastPackConsumed = false;
+ }
+ }
+ }
+ ConcurrentLinkedQueue<Partition> oldPartitionList =
+ topicPartitionConMap.get(partitionExt.getTopic());
+ if (oldPartitionList != null) {
+ oldPartitionList.remove(partitionExt);
+ if (oldPartitionList.isEmpty()) {
+ topicPartitionConMap.remove(partitionExt.getTopic());
+ }
+ }
+ ConcurrentLinkedQueue<Partition> regMapPartitionList =
+ brokerPartitionConMap.get(partitionExt.getBroker());
+ if (regMapPartitionList != null) {
+ regMapPartitionList.remove(partitionExt);
+ if (regMapPartitionList.isEmpty()) {
+ brokerPartitionConMap.remove(partitionExt.getBroker());
+ }
+ }
+ partitionOffsetMap.remove(partitionKey);
+ partitionUsedMap.remove(partitionKey);
+ partitionExt.setLastPackConsumed(lastPackConsumed);
+ result.setSuccResult(partitionExt);
+ return result.isSuccess();
+ } finally {
+ resumeProcess();
+ }
+ }
+
/**
* Remove a partition.
*
@@ -585,6 +1142,17 @@ public class RmtDataCache implements Closeable {
}
}
+ public Set<String> getCurRegisteredPartSet() {
+ Set<String> partKeySet = new TreeSet<>();
+ for (String partKey : partitionMap.keySet()) {
+ if (partKey == null) {
+ continue;
+ }
+ partKeySet.add(partKey);
+ }
+ return partKeySet;
+ }
+
/**
* Get current partition information.
*
@@ -605,16 +1173,21 @@ public class RmtDataCache implements Closeable {
return tmpPartitionMap;
}
+ public long getMaxOffsetOfPartition(String partitionKey) {
+ ConsumeOffsetInfo offsetInfo = partitionOffsetMap.get(partitionKey);
+ if (offsetInfo == null) {
+ return -1L;
+ }
+ return offsetInfo.getMaxOffset();
+ }
+
public Map<BrokerInfo, List<PartitionSelectResult>> getAllPartitionListWithStatus() {
Map<BrokerInfo, List<PartitionSelectResult>> registeredInfoMap =
new HashMap<>();
for (PartitionExt partitionExt : partitionMap.values()) {
List<PartitionSelectResult> registerPartitionList =
- registeredInfoMap.get(partitionExt.getBroker());
- if (registerPartitionList == null) {
- registerPartitionList = new ArrayList<>();
- registeredInfoMap.put(partitionExt.getBroker(), registerPartitionList);
- }
+ registeredInfoMap.computeIfAbsent(
+ partitionExt.getBroker(), k -> new ArrayList<>());
registerPartitionList.add(new PartitionSelectResult(true,
TErrCodeConstants.SUCCESS, "Ok!",
partitionExt, 0, partitionExt.isLastPackConsumed()));
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
new file mode 100644
index 0000000..52e7461
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/SimpleClientBalanceConsumer.java
@@ -0,0 +1,1712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.consumer;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.inlong.tubemq.client.common.ConfirmResult;
+import org.apache.inlong.tubemq.client.common.ConsumeResult;
+import org.apache.inlong.tubemq.client.common.QueryMetaResult;
+import org.apache.inlong.tubemq.client.common.TClientConstants;
+import org.apache.inlong.tubemq.client.common.TubeClientVersion;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.InnerSessionFactory;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corebase.TokenConstants;
+import org.apache.inlong.tubemq.corebase.aaaclient.ClientAuthenticateHandler;
+import org.apache.inlong.tubemq.corebase.aaaclient.SimpleClientAuthenticateHandler;
+import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
+import org.apache.inlong.tubemq.corebase.cluster.Partition;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientBroker;
+import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.corebase.utils.AddressUtils;
+import org.apache.inlong.tubemq.corebase.utils.DataConverterUtil;
+import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
+import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
+import org.apache.inlong.tubemq.corerpc.RpcConfig;
+import org.apache.inlong.tubemq.corerpc.RpcConstants;
+import org.apache.inlong.tubemq.corerpc.RpcServiceFactory;
+import org.apache.inlong.tubemq.corerpc.service.BrokerReadService;
+import org.apache.inlong.tubemq.corerpc.service.MasterService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SimpleClientBalanceConsumer implements ClientBalanceConsumer {
+ private static final Logger logger =
+ LoggerFactory.getLogger(SimpleClientBalanceConsumer.class);
+
+ private static final AtomicInteger consumerCounter =
+ new AtomicInteger(0);
+ protected final String consumerId;
+ protected final ConsumerConfig consumerConfig;
+ private final InnerSessionFactory sessionFactory;
+ private final RpcServiceFactory rpcServiceFactory;
+ private final MasterService masterService;
+ // client status
+ //
+ // 0, not initial
+ // 1, starting
+ // 2, started
+ // 3, stopping
+ private final AtomicInteger clientStatus = new AtomicInteger(0);
+ private int sourceCount = TBaseConstants.META_VALUE_UNDEFINED;
+ private int nodeId = TBaseConstants.META_VALUE_UNDEFINED;
+ protected final ClientSubInfo consumeSubInfo = new ClientSubInfo();
+ protected final RmtDataCache clientRmtDataCache;
+ private final ConsumerSamplePrint samplePrintCtrl =
+ new ConsumerSamplePrint();
+ private final RpcConfig rpcConfig = new RpcConfig();
+ private final AtomicLong visitToken =
+ new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
+ private AtomicReference<String> authAuthorizedTokenRef =
+ new AtomicReference<>("");
+ private ClientAuthenticateHandler authenticateHandler =
+ new SimpleClientAuthenticateHandler();
+ private final ScheduledExecutorService heartService2Master;
+ private final AtomicInteger metaReqStatusId = new AtomicInteger(0);
+ private final AtomicLong lstMetaQueryTime = new AtomicLong(0);
+ private final AtomicBoolean needMetaSelfChk = new AtomicBoolean(false);
+ private int heartbeat2MRetryTimes = 0;
+ private long lastHeartbeatTime2Master = 0;
+ private Thread heartBeatThread2Broker;
+ private long lastHeartbeatTime2Broker = 0;
+ private final ConcurrentHashMap<String, Long> partRegFreqCtrlMap =
+ new ConcurrentHashMap<>();
+
+ public SimpleClientBalanceConsumer(final InnerSessionFactory messageSessionFactory,
+ final ConsumerConfig consumerConfig) throws TubeClientException {
+ java.security.Security.setProperty("networkaddress.cache.ttl", "3");
+ java.security.Security.setProperty("networkaddress.cache.negative.ttl", "1");
+ if (messageSessionFactory == null || consumerConfig == null) {
+ throw new TubeClientException(
+ "Illegal parameter: messageSessionFactory or consumerConfig is null!");
+ }
+ this.sessionFactory = messageSessionFactory;
+ this.consumerConfig = consumerConfig;
+ try {
+ this.consumerId = generateConsumerID();
+ } catch (Exception e) {
+ throw new TubeClientException("Get consumer id failed!", e);
+ }
+ this.clientRmtDataCache =
+ new RmtDataCache(this.consumerConfig, null);
+ this.rpcServiceFactory =
+ this.sessionFactory.getRpcServiceFactory();
+ this.rpcConfig.put(RpcConstants.CONNECT_TIMEOUT, 3000);
+ this.rpcConfig.put(RpcConstants.REQUEST_TIMEOUT,
+ this.consumerConfig.getRpcTimeoutMs());
+ this.rpcConfig.put(RpcConstants.WORKER_THREAD_NAME,
+ "tube_consumer_netty_worker-");
+ this.rpcConfig.put(RpcConstants.CALLBACK_WORKER_COUNT,
+ this.consumerConfig.getRpcRspCallBackThreadCnt());
+ this.masterService =
+ rpcServiceFactory.getFailoverService(MasterService.class,
+ this.consumerConfig.getMasterInfo(), this.rpcConfig);
+ this.heartService2Master =
+ Executors.newScheduledThreadPool(1, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, new StringBuilder(512)
+ .append("Master-Heartbeat-Thread-")
+ .append(consumerId).toString());
+ t.setPriority(Thread.MAX_PRIORITY);
+ return t;
+ }
+ });
+ }
+
+ @Override
+ public boolean start(Map<String, TreeSet<String>> topicAndFilterCondMap,
+ int sourceCount, int nodeId,
+ ProcessResult result) throws TubeClientException {
+ if (result == null) {
+ throw new TubeClientException("Illegal parameter: parameter result is null!");
+ }
+ StringBuilder sBuffer = new StringBuilder(512);
+ if (!validAndStoreConsumeTarget(topicAndFilterCondMap, sBuffer, result)) {
+ return result.isSuccess();
+ }
+ if (sourceCount > 0) {
+ if (nodeId < 0 || nodeId > (sourceCount - 1)) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ "When groupNodeCnt is valid, the nodeId value must be between in [0, sourceCount-1]!");
+ return result.isSuccess();
+ }
+ }
+ // judge client status
+ if (clientStatus.get() != 0) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ "The SDK is running, please shutdown first!");
+ return result.isSuccess();
+ }
+ if (!clientStatus.compareAndSet(0, 1)) {
+ switch (clientStatus.get()) {
+ case 2: {
+ result.setSuccResult();
+ }
+ break;
+
+ case 3: {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ "The client is shutting down. Please try again later!");
+ }
+ break;
+
+ case 0:
+ case 1:
+ default: {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ "Duplicated calls, the client is starting, please wait a minute!");
+ }
+ }
+ return result.isSuccess();
+ }
+ if (sourceCount > 0) {
+ this.sourceCount = sourceCount;
+ this.nodeId = nodeId;
+ }
+ // store consume target information
+ Map<String, TreeSet<String>> consumeTargetMap =
+ (Map<String, TreeSet<String>>) result.getRetData();
+ this.consumeSubInfo.storeConsumeTarget(consumeTargetMap);
+ if (!startMasterAndBrokerThreads(result, sBuffer)) {
+ clientStatus.compareAndSet(1, 0);
+ return result.isSuccess();
+ }
+ clientStatus.compareAndSet(1, 2);
+ result.setSuccResult();
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ int tmpStatusId = this.clientStatus.get();
+ return (tmpStatusId <= 0 || tmpStatusId > 2);
+ }
+
+ @Override
+ public ConsumerConfig getConsumerConfig() {
+ return this.consumerConfig;
+ }
+
+ @Override
+ public String getConsumerId() {
+ return this.consumerId;
+ }
+
+ @Override
+ public boolean isFilterConsume(String topic) {
+ return this.consumeSubInfo.isFilterConsume(topic);
+ }
+
+ @Override
+ public int getSourceCount() {
+ return this.sourceCount;
+ }
+
+ @Override
+ public int getNodeId() {
+ return this.nodeId;
+ }
+
+ @Override
+ public String getClientVersion() {
+ return TubeClientVersion.CONSUMER_VERSION;
+ }
+
+ @Override
+ public void shutdown() throws Throwable {
+ StringBuilder strBuffer = new StringBuilder(512);
+ if (!clientStatus.compareAndSet(2, 3)) {
+ switch (clientStatus.get()) {
+ case 3: {
+ logger.info(strBuffer.append("[SHUTDOWN_CONSUMER] ")
+ .append(this.consumerId)
+ .append(" is shutting down, do nothing...").toString());
+ }
+ break;
+
+ case 0: {
+ logger.info(strBuffer.append("[SHUTDOWN_CONSUMER] ")
+ .append(this.consumerId)
+ .append(" was already shutdown, do nothing...").toString());
+
+ }
+ break;
+
+ case 1: {
+ logger.info(strBuffer.append("[SHUTDOWN_CONSUMER] ")
+ .append(this.consumerId)
+ .append(" is starting, please wait a minute!").toString());
+ }
+ break;
+ }
+ return;
+ }
+ logger.info(strBuffer.append("[SHUTDOWN_CONSUMER] Shutting down consumer:")
+ .append(this.consumerId).toString());
+ strBuffer.delete(0, strBuffer.length());
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ //
+ }
+ //
+ this.clientRmtDataCache.close();
+ Map<BrokerInfo, List<PartitionSelectResult>> unRegisterInfoMap =
+ clientRmtDataCache.getAllPartitionListWithStatus();
+ unregisterPartitions(unRegisterInfoMap);
+ this.sessionFactory.removeClient(this);
+ if (this.heartService2Master != null) {
+ try {
+ this.heartService2Master.shutdownNow();
+ } catch (Throwable ee) {
+ //
+ }
+ }
+ if (this.heartBeatThread2Broker != null) {
+ try {
+ this.heartBeatThread2Broker.interrupt();
+ heartBeatThread2Broker.join();
+ this.heartBeatThread2Broker = null;
+ } catch (Throwable ee) {
+ //
+ }
+ }
+ logger.info(strBuffer
+ .append("[SHUTDOWN_CONSUMER] Partitions unregistered, consumer :")
+ .append(this.consumerId).toString());
+ strBuffer.delete(0, strBuffer.length());
+ try {
+ masterService.consumerCloseClientC2M(createMasterCloseRequest(),
+ AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+ } catch (Throwable e) {
+ strBuffer.delete(0, strBuffer.length());
+ logger.warn(strBuffer
+ .append("[SHUTDOWN_CONSUMER] call closeRequest failure, error is ")
+ .append(e.getMessage()).toString());
+ strBuffer.delete(0, strBuffer.length());
+ }
+ logger.info(strBuffer.append("[SHUTDOWN_CONSUMER] Client closed, consumer : ")
+ .append(this.consumerId).toString());
+ this.clientStatus.set(0);
+ }
+
+ @Override
+ public Set<String> getCurRegisteredPartSet() {
+ return clientRmtDataCache.getCurRegisteredPartSet();
+ }
+
+ @Override
+ public Map<String, ConsumeOffsetInfo> getCurPartitionOffsetInfos() {
+ return this.clientRmtDataCache.getCurPartitionInfoMap();
+ }
+
+ @Override
+ public boolean isPartitionsReady(long maxWaitTime) {
+ return clientRmtDataCache.isPartitionsReady(maxWaitTime);
+ }
+
+ @Override
+ public boolean getPartitionMetaInfo(QueryMetaResult result) throws TubeClientException {
+ if (result == null) {
+ throw new TubeClientException("Illegal parameter: parameter result is null!");
+ }
+ StringBuilder sBuffer = new StringBuilder(512);
+ if (isShutdown()) {
+ result.setFailResult(TErrCodeConstants.CLIENT_SHUTDOWN,
+ "The client is not started or closed!");
+ return result.isSuccess();
+ }
+ if (System.currentTimeMillis() - lstMetaQueryTime.get()
+ >= consumerConfig.getPartMetaInfoCheckPeriodMs()) {
+ if (metaReqStatusId.compareAndSet(0, 1)) {
+ try {
+ ClientMaster.GetPartMetaResponseM2C response =
+ masterService.consumerGetPartMetaInfoC2M(createMasterGetPartMetaRequest(),
+ AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+ if (response == null) {
+ result.setFailResult(TErrCodeConstants.CONNECT_RETURN_NULL,
+ sBuffer.append("Query Failed: ").append(consumerId)
+ .append(" query master and return null!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ needMetaSelfChk.set(false);
+ lstMetaQueryTime.set(System.currentTimeMillis());
+ if (response.getErrCode() != TErrCodeConstants.SUCCESS) {
+ // If the consumer group is forbidden, output the log
+ result.setFailResult(response.getErrCode(), response.getErrMsg());
+ return result.isSuccess();
+ }
+ // Process the successful response
+ if (response.hasBrokerConfigId()) {
+ clientRmtDataCache.updateBrokerInfoList(response.getBrokerConfigId(),
+ response.getBrokerConfigListList(), sBuffer);
+ }
+ if (response.hasTopicMetaInfoId()) {
+ // update local cache meta information
+ clientRmtDataCache.storeTopicMetaInfo(response.getTopicMetaInfoId(),
+ response.getTopicMetaInfoListList());
+ // clear unsubscribable partitions
+ clearUnSubscribablePartitions();
+ }
+ } catch (Throwable e) {
+ result.setFailResult(TErrCodeConstants.INTERNAL_SERVER_ERROR,
+ sBuffer.append("Query MetaInfo throw exception: ")
+ .append(e.getCause()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ } finally {
+ metaReqStatusId.set(0);
+ }
+ }
+ }
+ result.setSuccResult(clientRmtDataCache.getConfPartMetaInfo());
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean connect2Partition(String partitionKey,
+ long boostrapOffset,
+ ProcessResult result) throws TubeClientException {
+ if (result == null) {
+ throw new TubeClientException("Illegal parameter: parameter result is null!");
+ }
+ final StringBuilder sBuffer = new StringBuilder(512);
+ if (TStringUtils.isBlank(partitionKey)) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ "Parameter partitionKey is blank!");
+ return result.isSuccess();
+ }
+ if (isShutdown()) {
+ result.setFailResult(TErrCodeConstants.CLIENT_SHUTDOWN,
+ "The client is not started or closed!");
+ return result.isSuccess();
+ }
+ if (clientRmtDataCache.isPartitionInUse(partitionKey)) {
+ result.setSuccResult();
+ return result.isSuccess();
+ }
+ if (!clientRmtDataCache.getSubscribablePartition(
+ partitionKey, result, sBuffer)) {
+ return result.isSuccess();
+ }
+ // check if high frequency request for failure partition
+ Long lstTime = partRegFreqCtrlMap.get(partitionKey);
+ if (lstTime != null && (System.currentTimeMillis() - lstTime
+ < TClientConstants.CFG_MIN_META_QUERY_WAIT_PERIOD_MS)) {
+ result.setFailResult(TErrCodeConstants.CLIENT_HIGH_FREQUENCY_REQUEST,
+ sBuffer.append("High-frequency request, please call ").append(partitionKey)
+ .append(" at least ").append(TClientConstants.CFG_MIN_META_QUERY_WAIT_PERIOD_MS)
+ .append("ms interval!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ Partition partition = (Partition) result.getRetData();
+ final String uniqueId =
+ sBuffer.append(consumerConfig.getConsumerGroup())
+ .append("#").append(partitionKey).toString();
+ sBuffer.delete(0, sBuffer.length());
+ synchronized (uniqueId) {
+ registerPartitions(partition, boostrapOffset, result, sBuffer);
+ }
+ if (!result.isSuccess()
+ && (result.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED
+ || result.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE)) {
+ // only partition occupied or certificate failure need control frequency
+ partRegFreqCtrlMap.put(partitionKey, System.currentTimeMillis());
+ }
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean disconnectFromPartition(String partitionKey,
+ ProcessResult result) throws TubeClientException {
+ if (result == null) {
+ throw new TubeClientException("Illegal parameter: parameter result is null!");
+ }
+ final StringBuilder sBuffer = new StringBuilder(512);
+ if (TStringUtils.isBlank(partitionKey)) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ "Parameter partitionKey is blank!");
+ return result.isSuccess();
+ }
+ if (isShutdown()) {
+ result.setFailResult(TErrCodeConstants.CLIENT_SHUTDOWN,
+ "The client is not started or closed!");
+ return result.isSuccess();
+ }
+ if (!clientRmtDataCache.isPartitionInUse(partitionKey)) {
+ result.setSuccResult();
+ return result.isSuccess();
+ }
+ clientRmtDataCache.removeAndGetPartition(partitionKey,
+ this.consumerConfig.getPullRebConfirmWaitPeriodMs(),
+ this.consumerConfig.isPullRebConfirmTimeoutRollBack(), result, sBuffer);
+ PartitionExt partitionExt = (PartitionExt) result.getRetData();
+ if (partitionExt == null) {
+ result.setSuccResult();
+ return result.isSuccess();
+ }
+ unregisterPartition(partitionExt, partitionExt.isLastPackConsumed(), sBuffer);
+ result.setSuccResult();
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean getMessage(ConsumeResult result) throws TubeClientException {
+ if (result == null) {
+ throw new TubeClientException("Illegal parameter: parameter result is null!");
+ }
+ if (isShutdown()) {
+ result.setFailResult(TErrCodeConstants.CLIENT_SHUTDOWN,
+ "The client is not started or closed!");
+ return result.isSuccess();
+ }
+ PartitionSelectResult selectResult = null;
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ if (isShutdown()) {
+ result.setFailResult(TErrCodeConstants.CLIENT_SHUTDOWN,
+ "The client has been shutdown!");
+ return result.isSuccess();
+ }
+ selectResult = clientRmtDataCache.getCurrPartsStatus();
+ if (selectResult.isSuccess()) {
+ break;
+ }
+ if ((consumerConfig.getPullConsumeReadyWaitPeriodMs() >= 0)
+ && (System.currentTimeMillis() - startTime
+ >= consumerConfig.getPullConsumeReadyWaitPeriodMs())) {
+ result.setFailResult(selectResult.getErrCode(), selectResult.getErrMsg());
+ return result.isSuccess();
+ }
+ if (consumerConfig.getPullConsumeReadyChkSliceMs() > 10) {
+ ThreadUtils.sleep(consumerConfig.getPullConsumeReadyChkSliceMs());
+ }
+ }
+ StringBuilder sBuilder = new StringBuilder(512);
+ // Check the data cache first
+ selectResult = clientRmtDataCache.pullSelect();
+ if (!selectResult.isSuccess()) {
+ result.setFailResult(selectResult.getErrCode(), selectResult.getErrMsg());
+ return result.isSuccess();
+ }
+ FetchContext taskContext = fetchMessage(selectResult, sBuilder);
+ result.setProcessResult(taskContext);
+ return result.isSuccess();
+ }
+
+ @Override
+ public boolean confirmConsume(String confirmContext, boolean isConsumed,
+ ConfirmResult result) throws TubeClientException {
+ if (result == null) {
+ throw new TubeClientException("Illegal parameter: parameter result is null!");
+ }
+ StringBuilder sBuilder = new StringBuilder(512);
+ if (isShutdown()) {
+ result.setFailResult(TErrCodeConstants.CLIENT_SHUTDOWN,
+ "The client is not started or closed!");
+ return result.isSuccess();
+ }
+ long currOffset = TBaseConstants.META_VALUE_UNDEFINED;
+ long maxOffset = TBaseConstants.META_VALUE_UNDEFINED;
+ // Verify if the confirmContext is valid
+ if (TStringUtils.isBlank(confirmContext)) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ "ConfirmContext is null!");
+ return result.isSuccess();
+ }
+ String[] strConfirmContextItems =
+ confirmContext.split(TokenConstants.ATTR_SEP);
+ if (strConfirmContextItems.length != 4) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ "ConfirmContext format error: value must be aaaa:bbbb:cccc:ddddd !");
+ return result.isSuccess();
+ }
+ for (String itemStr : strConfirmContextItems) {
+ if (TStringUtils.isBlank(itemStr)) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ sBuilder.append("ConfirmContext's format error: item (")
+ .append(itemStr).append(") is null !").toString());
+ sBuilder.delete(0, sBuilder.length());
+ return result.isSuccess();
+ }
+ }
+ String keyId = sBuilder.append(strConfirmContextItems[0].trim())
+ .append(TokenConstants.ATTR_SEP).append(strConfirmContextItems[1].trim())
+ .append(TokenConstants.ATTR_SEP).append(strConfirmContextItems[2].trim()).toString();
+ sBuilder.delete(0, sBuilder.length());
+ String topicName = strConfirmContextItems[1].trim();
+ long timeStamp = Long.parseLong(strConfirmContextItems[3]);
+ if (!clientRmtDataCache.isPartitionInUse(keyId, timeStamp)) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ "The confirmContext's value invalid!");
+ return result.isSuccess();
+ }
+ Partition curPartition =
+ clientRmtDataCache.getPartitionByKey(keyId);
+ if (curPartition == null) {
+ result.setFailResult(TErrCodeConstants.NOT_FOUND,
+ sBuilder.append("Not found the partition by confirmContext:")
+ .append(confirmContext).toString());
+ sBuilder.delete(0, sBuilder.length());
+ return result.isSuccess();
+ }
+ if (this.consumerConfig.isPullConfirmInLocal()) {
+ clientRmtDataCache.succRspRelease(keyId, topicName,
+ timeStamp, isConsumed, isFilterConsume(topicName), currOffset, maxOffset);
+ result.setSuccResult(topicName, curPartition, currOffset, maxOffset);
+ return result.isSuccess();
+ } else {
+ try {
+ ClientBroker.CommitOffsetResponseB2C response =
+ getBrokerService(curPartition.getBroker())
+ .consumerCommitC2B(createBrokerCommitRequest(curPartition, isConsumed),
+ AddressUtils.getLocalAddress(), getConsumerConfig().isTlsEnable());
+ if (response == null) {
+ result.setFailResult(TErrCodeConstants.CONNECT_RETURN_NULL,
+ sBuilder.append("Confirm ").append(confirmContext)
+ .append("'s offset failed, response is null!").toString());
+ sBuilder.delete(0, sBuilder.length());
+ return result.isSuccess();
+ } else {
+ if (response.hasCurrOffset() && response.getCurrOffset() >= 0) {
+ currOffset = response.getCurrOffset();
+ }
+ if (response.hasMaxOffset() && response.getMaxOffset() >= 0) {
+ maxOffset = response.getMaxOffset();
+ }
+ result.setProcessResult(response.getSuccess(),
+ response.getErrCode(), response.getErrMsg(),
+ topicName, curPartition, currOffset, maxOffset);
+ return result.isSuccess();
+ }
+ } catch (Throwable e) {
+ sBuilder.delete(0, sBuilder.length());
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ sBuilder.append("Confirm ").append(confirmContext)
+ .append("'s offset failed, exception is ")
+ .append(e.toString()).toString());
+ sBuilder.delete(0, sBuilder.length());
+ return result.isSuccess();
+ } finally {
+ clientRmtDataCache.succRspRelease(keyId, topicName, timeStamp,
+ isConsumed, isFilterConsume(topicName), currOffset, maxOffset);
+ }
+ }
+ }
+
+ /**
+ * Register partitions.
+ *
+ * @param partition need register partition
+ * @param boostrapOffset boostrap offset
+ * @param result process result
+ * @param sBuffer string buffer
+ *
+ * @return process result
+ */
+ private boolean registerPartitions(Partition partition, long boostrapOffset,
+ ProcessResult result, StringBuilder sBuffer) {
+ int maxRegisterRetryTimes = 2;
+ int retryTimesRegister2Broker = 0;
+ while (retryTimesRegister2Broker < maxRegisterRetryTimes) {
+ if (isShutdown()) {
+ result.setFailResult(TErrCodeConstants.CLIENT_SHUTDOWN,
+ "The client is not started or closed!");
+ return result.isSuccess();
+ }
+ if (clientRmtDataCache.isPartitionInUse(partition.getPartitionKey())) {
+ result.setSuccResult();
+ return result.isSuccess();
+ }
+ if (tryRegister2Broker(partition, boostrapOffset, result, sBuffer)) {
+ return result.isSuccess();
+ }
+ logger.warn(sBuffer.append("register ")
+ .append(partition.toString()).append(" failure(")
+ .append(retryTimesRegister2Broker).append("), return ")
+ .append(result.getErrMsg()).toString());
+ retryTimesRegister2Broker++;
+ ThreadUtils.sleep(1000);
+ }
+ return result.isSuccess();
+ }
+
+ private FetchContext fetchMessage(PartitionSelectResult partSelectResult,
+ StringBuilder sBuffer) {
+ // Fetch task context based on selected partition
+ FetchContext taskContext =
+ new FetchContext(partSelectResult);
+ Partition partition = taskContext.getPartition();
+ String topic = partition.getTopic();
+ String partitionKey = partition.getPartitionKey();
+ // Response from broker
+ ClientBroker.GetMessageResponseB2C msgRspB2C = null;
+ try {
+ msgRspB2C =
+ getBrokerService(partition.getBroker())
+ .getMessagesC2B(createBrokerGetMessageRequest(
+ partition, taskContext.isLastConsumed()),
+ AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+ } catch (Throwable ee) {
+ // Process the exception
+ clientRmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
+ taskContext.setFailProcessResult(400, sBuffer
+ .append("Get message error, reason is ")
+ .append(ee.toString()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return taskContext;
+ }
+ if (msgRspB2C == null) {
+ clientRmtDataCache.errReqRelease(partitionKey, taskContext.getUsedToken(), false);
+ taskContext.setFailProcessResult(500, "Get message null");
+ return taskContext;
+ }
+ try {
+ // Process the response based on the return code
+ switch (msgRspB2C.getErrCode()) {
+ case TErrCodeConstants.SUCCESS: {
+ int msgSize = 0;
+ // Convert the message payload data
+ List<Message> tmpMessageList =
+ DataConverterUtil.convertMessage(topic, msgRspB2C.getMessagesList());
+ boolean isEscLimit =
+ (msgRspB2C.hasEscFlowCtrl() && msgRspB2C.getEscFlowCtrl());
+ // Filter the message based on its content
+ // Calculate the message size and do some flow control
+ boolean needFilter = false;
+ Set<String> topicFilterSet = null;
+ TopicProcessor topicProcessor = consumeSubInfo.getTopicProcessor(topic);
+ if (topicProcessor != null) {
+ topicFilterSet = topicProcessor.getFilterConds();
+ if (topicFilterSet != null && !topicFilterSet.isEmpty()) {
+ needFilter = true;
+ }
+ }
+ List<Message> messageList = new ArrayList<>();
+ for (Message message : tmpMessageList) {
+ if (message == null) {
+ continue;
+ }
+ if (needFilter && (TStringUtils.isBlank(message.getMsgType())
+ || !topicFilterSet.contains(message.getMsgType()))) {
+ continue;
+ }
+ messageList.add(message);
+ msgSize += message.getData().length;
+ }
+ // Set the process result of current stage. Process the result based on the response
+ long dataDltVal = msgRspB2C.hasCurrDataDlt()
+ ? msgRspB2C.getCurrDataDlt() : -1;
+ long currOffset = msgRspB2C.hasCurrOffset()
+ ? msgRspB2C.getCurrOffset() : TBaseConstants.META_VALUE_UNDEFINED;
+ long maxOffset = msgRspB2C.hasMaxOffset()
+ ? msgRspB2C.getMaxOffset() : TBaseConstants.META_VALUE_UNDEFINED;
+ boolean isRequireSlow =
+ (msgRspB2C.hasRequireSlow() && msgRspB2C.getRequireSlow());
+ clientRmtDataCache
+ .setPartitionContextInfo(partitionKey, currOffset, 1,
+ msgRspB2C.getErrCode(), isEscLimit, msgSize, 0,
+ dataDltVal, isRequireSlow, maxOffset);
+ taskContext.setSuccessProcessResult(currOffset,
+ sBuffer.append(partitionKey).append(TokenConstants.ATTR_SEP)
+ .append(taskContext.getUsedToken()).toString(), messageList, maxOffset);
+ sBuffer.delete(0, sBuffer.length());
+ break;
+ }
+ case TErrCodeConstants.HB_NO_NODE:
+ case TErrCodeConstants.CERTIFICATE_FAILURE:
+ case TErrCodeConstants.DUPLICATE_PARTITION: {
+ // Release the partitions when meeting these error codes
+ clientRmtDataCache.removePartition(partition);
+ taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
+ break;
+ }
+ case TErrCodeConstants.SERVER_CONSUME_SPEED_LIMIT: {
+ // Process with server side speed limit
+ long defDltTime =
+ msgRspB2C.hasMinLimitTime()
+ ? msgRspB2C.getMinLimitTime() : consumerConfig.getMsgNotFoundWaitPeriodMs();
+ clientRmtDataCache.errRspRelease(partitionKey, topic,
+ taskContext.getUsedToken(), false, TBaseConstants.META_VALUE_UNDEFINED,
+ 0, msgRspB2C.getErrCode(), false, 0,
+ defDltTime, isFilterConsume(topic), TBaseConstants.META_VALUE_UNDEFINED,
+ TBaseConstants.META_VALUE_UNDEFINED);
+ taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
+ break;
+ }
+ case TErrCodeConstants.NOT_FOUND:
+ case TErrCodeConstants.FORBIDDEN:
+ case TErrCodeConstants.SERVICE_UNAVAILABLE:
+ case TErrCodeConstants.MOVED:
+ default: {
+ // Slow down the request based on the limitation configuration when meet these errors
+ long limitDlt = 300;
+ switch (msgRspB2C.getErrCode()) {
+ case TErrCodeConstants.FORBIDDEN: {
+ limitDlt = 2000;
+ break;
+ }
+ case TErrCodeConstants.SERVICE_UNAVAILABLE: {
+ limitDlt = 300;
+ break;
+ }
+ case TErrCodeConstants.MOVED: {
+ limitDlt = 200;
+ break;
+ }
+ case TErrCodeConstants.NOT_FOUND: {
+ limitDlt = consumerConfig.getMsgNotFoundWaitPeriodMs();
+ break;
+ }
+ default: {
+ //
+ }
+ }
+ clientRmtDataCache.errRspRelease(partitionKey, topic,
+ taskContext.getUsedToken(), false, TBaseConstants.META_VALUE_UNDEFINED,
+ 0, msgRspB2C.getErrCode(), false, 0,
+ limitDlt, isFilterConsume(topic), -1, TBaseConstants.META_VALUE_UNDEFINED);
+ taskContext.setFailProcessResult(msgRspB2C.getErrCode(), msgRspB2C.getErrMsg());
+ break;
+ }
+ }
+ return taskContext;
+ } catch (Throwable ee) {
+ logger.error("Process response code error", ee);
+ clientRmtDataCache.succRspRelease(partitionKey, topic,
+ taskContext.getUsedToken(), false, isFilterConsume(topic),
+ TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED);
+ taskContext.setFailProcessResult(500, sBuffer
+ .append("Get message failed,topic=")
+ .append(topic).append(",partition=").append(partition)
+ .append(", throw info is ").append(ee.toString()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ }
+ return taskContext;
+ }
+
+ /**
+ * Start heartbeat thread threads.
+ *
+ * @param result process result
+ * @param sBuffer string buffer
+ *
+ * @return process result
+ */
+ private boolean startMasterAndBrokerThreads(ProcessResult result,
+ StringBuilder sBuffer) {
+ int registerRetryTimes = 0;
+ while (registerRetryTimes < consumerConfig.getMaxRegisterRetryTimes()) {
+ if (tryRegister2Master(result, sBuffer)) {
+ logger.info(sBuffer.append("[Registered] ")
+ .append(consumerId).toString());
+ sBuffer.delete(0, sBuffer.length());
+ break;
+ } else {
+ logger.error(result.getErrMsg());
+ ThreadUtils.sleep(this.consumerConfig.getRegFailWaitPeriodMs());
+ }
+ if (++registerRetryTimes >= consumerConfig.getMaxRegisterRetryTimes()) {
+ logger.error(result.getErrMsg());
+ return result.isSuccess();
+ }
+ }
+ // to master heartbeat
+ this.lastHeartbeatTime2Master = System.currentTimeMillis();
+ this.heartService2Master.scheduleWithFixedDelay(new HeartTask2MasterWorker(),
+ 0, consumerConfig.getHeartbeatPeriodMs(), TimeUnit.MILLISECONDS);
+ // to broker
+ this.lastHeartbeatTime2Broker = System.currentTimeMillis();
+ this.heartBeatThread2Broker = new Thread(new HeartTask2BrokerWorker());
+ heartBeatThread2Broker.setName(sBuffer
+ .append("Broker-Heartbeat-Thread-")
+ .append(consumerId).toString());
+ sBuffer.delete(0, sBuffer.length());
+ heartBeatThread2Broker.setPriority(Thread.MAX_PRIORITY);
+ heartBeatThread2Broker.start();
+ result.setSuccResult();
+ return result.isSuccess();
+ }
+
+ private boolean validAndStoreConsumeTarget(Map<String, TreeSet<String>> consumeTargetMap,
+ StringBuilder sBuffer, ProcessResult result) {
+ if (consumeTargetMap == null
+ || consumeTargetMap.isEmpty()) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ "Parameter error: the subscribed target is null or empty!");
+ return result.isSuccess();
+ }
+ String topicName;
+ String tmpFilter;
+ TreeSet<String> filterCondSet;
+ TreeSet<String> newFilterCondSet;
+ Map<String, TreeSet<String>> newConsumeTargetMap = new HashMap<>();
+ for (Map.Entry<String, TreeSet<String>> entry : consumeTargetMap.entrySet()) {
+ // check topic value
+ topicName = entry.getKey();
+ if (TStringUtils.isBlank(topicName)) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ "Parameter error: an blank Topic field,topic is blank in map!");
+ return result.isSuccess();
+ }
+ topicName = topicName.trim();
+ if (topicName.length() > TBaseConstants.META_MAX_TOPICNAME_LENGTH) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ sBuffer.append("Parameter error: the max length of ")
+ .append(topicName).append(" in topicName parameter over ")
+ .append(TBaseConstants.META_MAX_TOPICNAME_LENGTH)
+ .append(" characters").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ if (!topicName.matches(TBaseConstants.META_TMP_STRING_VALUE)) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ sBuffer.append("Parameter error: the value of ").append(topicName)
+ .append(" in topicName parameter must begin with a letter,")
+ .append(" can only contain characters,numbers,and underscores")
+ .toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ // check topic's filter condition value
+ filterCondSet = entry.getValue();
+ newFilterCondSet = new TreeSet<>();
+ if ((filterCondSet != null) && (!filterCondSet.isEmpty())) {
+ if (filterCondSet.size() > TBaseConstants.CFG_FLT_MAX_FILTER_ITEM_COUNT) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ sBuffer.append("Parameter error: over max allowed filter count of ")
+ .append(topicName).append(", allowed count is ")
+ .append(TBaseConstants.CFG_FLT_MAX_FILTER_ITEM_COUNT)
+ .toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ for (String filter : filterCondSet) {
+ if (TStringUtils.isBlank(filter)) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ sBuffer.append("Parameter error: include blank filter value of ")
+ .append(topicName).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ tmpFilter = filter.trim();
+ if (tmpFilter.length() > TBaseConstants.CFG_FLT_MAX_FILTER_ITEM_LENGTH) {
+ result.setFailResult(TErrCodeConstants.BAD_REQUEST,
+ sBuffer.append("Parameter error: over max allowed filter length, ")
+ .append(tmpFilter).append(" in ").append(topicName)
+ .append(", allowed length is ")
+ .append(TBaseConstants.CFG_FLT_MAX_FILTER_ITEM_LENGTH)
+ .toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ newFilterCondSet.add(tmpFilter);
+ }
+ }
+ newConsumeTargetMap.put(topicName, newFilterCondSet);
+ }
+ result.setSuccResult(newConsumeTargetMap);
+ return true;
+ }
+
+ // #lizard forgives
+ private class HeartTask2MasterWorker implements Runnable {
+ // Heartbeat logic between master and worker
+ @Override
+ public void run() {
+ ProcessResult result = new ProcessResult();
+ StringBuilder strBuffer = new StringBuilder(512);
+ try {
+ clientRmtDataCache.resumeTimeoutConsumePartitions(false,
+ consumerConfig.getPullProtectConfirmTimeoutMs());
+ // Send heartbeat request to master
+ ClientMaster.HeartResponseM2CV2 response =
+ masterService.consumerHeartbeatC2MV2(createMasterHeartBeatRequest(),
+ AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+ // Process unsuccessful response
+ if (response == null) {
+ logger.warn(strBuffer.append("[Heartbeat Failed] ")
+ .append("return result is null!").toString());
+ strBuffer.delete(0, strBuffer.length());
+ heartbeat2MRetryTimes++;
+ return;
+ }
+ if (response.getErrCode() != TErrCodeConstants.SUCCESS) {
+ // If master replies that cannot find current consumer node, re-register
+ if (response.getErrCode() == TErrCodeConstants.HB_NO_NODE) {
+ if (tryRegister2Master(result, strBuffer)) {
+ logger.info(strBuffer.append("[Re-register] ")
+ .append(consumerId).toString());
+ strBuffer.delete(0, strBuffer.length());
+ } else {
+ logger.info(result.getErrMsg());
+ }
+ return;
+ }
+ logger.error(strBuffer.append("[Heartbeat Failed] ")
+ .append(response.getErrMsg()).toString());
+ if (response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
+ adjustHeartBeatPeriod("certificate failure", strBuffer);
+ } else {
+ heartbeat2MRetryTimes++;
+ }
+ return;
+ }
+ // Process the heartbeat success response
+ heartbeat2MRetryTimes = 0;
+ clientRmtDataCache.updateBrokerInfoList(
+ response.getBrokerConfigId(),
+ response.getBrokerConfigListList(), strBuffer);
+ if (response.hasTopicMetaInfoId()) {
+ // update local cache meta information
+ needMetaSelfChk.compareAndSet(false, true);
+ clientRmtDataCache.storeTopicMetaInfo(response.getTopicMetaInfoId(),
+ response.getTopicMetaInfoListList());
+ lstMetaQueryTime.set(System.currentTimeMillis());
+ }
+ // Get the authorization rules and update the local rules
+ clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo());
+ // Get the latest authorized token
+ processHeartBeatAuthorizedToken(response);
+ // Warning if heartbeat interval is too long
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - lastHeartbeatTime2Master)
+ > consumerConfig.getHeartbeatPeriodMs() * 2) {
+ logger.warn(strBuffer.append(consumerId)
+ .append(" heartbeat interval to master is too long,please check! Total time : ")
+ .append(currentTime - lastHeartbeatTime2Master).toString());
+ strBuffer.delete(0, strBuffer.length());
+ }
+ lastHeartbeatTime2Master = currentTime;
+ } catch (InterruptedException ee) {
+ logger.info("To Master Heartbeat thread is interrupted,existed!");
+ } catch (Throwable e) {
+ // Print the log when meeting heartbeat errors.
+ // Reduce the heartbeat request frequency when failure count exceed the threshold
+ if (!isShutdown()) {
+ samplePrintCtrl.printExceptionCaught(e);
+ }
+ adjustHeartBeatPeriod("heartbeat exception", strBuffer);
+ }
+ }
+
+ private void adjustHeartBeatPeriod(String reason, StringBuilder sBuilder) {
+ lastHeartbeatTime2Master = System.currentTimeMillis();
+ heartbeat2MRetryTimes++;
+ if (!isShutdown()
+ && heartbeat2MRetryTimes > consumerConfig.getMaxHeartBeatRetryTimes()) {
+ logger.warn(sBuilder.append("Adjust HeartbeatPeriod for ").append(reason)
+ .append(", sleep ").append(consumerConfig.getHeartbeatPeriodAfterFail())
+ .append(" Ms").toString());
+ sBuilder.delete(0, sBuilder.length());
+ ThreadUtils.sleep(consumerConfig.getHeartbeatPeriodAfterFail());
+ }
+ }
+ }
+
+ private boolean tryRegister2Broker(Partition partition, long boostrapOffset,
+ ProcessResult result, StringBuilder sBuffer) {
+ try {
+ ClientBroker.RegisterResponseB2C response =
+ getBrokerService(partition.getBroker()).consumerRegisterC2B(
+ createBrokerRegisterRequest(partition, boostrapOffset),
+ AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+ if (response == null) {
+ result.setFailResult(TErrCodeConstants.CONNECT_RETURN_NULL,
+ sBuffer.append(" register ").append(partition.toString())
+ .append(" return null!").toString());
+ return result.isSuccess();
+ }
+ if (response.getSuccess()) {
+ long currOffset = response.hasCurrOffset()
+ ? response.getCurrOffset() : TBaseConstants.META_VALUE_UNDEFINED;
+ long maxOffset = response.hasMaxOffset()
+ ? response.getMaxOffset() : TBaseConstants.META_VALUE_UNDEFINED;
+ clientRmtDataCache.addPartition(partition, currOffset, maxOffset);
+ logger.info(sBuffer.append("Registered partition: consumer is ")
+ .append(consumerId).append(", partition=")
+ .append(partition.toString()).append(", boostrapOffset=")
+ .append(boostrapOffset).toString());
+ sBuffer.delete(0, sBuffer.length());
+ result.setSuccResult();
+ return result.isSuccess();
+ } else {
+ if (response.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED
+ || response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
+ clientRmtDataCache.removePartition(partition);
+ if (response.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED) {
+ result.setFailResult(response.getErrCode(),
+ sBuffer.append("[Partition occupied], curr consumerId: ")
+ .append(consumerId).append(", returned message : ")
+ .append(response.getErrMsg()).toString());
+ } else {
+ result.setFailResult(response.getErrCode(),
+ sBuffer.append("[Certificate failure], curr consumerId: ")
+ .append(consumerId).append(", returned message : ")
+ .append(response.getErrMsg()).toString());
+ }
+ } else {
+ result.setFailResult(response.getErrCode(),
+ sBuffer.append(" register ").append(partition.toString())
+ .append(" return ").append(response.getErrMsg()).toString());
+ }
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ } catch (Throwable e) {
+ sBuffer.delete(0, sBuffer.length());
+ result.setFailResult(TErrCodeConstants.UNSPECIFIED_ABNORMAL,
+ sBuffer.append("register ").append(partition.toString())
+ .append(" throw exception ").append(e.toString()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ }
+
+ private boolean tryRegister2Master(ProcessResult result, StringBuilder sBuffer) {
+ try {
+ ClientMaster.RegisterResponseM2CV2 response =
+ masterService.consumerRegisterC2MV2(createMasterRegisterRequest(),
+ AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+ if (response == null) {
+ result.setFailResult(TErrCodeConstants.CONNECT_RETURN_NULL,
+ sBuffer.append("Register Failed: ").append(consumerId)
+ .append(" register to master return null!").toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ if (response.getErrCode() != TErrCodeConstants.SUCCESS) {
+ // If the consumer group is forbidden, output the log
+ if (response.getErrCode()
+ == TErrCodeConstants.CONSUME_GROUP_FORBIDDEN) {
+ result.setFailResult(response.getErrCode(),
+ sBuffer.append("Register Failed: ").append(consumerId)
+ .append("'s ConsumeGroup forbidden, ")
+ .append(response.getErrMsg()).toString());
+ } else {
+ result.setFailResult(response.getErrCode(),
+ sBuffer.append("Register Failed: ").append(consumerId)
+ .append(" ").append(response.getErrMsg()).toString());
+ }
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ // Process the successful response
+ clientRmtDataCache.updateReg2MasterTime();
+ clientRmtDataCache.updateBrokerInfoList(response.getBrokerConfigId(),
+ response.getBrokerConfigListList(), sBuffer);
+ clientRmtDataCache.updOpsTaskInfo(response.getOpsTaskInfo());
+ processRegAuthorizedToken(response);
+ result.setSuccResult();
+ return result.isSuccess();
+ } catch (Throwable e) {
+ result.setFailResult(sBuffer.append("Register Failed: register to master throw ")
+ .append(e.getCause()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ return result.isSuccess();
+ }
+ }
+
+ // #lizard forgives
+ private class HeartTask2BrokerWorker implements Runnable {
+ @Override
+ public void run() {
+ StringBuilder strBuffer = new StringBuilder(512);
+ while (!isShutdown()) {
+ try {
+ // First check the last heartbeat interval. If it's larger than two periods,
+ // there may be some system hang up(e.g. long time gc, CPU is too busy).
+ // Print the warning message.
+ long currentTime = System.currentTimeMillis();
+ if ((currentTime - lastHeartbeatTime2Broker)
+ > (consumerConfig.getHeartbeatPeriodMs() * 2)) {
+ logger.warn(strBuffer.append(consumerId)
+ .append(" heartbeat to broker is too long, please check! Total time : ")
+ .append(currentTime - lastHeartbeatTime2Broker).toString());
+ strBuffer.delete(0, strBuffer.length());
+ }
+ // Send heartbeat request to the broker connect by the client
+ processBrokerHeatBeat(strBuffer);
+ if (needMetaSelfChk.compareAndSet(true, false)) {
+ // clear unsubscribable partitions
+ clearUnSubscribablePartitions();
+ }
+ if (clientRmtDataCache.isCsmFromMaxOffset()) {
+ resetCsmFromMaxOffset(strBuffer);
+ }
+ // Wait for next heartbeat
+ lastHeartbeatTime2Broker = System.currentTimeMillis();
+ Thread.sleep(consumerConfig.getHeartbeatPeriodMs());
+ } catch (Throwable e) {
+ lastHeartbeatTime2Broker = System.currentTimeMillis();
+ if (!isShutdown()) {
+ logger.error("heartbeat thread error 3 : ", e);
+ }
+ }
+ }
+ }
+
+ private void resetCsmFromMaxOffset(StringBuilder sBuffer) {
+ Set<String> regPartSet = clientRmtDataCache.getCurRegisteredPartSet();
+ if (regPartSet.isEmpty()) {
+ return;
+ }
+ Partition partition;
+ long boostrapOffset;
+ for (String partitionKey : regPartSet) {
+ if (TStringUtils.isBlank(partitionKey)) {
+ continue;
+ }
+ if (isShutdown()) {
+ break;
+ }
+ boostrapOffset =
+ clientRmtDataCache.getMaxOffsetOfPartition(partitionKey);
+ final String uniqueId =
+ sBuffer.append(consumerConfig.getConsumerGroup())
+ .append("#").append(partitionKey).toString();
+ sBuffer.delete(0, sBuffer.length());
+ synchronized (uniqueId) {
+ partition = clientRmtDataCache.getPartitionByKey(partitionKey);
+ if (partition == null) {
+ continue;
+ }
+ try {
+ ClientBroker.RegisterResponseB2C response =
+ getBrokerService(partition.getBroker()).consumerRegisterC2B(
+ createBrokerRegisterRequest(partition, boostrapOffset),
+ AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+ if (response == null) {
+ continue;
+ }
+ if (response.getSuccess()) {
+ long currOffset = response.hasCurrOffset()
+ ? response.getCurrOffset() : TBaseConstants.META_VALUE_UNDEFINED;
+ long maxOffset = response.hasMaxOffset()
+ ? response.getMaxOffset() : TBaseConstants.META_VALUE_UNDEFINED;
+ clientRmtDataCache.updPartOffsetInfo(
+ partitionKey, currOffset, maxOffset);
+ logger.info(sBuffer.append("[Admin Reset] consumer is ")
+ .append(consumerId).append(", partition=")
+ .append(partition.toString()).append(", consume from max=")
+ .append(currOffset).toString());
+ sBuffer.delete(0, sBuffer.length());
+ } else {
+ if (response.getErrCode() == TErrCodeConstants.PARTITION_OCCUPIED
+ || response.getErrCode() == TErrCodeConstants.CERTIFICATE_FAILURE) {
+ clientRmtDataCache.removePartition(partition);
+ }
+ }
+ } catch (Throwable e) {
+ sBuffer.delete(0, sBuffer.length());
+ logger.info(sBuffer.append("register ").append(partition.toString())
+ .append(" throw exception ").append(e.toString()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ }
+ }
+ }
+ }
+
+ private void processBrokerHeatBeat(StringBuilder sBuffer) {
+ List<String> partStrSet;
+ List<Partition> partitions;
+ List<String> strFailInfoList;
+ for (BrokerInfo brokerInfo : clientRmtDataCache.getAllRegisterBrokers()) {
+ if (isShutdown()) {
+ break;
+ }
+ partStrSet = new ArrayList<>();
+ try {
+ // Handle the heartbeat response for partitions belong to the same broker.
+ partitions = clientRmtDataCache.getBrokerPartitionList(brokerInfo);
+ if ((partitions != null) && (!partitions.isEmpty())) {
+ for (Partition partition : partitions) {
+ partStrSet.add(partition.toString());
+ }
+ ClientBroker.HeartBeatResponseB2C response =
+ getBrokerService(brokerInfo).consumerHeartbeatC2B(
+ createBrokerHeartBeatRequest(brokerInfo.getBrokerId(), partStrSet),
+ AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+ if (response == null) {
+ continue;
+ }
+ if (response.getSuccess()) {
+ // store require authenticate information
+ clientRmtDataCache.bookBrokerRequireAuthInfo(
+ brokerInfo.getBrokerId(), response);
+ // If the heartbeat response report failed partitions, release the
+ // corresponding local partition and log the operation
+ if (response.getHasPartFailure()) {
+ try {
+ strFailInfoList = response.getFailureInfoList();
+ for (String strFailInfo : strFailInfoList) {
+ final int index =
+ strFailInfo.indexOf(TokenConstants.ATTR_SEP);
+ if (index < 0) {
+ logger.error(sBuffer
+ .append("Parse Heartbeat response error : ")
+ .append("invalid response, ")
+ .append(strFailInfo).toString());
+ sBuffer.delete(0, sBuffer.length());
+ continue;
+ }
+ int errorCode =
+ Integer.parseInt(strFailInfo.substring(0, index));
+ Partition failPartition =
+ new Partition(strFailInfo.substring(index + 1));
+ clientRmtDataCache.removePartition(failPartition);
+ logger.warn(sBuffer
+ .append("[heart2broker error] partition:")
+ .append(failPartition.toString())
+ .append(", errorCode=")
+ .append(errorCode).toString());
+ sBuffer.delete(0, sBuffer.length());
+ }
+ } catch (Throwable ee) {
+ if (!isShutdown()) {
+ sBuffer.delete(0, sBuffer.length());
+ logger.error(sBuffer
+ .append("Parse Heartbeat response error :")
+ .append(ee.getMessage()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ }
+ }
+ }
+ } else if (response.getErrCode()
+ == TErrCodeConstants.CERTIFICATE_FAILURE) {
+ for (Partition partition : partitions) {
+ clientRmtDataCache.removePartition(partition);
+ }
+ logger.warn(sBuffer
+ .append("[heart2broker error] certificate failure, ")
+ .append(brokerInfo.getBrokerStrInfo())
+ .append("'s partitions area released, ")
+ .append(response.getErrMsg()).toString());
+ sBuffer.delete(0, sBuffer.length());
+ }
+ }
+ } catch (Throwable ee) {
+ // If there's error in the heartbeat, collect the log and print out.
+ // Release the log string buffer.
+ if (!isShutdown()) {
+ samplePrintCtrl.printExceptionCaught(ee);
+ if (!partStrSet.isEmpty()) {
+ sBuffer.delete(0, sBuffer.length());
+ for (String partitionStr : partStrSet) {
+ Partition tmpPartition = new Partition(partitionStr);
+ clientRmtDataCache.removePartition(tmpPartition);
+ logger.warn(sBuffer
+ .append("[heart2broker Throwable] release partition:")
+ .append(partitionStr).toString());
+ sBuffer.delete(0, sBuffer.length());
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void unregisterPartition(Partition partition,
+ boolean isLastConsumed, StringBuilder sBuffer) {
+ try {
+ getBrokerService(partition.getBroker())
+ .consumerRegisterC2B(createBrokerUnregisterRequest(partition,
+ isLastConsumed),
+ AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+ logger.info(sBuffer.append("Unregister partition: consumer is ")
+ .append(consumerId).append(", partition=")
+ .append(partition.toString()).append(", isLastPackConsumed=")
+ .append(isLastConsumed).toString());
+ } catch (Throwable e) {
+ logger.error(sBuffer.append("Disconnect to Broker error! broker:")
+ .append(partition.getBroker().toString()).toString(), e);
+ sBuffer.delete(0, sBuffer.length());
+ }
+ }
+
+ /**
+ * Unregister partitions.
+ *
+ * @param unRegisterInfoMap partitions to be unregister
+ */
+ private void unregisterPartitions(
+ Map<BrokerInfo, List<PartitionSelectResult>> unRegisterInfoMap) {
+ StringBuilder strBuffer = new StringBuilder(512);
+ strBuffer.append("Unregister info:");
+ for (Map.Entry<BrokerInfo, List<PartitionSelectResult>> entry
+ : unRegisterInfoMap.entrySet()) {
+ for (PartitionSelectResult partResult : entry.getValue()) {
+ try {
+ getBrokerService(partResult.getPartition().getBroker())
+ .consumerRegisterC2B(createBrokerUnregisterRequest(partResult.getPartition(),
+ partResult.isLastPackConsumed()),
+ AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
+ } catch (Throwable e) {
+ logger.error(new StringBuilder(512)
+ .append("Disconnect to Broker error! broker:")
+ .append(partResult.getPartition().getBroker().toString()).toString(), e);
+ }
+ strBuffer.append(partResult.getPartition().toString());
+ strBuffer.append("\n");
+ }
+ }
+ logger.info(strBuffer.toString());
+ }
+
+ private ClientMaster.RegisterRequestC2MV2 createMasterRegisterRequest() throws Exception {
+ ClientMaster.RegisterRequestC2MV2.Builder builder =
+ ClientMaster.RegisterRequestC2MV2.newBuilder();
+ builder.setClientId(this.consumerId);
+ builder.setHostName(AddressUtils.getLocalAddress());
+ builder.setSourceCount(this.sourceCount);
+ builder.setNodeId(this.nodeId);
+ builder.setJdkVersion(MixedUtils.getJavaVersion());
+ builder.setGroupName(this.consumerConfig.getConsumerGroup());
+ builder.addAllTopicList(this.consumeSubInfo.getSubscribedTopics());
+ builder.addAllTopicCondition(formatTopicCondInfo(
+ this.consumeSubInfo.getTopicCondRegistry()));
+ builder.setSubRepInfo(this.clientRmtDataCache.buildClientSubRepInfo());
+ ClientMaster.OpsTaskInfo opsTaskInfo =
+ this.clientRmtDataCache.buildOpsTaskInfo();
+ if (opsTaskInfo != null) {
+ builder.setOpsTaskInfo(opsTaskInfo);
+ }
+ ClientMaster.MasterCertificateInfo authInfo = genMasterCertificateInfo(true);
+ if (authInfo != null) {
+ builder.setAuthInfo(authInfo);
+ }
+ return builder.build();
+ }
+
+ private ClientMaster.HeartRequestC2MV2 createMasterHeartBeatRequest() {
+ ClientMaster.HeartRequestC2MV2.Builder builder =
+ ClientMaster.HeartRequestC2MV2.newBuilder();
+ builder.setClientId(consumerId);
+ builder.setGroupName(this.consumerConfig.getConsumerGroup());
+ builder.setSubRepInfo(this.clientRmtDataCache.buildClientSubRepInfo());
+ ClientMaster.OpsTaskInfo opsTaskInfo = clientRmtDataCache.buildOpsTaskInfo();
+ if (opsTaskInfo != null) {
+ builder.setOpsTaskInfo(opsTaskInfo);
+ }
+ ClientMaster.MasterCertificateInfo authInfo = genMasterCertificateInfo(false);
+ if (authInfo != null) {
+ builder.setAuthInfo(authInfo);
+ }
+ return builder.build();
+ }
+
+ private ClientMaster.GetPartMetaRequestC2M createMasterGetPartMetaRequest() {
+ ClientMaster.GetPartMetaRequestC2M.Builder builder =
+ ClientMaster.GetPartMetaRequestC2M.newBuilder();
+ builder.setClientId(consumerId);
+ builder.setGroupName(this.consumerConfig.getConsumerGroup());
+ builder.setBrokerConfigId(this.clientRmtDataCache.getLastBrokerConfigId());
+ builder.setTopicMetaInfoId(this.clientRmtDataCache.getlastTopicMetaInfoId());
+ ClientMaster.MasterCertificateInfo authInfo = genMasterCertificateInfo(false);
+ if (authInfo != null) {
+ builder.setAuthInfo(authInfo);
+ }
+ return builder.build();
+ }
+
+ private ClientMaster.CloseRequestC2M createMasterCloseRequest() {
+ ClientMaster.CloseRequestC2M.Builder builder =
+ ClientMaster.CloseRequestC2M.newBuilder();
+ builder.setClientId(this.consumerId);
+ builder.setGroupName(this.consumerConfig.getConsumerGroup());
+ ClientMaster.MasterCertificateInfo authInfo =
+ genMasterCertificateInfo(false);
+ if (authInfo != null) {
+ builder.setAuthInfo(authInfo);
+ }
+ return builder.build();
+ }
+
+ private ClientBroker.RegisterRequestC2B createBrokerRegisterRequest(Partition partition,
+ long boostrapOffset) {
+ ClientBroker.RegisterRequestC2B.Builder builder =
+ ClientBroker.RegisterRequestC2B.newBuilder();
+ builder.setClientId(consumerId);
+ builder.setGroupName(this.consumerConfig.getConsumerGroup());
+ builder.setOpType(RpcConstants.MSG_OPTYPE_REGISTER);
+ builder.setTopicName(partition.getTopic());
+ builder.setPartitionId(partition.getPartitionId());
+ builder.setQryPriorityId(clientRmtDataCache.getQryPriorityId());
+ builder.setReadStatus(getGroupInitReadStatus(
+ clientRmtDataCache.bookPartition(partition.getPartitionKey())));
+ TopicProcessor topicProcessor =
+ this.consumeSubInfo.getTopicProcessor(partition.getTopic());
+ if (topicProcessor != null && topicProcessor.getFilterConds() != null) {
+ builder.addAllFilterCondStr(topicProcessor.getFilterConds());
+ }
+ if (boostrapOffset >= 0) {
+ builder.setCurrOffset(boostrapOffset);
+ }
+ builder.setAuthInfo(genBrokerAuthenticInfo(partition.getBrokerId(), false));
+ return builder.build();
+ }
+
+ private ClientBroker.RegisterRequestC2B createBrokerUnregisterRequest(Partition partition,
+ boolean isLastConsumered) {
+ ClientBroker.RegisterRequestC2B.Builder builder =
+ ClientBroker.RegisterRequestC2B.newBuilder();
+ builder.setClientId(consumerId);
+ builder.setGroupName(this.consumerConfig.getConsumerGroup());
+ builder.setOpType(RpcConstants.MSG_OPTYPE_UNREGISTER);
+ builder.setTopicName(partition.getTopic());
+ builder.setPartitionId(partition.getPartitionId());
+ if (isLastConsumered) {
+ builder.setReadStatus(0);
+ } else {
+ builder.setReadStatus(1);
+ }
+ builder.setAuthInfo(genBrokerAuthenticInfo(partition.getBrokerId(), true));
+ return builder.build();
+ }
+
+ private ClientBroker.HeartBeatRequestC2B createBrokerHeartBeatRequest(
+ int brokerId, List<String> partitionList) {
+ ClientBroker.HeartBeatRequestC2B.Builder builder =
+ ClientBroker.HeartBeatRequestC2B.newBuilder();
+ builder.setClientId(consumerId);
+ builder.setGroupName(this.consumerConfig.getConsumerGroup());
+ builder.setReadStatus(getGroupInitReadStatus(false));
+ builder.setQryPriorityId(clientRmtDataCache.getQryPriorityId());
+ builder.addAllPartitionInfo(partitionList);
+ builder.setAuthInfo(genBrokerAuthenticInfo(brokerId, false));
+ return builder.build();
+ }
+
+ /**
+ * Construct a get message request.
+ *
+ * @param partition message partition
+ * @param isLastConsumed if the last package consumed
+ * @return message request
+ */
+ protected ClientBroker.GetMessageRequestC2B createBrokerGetMessageRequest(
+ Partition partition, boolean isLastConsumed) {
+ ClientBroker.GetMessageRequestC2B.Builder builder =
+ ClientBroker.GetMessageRequestC2B.newBuilder();
+ builder.setClientId(this.consumerId);
+ builder.setGroupName(this.consumerConfig.getConsumerGroup());
+ builder.setTopicName(partition.getTopic());
+ builder.setEscFlowCtrl(clientRmtDataCache.isCurGroupInFlowCtrl());
+ builder.setPartitionId(partition.getPartitionId());
+ builder.setLastPackConsumed(isLastConsumed);
+ builder.setManualCommitOffset(false);
+ return builder.build();
+ }
+
+ /**
+ * Create a commit request.
+ *
+ * @param partition partition to be commit
+ * @param isConsumed if the last package consumed
+ * @return commit request
+ */
+ protected ClientBroker.CommitOffsetRequestC2B createBrokerCommitRequest(
+ Partition partition, boolean isConsumed) {
+ ClientBroker.CommitOffsetRequestC2B.Builder builder =
+ ClientBroker.CommitOffsetRequestC2B.newBuilder();
+ builder.setClientId(this.consumerId);
+ builder.setGroupName(this.consumerConfig.getConsumerGroup());
+ builder.setTopicName(partition.getTopic());
+ builder.setPartitionId(partition.getPartitionId());
+ builder.setLastPackConsumed(isConsumed);
+ return builder.build();
+ }
+
+ private List<String> formatTopicCondInfo(
+ final ConcurrentHashMap<String, TopicProcessor> topicCondMap) {
+ final StringBuilder strBuffer = new StringBuilder(512);
+ List<String> strTopicCondList = new ArrayList<>();
+ if ((topicCondMap != null) && (!topicCondMap.isEmpty())) {
+ for (Map.Entry<String, TopicProcessor> entry : topicCondMap.entrySet()) {
+ if (entry.getKey() == null || entry.getValue() == null) {
+ continue;
+ }
+ Set<String> condSet = entry.getValue().getFilterConds();
+ if (condSet != null && !condSet.isEmpty()) {
+ int i = 0;
+ strBuffer.append(entry.getKey()).append(TokenConstants.SEGMENT_SEP);
+ for (String condStr : condSet) {
+ if (i++ > 0) {
+ strBuffer.append(TokenConstants.ARRAY_SEP);
+ }
+ strBuffer.append(condStr);
+ }
+ strTopicCondList.add(strBuffer.toString());
+ strBuffer.delete(0, strBuffer.length());
+ }
+ }
+ }
+ return strTopicCondList;
+ }
+
+ private void clearUnSubscribablePartitions() throws Exception {
+ ProcessResult tmpResult = new ProcessResult();
+ Set<String> regPartSet = clientRmtDataCache.getCurRegisteredPartSet();
+ for (String partKey : regPartSet) {
+ if (!clientRmtDataCache.isPartSubscribable(partKey)) {
+ if (!disconnectFromPartition(partKey, tmpResult)
+ && tmpResult.getErrCode() == TErrCodeConstants.CLIENT_SHUTDOWN) {
+ break;
+ }
+ }
+ }
+ }
+
+ private ClientBroker.AuthorizedInfo genBrokerAuthenticInfo(int brokerId, boolean force) {
+ ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
+ ClientBroker.AuthorizedInfo.newBuilder();
+ authInfoBuilder.setVisitAuthorizedToken(visitToken.get());
+ if (this.consumerConfig.isEnableUserAuthentic()) {
+ if (clientRmtDataCache.markAndGetBrokerAuthStatus(brokerId, force)) {
+ authInfoBuilder.setAuthAuthorizedToken(authenticateHandler
+ .genBrokerAuthenticateToken(consumerConfig.getUsrName(),
+ consumerConfig.getUsrPassWord()));
+ }
+ }
+ return authInfoBuilder.build();
+ }
+
+ private ClientMaster.MasterCertificateInfo genMasterCertificateInfo(boolean force) {
+ ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = null;
+ if (this.consumerConfig.isEnableUserAuthentic()) {
+ authInfoBuilder = ClientMaster.MasterCertificateInfo.newBuilder();
+ if (clientRmtDataCache.markAndGetAuthStatus(force)) {
+ authInfoBuilder.setAuthInfo(authenticateHandler
+ .genMasterAuthenticateToken(consumerConfig.getUsrName(),
+ consumerConfig.getUsrPassWord()));
+ } else {
+ authInfoBuilder.setAuthorizedToken(authAuthorizedTokenRef.get());
+ }
+ }
+ if (authInfoBuilder != null) {
+ return authInfoBuilder.build();
+ }
+ return null;
+ }
+
+ private void processRegAuthorizedToken(ClientMaster.RegisterResponseM2CV2 response) {
+ if (response.hasAuthorizedInfo()) {
+ processAuthorizedToken(response.getAuthorizedInfo());
+ }
+ }
+
+ private void processHeartBeatAuthorizedToken(ClientMaster.HeartResponseM2CV2 response) {
+ if (response.hasAuthorizedInfo()) {
+ processAuthorizedToken(response.getAuthorizedInfo());
+ }
+ }
+
+ private void processAuthorizedToken(ClientMaster.MasterAuthorizedInfo inAuthorizedTokenInfo) {
+ if (inAuthorizedTokenInfo != null) {
+ visitToken.set(inAuthorizedTokenInfo.getVisitAuthorizedToken());
+ if (inAuthorizedTokenInfo.hasAuthAuthorizedToken()) {
+ String inAuthAuthorizedToken = inAuthorizedTokenInfo.getAuthAuthorizedToken();
+ if (TStringUtils.isNotBlank(inAuthAuthorizedToken)) {
+ String curAuthAuthorizedToken = authAuthorizedTokenRef.get();
+ if (!inAuthAuthorizedToken.equals(curAuthAuthorizedToken)) {
+ authAuthorizedTokenRef.set(inAuthAuthorizedToken);
+ }
+ }
+ }
+ }
+ }
+
+ private int getGroupInitReadStatus(boolean isFistReg) {
+ int readStatus = TBaseConstants.CONSUME_MODEL_READ_NORMAL;
+ switch (consumerConfig.getConsumePosition()) {
+ case CONSUMER_FROM_LATEST_OFFSET: {
+ if (isFistReg) {
+ readStatus = TBaseConstants.CONSUME_MODEL_READ_FROM_MAX;
+ logger.info("[Consume From Max Offset]" + consumerId);
+ }
+ break;
+ }
+ case CONSUMER_FROM_MAX_OFFSET_ALWAYS: {
+ if (isFistReg) {
+ readStatus = TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS;
+ logger.info("[Consume From Max Offset Always]" + consumerId);
+ }
+ break;
+ }
+ default: {
+ readStatus = TBaseConstants.CONSUME_MODEL_READ_NORMAL;
+ }
+ }
+ return readStatus;
+ }
+
+ /**
+ * Get the broker read service.
+ *
+ * @param brokerInfo broker information
+ * @return broker read service
+ */
+ protected BrokerReadService getBrokerService(BrokerInfo brokerInfo) {
+ return rpcServiceFactory.getService(BrokerReadService.class, brokerInfo, rpcConfig);
+ }
+
+ /**
+ * Generate consumer id.
+ *
+ * @return consumer id
+ * @throws Exception
+ */
+ private String generateConsumerID() throws Exception {
+ String pidName = ManagementFactory.getRuntimeMXBean().getName();
+ if (pidName != null && pidName.contains("@")) {
+ pidName = pidName.split("@")[0];
+ }
+ return new StringBuilder(256)
+ .append(this.consumerConfig.getConsumerGroup())
+ .append("_").append(AddressUtils.getLocalAddress())
+ .append("-").append(pidName)
+ .append("-").append(System.currentTimeMillis())
+ .append("-").append(consumerCounter.incrementAndGet())
+ .append("-Balance-")
+ .append(TubeClientVersion.CONSUMER_VERSION).toString();
+ }
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
index b9b1c81..357b4b4 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
@@ -24,6 +24,7 @@ import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
+import org.apache.inlong.tubemq.client.consumer.SimpleClientBalanceConsumer;
import org.apache.inlong.tubemq.client.consumer.SimplePullMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.SimplePushMessageConsumer;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -50,7 +51,7 @@ public class TubeBaseSessionFactory implements InnerSessionFactory {
private final CopyOnWriteArrayList<Shutdownable> clientLst =
new CopyOnWriteArrayList<>();
private final DefaultBrokerRcvQltyStats brokerRcvQltyStats;
- private AtomicBoolean shutdown = new AtomicBoolean(false);
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
public TubeBaseSessionFactory(final ClientFactory clientFactory,
final TubeClientConfig tubeClientConfig) throws TubeClientException {
@@ -189,7 +190,15 @@ public class TubeBaseSessionFactory implements InnerSessionFactory {
@Override
public ClientBalanceConsumer createBalanceConsumer(ConsumerConfig consumerConfig)
throws TubeClientException {
- return null;
+ if (!tubeClientConfig.getMasterInfo().equals(consumerConfig.getMasterInfo())) {
+ throw new TubeClientException(new StringBuilder(512)
+ .append("consumerConfig's masterInfo not equal!")
+ .append(" SessionFactory's masterInfo is ")
+ .append(tubeClientConfig.getMasterInfo().getMasterClusterStr())
+ .append(", consumerConfig's masterInfo is ")
+ .append(consumerConfig.getMasterInfo().getMasterClusterStr()).toString());
+ }
+ return this.addClient(new SimpleClientBalanceConsumer(this, consumerConfig));
}
public boolean isShutdown() {
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeMultiSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeMultiSessionFactory.java
index 715f08e..c48f7b9 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeMultiSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeMultiSessionFactory.java
@@ -88,6 +88,9 @@ public class TubeMultiSessionFactory implements MessageSessionFactory {
@Override
public ClientBalanceConsumer createBalanceConsumer(ConsumerConfig consumerConfig)
throws TubeClientException {
- return null;
+ if (isShutDown.get()) {
+ throw new TubeClientException("Please initialize the object first!");
+ }
+ return this.baseSessionFactory.createBalanceConsumer(consumerConfig);
}
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeSingleSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeSingleSessionFactory.java
index f5d4411..962f3c2 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeSingleSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeSingleSessionFactory.java
@@ -104,7 +104,10 @@ public class TubeSingleSessionFactory implements MessageSessionFactory {
@Override
public ClientBalanceConsumer createBalanceConsumer(ConsumerConfig consumerConfig)
throws TubeClientException {
- return null;
+ if (isShutDown.get()) {
+ throw new TubeClientException("Please initialize the object first!");
+ }
+ return baseSessionFactory.createBalanceConsumer(consumerConfig);
}
public NettyClientFactory getRpcServiceFactory() {
diff --git a/inlong-tubemq/tubemq-client/src/test/java/org/apache/inlong/tubemq/client/consumer/RmtDataCacheTest.java b/inlong-tubemq/tubemq-client/src/test/java/org/apache/inlong/tubemq/client/consumer/RmtDataCacheTest.java
index 9ca7405..3b0ed5c 100644
--- a/inlong-tubemq/tubemq-client/src/test/java/org/apache/inlong/tubemq/client/consumer/RmtDataCacheTest.java
+++ b/inlong-tubemq/tubemq-client/src/test/java/org/apache/inlong/tubemq/client/consumer/RmtDataCacheTest.java
@@ -25,23 +25,24 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.corebase.cluster.BrokerInfo;
import org.apache.inlong.tubemq.corebase.cluster.Partition;
-import org.apache.inlong.tubemq.corebase.policies.FlowCtrlRuleHandler;
import org.junit.Test;
public class RmtDataCacheTest {
@Test
public void testRmtDataCache() {
- FlowCtrlRuleHandler groupFlowCtrlRuleHandler = new FlowCtrlRuleHandler(false);
- FlowCtrlRuleHandler defFlowCtrlRuleHandler = new FlowCtrlRuleHandler(true);
List<Partition> partitions = new ArrayList<>();
BrokerInfo brokerInfo = new BrokerInfo(1, "127.0.0.1", 18080);
Partition expectPartition = new Partition(brokerInfo, "test", 1);
partitions.add(expectPartition);
+ String masterAddrInfo = "127.0.0.1:8069";
+ String consumerGroup = "testGroup";
+ ConsumerConfig consumerConfig = new ConsumerConfig(masterAddrInfo, consumerGroup);
- RmtDataCache cache = new RmtDataCache(defFlowCtrlRuleHandler, groupFlowCtrlRuleHandler, partitions);
+ RmtDataCache cache = new RmtDataCache(consumerConfig, partitions);
List<Partition> brokerPartitions = cache.getBrokerPartitionList(brokerInfo);
assertEquals(1, brokerPartitions.size());
assertEquals(expectPartition.getPartitionId(), brokerPartitions.get(0).getPartitionId());
diff --git a/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/ClientBalanceConsumerExample.java b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/ClientBalanceConsumerExample.java
new file mode 100644
index 0000000..c495234
--- /dev/null
+++ b/inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/ClientBalanceConsumerExample.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.example;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.inlong.tubemq.client.common.ConfirmResult;
+import org.apache.inlong.tubemq.client.common.ConsumeResult;
+import org.apache.inlong.tubemq.client.common.QueryMetaResult;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
+import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.corebase.utils.MixedUtils;
+import org.apache.inlong.tubemq.corebase.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This demo shows how to consume message by client balance consumer.
+ *
+ * <p>Consume message in client balance mode achieved by
+ * {@link ClientBalanceConsumer#getMessage(ConsumeResult)}.
+ * Note that whenever {@link ClientBalanceConsumer#getMessage(ConsumeResult)}
+ * returns successfully, the return value(whether or not to be {@code null})
+ * should be processed by {@link ClientBalanceConsumer#confirmConsume(
+ * String, boolean, ConfirmResult)}.
+ */
+public final class ClientBalanceConsumerExample {
+
+ private static final Logger logger =
+ LoggerFactory.getLogger(ClientBalanceConsumerExample.class);
+
+ private static final MsgSendReceiveStats msgRcvStats =
+ new MsgSendReceiveStats(false);
+ private static ClientBalanceConsumer consumer;
+ private static MessageSessionFactory messageSessionFactory;
+ // 0. Map of partitionKey and last success offset
+ // You can persist the information and use it when restarting or
+ // re-rolling to keep the current consumption to start from
+ // the offset required in the last round
+ private static final ConcurrentHashMap<String, Long> partitionOffsetMap =
+ new ConcurrentHashMap<>();
+
+ public static void main(String[] args) throws Throwable {
+ // 1. get and initial parameters
+ final String masterServers = args[0];
+ final String topics = args[1];
+ final String group = args[2];
+ final int msgCount = Integer.parseInt(args[3]);
+ final int totalGroupNodeCnt =
+ (args.length > 4) ? Integer.parseInt(args[4]) : 1;
+ final int nodeIndexId =
+ (args.length > 5) ? Integer.parseInt(args[5]) : 0;
+
+ // 2. initial consumer object
+ ConsumerConfig consumerConfig = new ConsumerConfig(masterServers, group);
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
+ messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
+ consumer = messageSessionFactory.createBalanceConsumer(consumerConfig);
+ final Map<String, TreeSet<String>> topicAndFiltersMap =
+ MixedUtils.parseTopicParam(topics);
+ final long metaInfoFetchInterval =
+ consumer.getConsumerConfig().getPartMetaInfoCheckPeriodMs();
+ // 3. start consumer
+ ProcessResult procResult = new ProcessResult();
+ if (!consumer.start(topicAndFiltersMap, totalGroupNodeCnt, nodeIndexId, procResult)) {
+ logger.info("Initial balance consumer failure, errcode is {}, errMsg is {}",
+ procResult.getErrCode(), procResult.getErrMsg());
+ return;
+ }
+
+ // 4. initial partition assign thread
+ Thread metaInfoUpdater = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ QueryMetaResult qryResult = new QueryMetaResult();
+ ProcessResult procResult = new ProcessResult();
+ do {
+ try {
+ // 4.1 judge consumer is shutdown
+ if (consumer.isShutdown()) {
+ logger.info("Consumer is shutdown!");
+ break;
+ }
+ // 4.2 get partition meta info
+ if (!consumer.getPartitionMetaInfo(qryResult)) {
+ // 4.2.1 judge consumer is shutdown
+ if (qryResult.getErrCode() == TErrCodeConstants.CLIENT_SHUTDOWN) {
+ logger.info("Consumer is shutdown!");
+ break;
+ }
+ } else {
+ // 4.2.2.1 get latest partition meta info
+ Map<String, Boolean> partMetaInfoMap = qryResult.getPartStatusMap();
+ if (partMetaInfoMap != null && !partMetaInfoMap.isEmpty()) {
+ // 4.2.2.2 assign partitions to current node
+ // by totalGroupNodeCnt and nodeIndexId parameters
+ Set<String> configuredTopicPartitions = partMetaInfoMap.keySet();
+ Set<String> assignedPartitions =
+ configuredTopicPartitions.stream().filter(p ->
+ (((((long) p.hashCode()) & 0xffffffffL)
+ % consumer.getSourceCount()) == consumer.getNodeId()))
+ .collect(Collectors.toCollection(TreeSet::new));
+ Set<String> rsvRegisteredPartSet = new TreeSet<>();
+ // 4.2.2.3 get current registered partition set
+ Set<String> curRegisteredPartSet =
+ consumer.getCurRegisteredPartSet();
+ // 4.2.2.4 remove unassigned or unsubscribable partitions
+ for (String partKey : curRegisteredPartSet) {
+ if (!assignedPartitions.contains(partKey)
+ || partMetaInfoMap.get(partKey) == Boolean.FALSE) {
+ if (!consumer.disconnectFromPartition(partKey, procResult)
+ && procResult.getErrCode() == TErrCodeConstants.CLIENT_SHUTDOWN) {
+ logger.info("Consumer is shutdown!");
+ break;
+ }
+ logger.info("Unregister " + partKey
+ + ", process result is " + procResult.isSuccess()
+ + ", err info is " + procResult.getErrMsg());
+ } else {
+ rsvRegisteredPartSet.add(partKey);
+ }
+ }
+ // 4.2.2.5 add assigned and subscribable partitions
+ for (String partKey : assignedPartitions) {
+ if (!rsvRegisteredPartSet.contains(partKey)
+ && partMetaInfoMap.get(partKey) == Boolean.TRUE) {
+ // Note: if you do not need to reset the boostrap
+ // consumption offset value, please set it to
+ // a negative number
+ Long boostrapOffset = partitionOffsetMap.get(partKey);
+ if (!consumer.connect2Partition(partKey,
+ boostrapOffset == null ? -1L : boostrapOffset,
+ procResult)) {
+ // 4.2.2.5.1 if client shutdown, the thread need exit!
+ if (procResult.getErrCode()
+ == TErrCodeConstants.CLIENT_SHUTDOWN) {
+ logger.info("Consumer is shutdown!");
+ break;
+ }
+ }
+ logger.info("Register " + partKey
+ + ", process result is " + procResult.isSuccess()
+ + ", err info is " + procResult.getErrMsg());
+ }
+ }
+ }
+ }
+ // 4.3 wait next assign interval
+ ThreadUtils.sleep(metaInfoFetchInterval);
+ } catch (Throwable e) {
+ logger.error("Consume messages failed!", e);
+ }
+ } while (true);
+ logger.info("Consumer existed client balance thread!");
+ }
+ }, "partition_assigner");
+
+ // 5. initial fetch threads
+ Thread[] fetchRunners = new Thread[3];
+ for (int i = 0; i < fetchRunners.length; i++) {
+ fetchRunners[i] = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ int getCount = msgCount;
+ ConsumeResult csmResult = new ConsumeResult();
+ ConfirmResult cfmResult = new ConfirmResult();
+ // wait partition status ready
+ do {
+ if (consumer.isPartitionsReady(5000)
+ || consumer.isShutdown()) {
+ break;
+ }
+ } while (true);
+ // consume messages
+ do {
+ // 5.1 judge consumer is shutdown
+ if (consumer.isShutdown()) {
+ logger.info("Consumer is shutdown!");
+ break;
+ }
+ // 5.2 get messages
+ if (consumer.getMessage(csmResult)) {
+ // 5.2.1.1 process messages if success
+ List<Message> messageList = csmResult.getMessageList();
+ if (messageList != null && !messageList.isEmpty()) {
+ msgRcvStats.addMsgCount(
+ csmResult.getTopicName(), messageList.size());
+ }
+ // 5.2.1.2 store current offset
+ partitionOffsetMap.put(
+ csmResult.getPartitionKey(), csmResult.getCurrOffset());
+ // 5.2.1.3 confirm messages to server
+ if (consumer.confirmConsume(
+ csmResult.getConfirmContext(), true, cfmResult)) {
+ // store confirmed offset
+ partitionOffsetMap.put(
+ cfmResult.getPartitionKey(), cfmResult.getCurrOffset());
+ }
+ } else {
+ // 5.2.2.1 print unexpected error
+ if (csmResult.getErrCode() == TErrCodeConstants.CLIENT_SHUTDOWN) {
+ logger.info("Found that the client has shutdown, exit!");
+ }
+ }
+ // judge reached required message count
+ if (msgCount > 0) {
+ if (--getCount <= 0) {
+ break;
+ }
+ }
+ } while (true);
+ } catch (Throwable e) {
+ logger.error("Consume messages failed!", e);
+ }
+ msgRcvStats.stopStats();
+ logger.info("Fetch runner exit!");
+ }
+ }, "_fetch_runner_" + i);
+ }
+
+ // 6. start threads
+ // 6.1 start partition assign thread
+ metaInfoUpdater.start();
+ // 6.2 start fetch threads
+ for (Thread thread : fetchRunners) {
+ thread.start();
+ }
+ // 6.3 initial statistic thread
+ Thread statisticThread =
+ new Thread(msgRcvStats, "Receive Statistic Thread");
+ statisticThread.start();
+ }
+}
+
+