You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by al...@apache.org on 2020/05/27 07:53:36 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-158] nextWithAuthInfo2B status should be managed independently according to Broker (#101)
This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 78a29d3 [TUBEMQ-158] nextWithAuthInfo2B status should be managed independently according to Broker (#101)
78a29d3 is described below
commit 78a29d3932ad7bad3ea6c6b73da5c9432c45e9f7
Author: gosonzhang <46...@qq.com>
AuthorDate: Wed May 27 07:53:28 2020 +0000
[TUBEMQ-158] nextWithAuthInfo2B status should be managed independently according to Broker (#101)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../client/consumer/BaseMessageConsumer.java | 69 ++++++++++++++--------
.../tubemq/client/producer/ProducerManager.java | 17 +++---
2 files changed, 55 insertions(+), 31 deletions(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index b9aa993..3d806ca 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -121,7 +121,8 @@ public class BaseMessageConsumer implements MessageConsumer {
private long lastHeartbeatTime2Master = 0;
private long lastHeartbeatTime2Broker = 0;
private AtomicBoolean nextWithAuthInfo2M = new AtomicBoolean(false);
- private AtomicBoolean nextWithAuthInfo2B = new AtomicBoolean(false);
+ private ConcurrentHashMap<Integer, AtomicBoolean> nextWithAuthInfo2BMap
+ = new ConcurrentHashMap<Integer, AtomicBoolean>();
/**
* Construct a BaseMessageConsumer object.
@@ -960,9 +961,9 @@ public class BaseMessageConsumer implements MessageConsumer {
if (subInfoList != null) {
builder.addAllSubscribeInfo(DataConverterUtil.formatSubInfo(subInfoList));
}
- ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = genMasterCertificateInfo(true);
+ ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = genMasterCertificateInfo(false);
if (authInfoBuilder != null) {
- builder.setAuthInfo(authInfoBuilder.build());
+ builder.setAuthInfo(authInfoBuilder);
}
return builder.build();
}
@@ -974,7 +975,7 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setGroupName(this.consumerConfig.getConsumerGroup());
ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = genMasterCertificateInfo(true);
if (authInfoBuilder != null) {
- builder.setAuthInfo(authInfoBuilder.build());
+ builder.setAuthInfo(authInfoBuilder);
}
return builder.build();
}
@@ -1003,9 +1004,9 @@ public class BaseMessageConsumer implements MessageConsumer {
}
}
ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
- genBrokerAuthenticInfo(true);
+ genBrokerAuthenticInfo(partition.getBrokerId(), false);
if (authInfoBuilder != null) {
- builder.setAuthInfo(authInfoBuilder.build());
+ builder.setAuthInfo(authInfoBuilder);
}
return builder.build();
}
@@ -1025,15 +1026,15 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setReadStatus(1);
}
ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
- genBrokerAuthenticInfo(true);
+ genBrokerAuthenticInfo(partition.getBrokerId(), true);
if (authInfoBuilder != null) {
- builder.setAuthInfo(authInfoBuilder.build());
+ builder.setAuthInfo(authInfoBuilder);
}
return builder.build();
}
private ClientBroker.HeartBeatRequestC2B createBrokerHeartBeatRequest(
- List<String> partitionList) {
+ int brokerId, List<String> partitionList) {
ClientBroker.HeartBeatRequestC2B.Builder builder =
ClientBroker.HeartBeatRequestC2B.newBuilder();
builder.setClientId(consumerId);
@@ -1042,9 +1043,9 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
builder.addAllPartitionInfo(partitionList);
ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
- genBrokerAuthenticInfo(true);
+ genBrokerAuthenticInfo(brokerId, false);
if (authInfoBuilder != null) {
- builder.setAuthInfo(authInfoBuilder.build());
+ builder.setAuthInfo(authInfoBuilder);
}
return builder.build();
}
@@ -1103,6 +1104,7 @@ public class BaseMessageConsumer implements MessageConsumer {
boolean needAdd = false;
ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = null;
if (this.consumerConfig.isEnableUserAuthentic()) {
+ authInfoBuilder = ClientMaster.MasterCertificateInfo.newBuilder();
if (force) {
needAdd = true;
nextWithAuthInfo2M.set(false);
@@ -1111,27 +1113,37 @@ public class BaseMessageConsumer implements MessageConsumer {
needAdd = true;
}
}
- }
- if (needAdd) {
- authInfoBuilder = ClientMaster.MasterCertificateInfo.newBuilder();
- authInfoBuilder.setAuthInfo(authenticateHandler
+ if (needAdd) {
+ authInfoBuilder.setAuthInfo(authenticateHandler
.genMasterAuthenticateToken(consumerConfig.getUsrName(),
- consumerConfig.getUsrPassWord()).build());
+ consumerConfig.getUsrPassWord()));
+ } else {
+ authInfoBuilder.setAuthorizedToken(authAuthorizedTokenRef.get());
+ }
}
return authInfoBuilder;
}
- private ClientBroker.AuthorizedInfo.Builder genBrokerAuthenticInfo(boolean force) {
+ private ClientBroker.AuthorizedInfo.Builder genBrokerAuthenticInfo(int brokerId, boolean force) {
ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
ClientBroker.AuthorizedInfo.newBuilder();
authInfoBuilder.setVisitAuthorizedToken(visitToken.get());
if (this.consumerConfig.isEnableUserAuthentic()) {
boolean needAdd = false;
+ AtomicBoolean authStatus = nextWithAuthInfo2BMap.get(brokerId);
+ if (authStatus == null) {
+ AtomicBoolean tmpAuthStatus = new AtomicBoolean(false);
+ authStatus =
+ nextWithAuthInfo2BMap.putIfAbsent(brokerId, tmpAuthStatus);
+ if (authStatus == null) {
+ authStatus = tmpAuthStatus;
+ }
+ }
if (force) {
needAdd = true;
- nextWithAuthInfo2B.set(false);
- } else if (nextWithAuthInfo2B.get()) {
- if (nextWithAuthInfo2B.compareAndSet(true, false)) {
+ authStatus.set(false);
+ } else if (authStatus.get()) {
+ if (authStatus.compareAndSet(true, false)) {
needAdd = true;
}
}
@@ -1572,7 +1584,7 @@ public class BaseMessageConsumer implements MessageConsumer {
}
// Send heartbeat request to the broker connect by the client
for (BrokerInfo brokerInfo : rmtDataCache.getAllRegisterBrokers()) {
- List<String> partStrSet = new ArrayList<>();
+ List<String> partStrSet = new ArrayList<String>();
try {
// Handle the heartbeat response for partitions belong to the same broker.
List<Partition> partitions =
@@ -1583,14 +1595,25 @@ public class BaseMessageConsumer implements MessageConsumer {
}
ClientBroker.HeartBeatResponseB2C heartBeatResponseV2 =
getBrokerService(brokerInfo).consumerHeartbeatC2B(
- createBrokerHeartBeatRequest(partStrSet),
+ createBrokerHeartBeatRequest(brokerInfo.getBrokerId(), partStrSet),
AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
// When response is success
if (heartBeatResponseV2.getSuccess()) {
// If the peer require authentication, set a flag.
// The following request will attach the auth information.
if (heartBeatResponseV2.hasRequireAuth()) {
- nextWithAuthInfo2B.set(heartBeatResponseV2.getRequireAuth());
+ AtomicBoolean authStatus =
+ nextWithAuthInfo2BMap.get(brokerInfo.getBrokerId());
+ if (authStatus == null) {
+ AtomicBoolean tmpAuthStatus = new AtomicBoolean(false);
+ authStatus =
+ nextWithAuthInfo2BMap.putIfAbsent(
+ brokerInfo.getBrokerId(), tmpAuthStatus);
+ if (authStatus == null) {
+ authStatus = tmpAuthStatus;
+ }
+ }
+ authStatus.set(heartBeatResponseV2.getRequireAuth());
}
// If the heartbeat response report failed partitions, release the
// corresponding local partition and log the operation
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
index 1135f9c..cc1037b 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
@@ -452,8 +452,7 @@ public class ProducerManager {
this.lastBrokerUpdatedTime = System.currentTimeMillis();
}
builder.setHostName(AddressUtils.getLocalAddress());
- ClientMaster.MasterCertificateInfo.Builder authInfoBuilder =
- genMasterCertificateInfo(true);
+ ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = genMasterCertificateInfo(false);
if (authInfoBuilder != null) {
builder.setAuthInfo(authInfoBuilder.build());
}
@@ -467,7 +466,7 @@ public class ProducerManager {
ClientMaster.MasterCertificateInfo.Builder authInfoBuilder =
genMasterCertificateInfo(true);
if (authInfoBuilder != null) {
- builder.setAuthInfo(authInfoBuilder.build());
+ builder.setAuthInfo(authInfoBuilder);
}
return builder.build();
}
@@ -577,6 +576,7 @@ public class ProducerManager {
boolean needAdd = false;
ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = null;
if (this.tubeClientConfig.isEnableUserAuthentic()) {
+ authInfoBuilder = ClientMaster.MasterCertificateInfo.newBuilder();
if (force) {
needAdd = true;
nextWithAuthInfo2M.set(false);
@@ -585,12 +585,13 @@ public class ProducerManager {
needAdd = true;
}
}
- }
- if (needAdd) {
- authInfoBuilder = ClientMaster.MasterCertificateInfo.newBuilder();
- authInfoBuilder.setAuthInfo(authenticateHandler
+ if (needAdd) {
+ authInfoBuilder.setAuthInfo(authenticateHandler
.genMasterAuthenticateToken(tubeClientConfig.getUsrName(),
- tubeClientConfig.getUsrPassWord()).build());
+ tubeClientConfig.getUsrPassWord()));
+ } else {
+ authInfoBuilder.setAuthorizedToken(authAuthorizedTokenRef.get());
+ }
}
return authInfoBuilder;
}