You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by al...@apache.org on 2020/05/27 07:53:36 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-158] nextWithAuthInfo2B status should be managed independently according to Broker (#101)

This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 78a29d3  [TUBEMQ-158] nextWithAuthInfo2B status should be managed independently according to Broker (#101)
78a29d3 is described below

commit 78a29d3932ad7bad3ea6c6b73da5c9432c45e9f7
Author: gosonzhang <46...@qq.com>
AuthorDate: Wed May 27 07:53:28 2020 +0000

    [TUBEMQ-158] nextWithAuthInfo2B status should be managed independently according to Broker (#101)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../client/consumer/BaseMessageConsumer.java       | 69 ++++++++++++++--------
 .../tubemq/client/producer/ProducerManager.java    | 17 +++---
 2 files changed, 55 insertions(+), 31 deletions(-)

diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index b9aa993..3d806ca 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -121,7 +121,8 @@ public class BaseMessageConsumer implements MessageConsumer {
     private long lastHeartbeatTime2Master = 0;
     private long lastHeartbeatTime2Broker = 0;
     private AtomicBoolean nextWithAuthInfo2M = new AtomicBoolean(false);
-    private AtomicBoolean nextWithAuthInfo2B = new AtomicBoolean(false);
+    private ConcurrentHashMap<Integer, AtomicBoolean> nextWithAuthInfo2BMap
+        = new ConcurrentHashMap<Integer, AtomicBoolean>();
 
     /**
      * Construct a BaseMessageConsumer object.
@@ -960,9 +961,9 @@ public class BaseMessageConsumer implements MessageConsumer {
         if (subInfoList != null) {
             builder.addAllSubscribeInfo(DataConverterUtil.formatSubInfo(subInfoList));
         }
-        ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = genMasterCertificateInfo(true);
+        ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = genMasterCertificateInfo(false);
         if (authInfoBuilder != null) {
-            builder.setAuthInfo(authInfoBuilder.build());
+            builder.setAuthInfo(authInfoBuilder);
         }
         return builder.build();
     }
@@ -974,7 +975,7 @@ public class BaseMessageConsumer implements MessageConsumer {
         builder.setGroupName(this.consumerConfig.getConsumerGroup());
         ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = genMasterCertificateInfo(true);
         if (authInfoBuilder != null) {
-            builder.setAuthInfo(authInfoBuilder.build());
+            builder.setAuthInfo(authInfoBuilder);
         }
         return builder.build();
     }
@@ -1003,9 +1004,9 @@ public class BaseMessageConsumer implements MessageConsumer {
             }
         }
         ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
-                genBrokerAuthenticInfo(true);
+                genBrokerAuthenticInfo(partition.getBrokerId(), false);
         if (authInfoBuilder != null) {
-            builder.setAuthInfo(authInfoBuilder.build());
+            builder.setAuthInfo(authInfoBuilder);
         }
         return builder.build();
     }
@@ -1025,15 +1026,15 @@ public class BaseMessageConsumer implements MessageConsumer {
             builder.setReadStatus(1);
         }
         ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
-                genBrokerAuthenticInfo(true);
+                genBrokerAuthenticInfo(partition.getBrokerId(), true);
         if (authInfoBuilder != null) {
-            builder.setAuthInfo(authInfoBuilder.build());
+            builder.setAuthInfo(authInfoBuilder);
         }
         return builder.build();
     }
 
     private ClientBroker.HeartBeatRequestC2B createBrokerHeartBeatRequest(
-            List<String> partitionList) {
+            int brokerId, List<String> partitionList) {
         ClientBroker.HeartBeatRequestC2B.Builder builder =
                 ClientBroker.HeartBeatRequestC2B.newBuilder();
         builder.setClientId(consumerId);
@@ -1042,9 +1043,9 @@ public class BaseMessageConsumer implements MessageConsumer {
         builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
         builder.addAllPartitionInfo(partitionList);
         ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
-                genBrokerAuthenticInfo(true);
+                genBrokerAuthenticInfo(brokerId, false);
         if (authInfoBuilder != null) {
-            builder.setAuthInfo(authInfoBuilder.build());
+            builder.setAuthInfo(authInfoBuilder);
         }
         return builder.build();
     }
@@ -1103,6 +1104,7 @@ public class BaseMessageConsumer implements MessageConsumer {
         boolean needAdd = false;
         ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = null;
         if (this.consumerConfig.isEnableUserAuthentic()) {
+            authInfoBuilder = ClientMaster.MasterCertificateInfo.newBuilder();
             if (force) {
                 needAdd = true;
                 nextWithAuthInfo2M.set(false);
@@ -1111,27 +1113,37 @@ public class BaseMessageConsumer implements MessageConsumer {
                     needAdd = true;
                 }
             }
-        }
-        if (needAdd) {
-            authInfoBuilder = ClientMaster.MasterCertificateInfo.newBuilder();
-            authInfoBuilder.setAuthInfo(authenticateHandler
+            if (needAdd) {
+                authInfoBuilder.setAuthInfo(authenticateHandler
                     .genMasterAuthenticateToken(consumerConfig.getUsrName(),
-                            consumerConfig.getUsrPassWord()).build());
+                        consumerConfig.getUsrPassWord()));
+            } else {
+                authInfoBuilder.setAuthorizedToken(authAuthorizedTokenRef.get());
+            }
         }
         return authInfoBuilder;
     }
 
-    private ClientBroker.AuthorizedInfo.Builder genBrokerAuthenticInfo(boolean force) {
+    private ClientBroker.AuthorizedInfo.Builder genBrokerAuthenticInfo(int brokerId, boolean force) {
         ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
                 ClientBroker.AuthorizedInfo.newBuilder();
         authInfoBuilder.setVisitAuthorizedToken(visitToken.get());
         if (this.consumerConfig.isEnableUserAuthentic()) {
             boolean needAdd = false;
+            AtomicBoolean authStatus = nextWithAuthInfo2BMap.get(brokerId);
+            if (authStatus == null) {
+                AtomicBoolean tmpAuthStatus = new AtomicBoolean(false);
+                authStatus =
+                    nextWithAuthInfo2BMap.putIfAbsent(brokerId, tmpAuthStatus);
+                if (authStatus == null) {
+                    authStatus = tmpAuthStatus;
+                }
+            }
             if (force) {
                 needAdd = true;
-                nextWithAuthInfo2B.set(false);
-            } else if (nextWithAuthInfo2B.get()) {
-                if (nextWithAuthInfo2B.compareAndSet(true, false)) {
+                authStatus.set(false);
+            } else if (authStatus.get()) {
+                if (authStatus.compareAndSet(true, false)) {
                     needAdd = true;
                 }
             }
@@ -1572,7 +1584,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                     }
                     // Send heartbeat request to the broker connect by the client
                     for (BrokerInfo brokerInfo : rmtDataCache.getAllRegisterBrokers()) {
-                        List<String> partStrSet = new ArrayList<>();
+                        List<String> partStrSet = new ArrayList<String>();
                         try {
                             // Handle the heartbeat response for partitions belong to the same broker.
                             List<Partition> partitions =
@@ -1583,14 +1595,25 @@ public class BaseMessageConsumer implements MessageConsumer {
                                 }
                                 ClientBroker.HeartBeatResponseB2C heartBeatResponseV2 =
                                         getBrokerService(brokerInfo).consumerHeartbeatC2B(
-                                                createBrokerHeartBeatRequest(partStrSet),
+                                                createBrokerHeartBeatRequest(brokerInfo.getBrokerId(), partStrSet),
                                                 AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                                 // When response is success
                                 if (heartBeatResponseV2.getSuccess()) {
                                     // If the peer require authentication, set a flag.
                                     // The following request will attach the auth information.
                                     if (heartBeatResponseV2.hasRequireAuth()) {
-                                        nextWithAuthInfo2B.set(heartBeatResponseV2.getRequireAuth());
+                                        AtomicBoolean authStatus =
+                                            nextWithAuthInfo2BMap.get(brokerInfo.getBrokerId());
+                                        if (authStatus == null) {
+                                            AtomicBoolean tmpAuthStatus = new AtomicBoolean(false);
+                                            authStatus =
+                                                nextWithAuthInfo2BMap.putIfAbsent(
+                                                    brokerInfo.getBrokerId(), tmpAuthStatus);
+                                            if (authStatus == null) {
+                                                authStatus = tmpAuthStatus;
+                                            }
+                                        }
+                                        authStatus.set(heartBeatResponseV2.getRequireAuth());
                                     }
                                     // If the heartbeat response report failed partitions, release the
                                     // corresponding local partition and log the operation
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
index 1135f9c..cc1037b 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/producer/ProducerManager.java
@@ -452,8 +452,7 @@ public class ProducerManager {
             this.lastBrokerUpdatedTime = System.currentTimeMillis();
         }
         builder.setHostName(AddressUtils.getLocalAddress());
-        ClientMaster.MasterCertificateInfo.Builder authInfoBuilder =
-                genMasterCertificateInfo(true);
+        ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = genMasterCertificateInfo(false);
         if (authInfoBuilder != null) {
             builder.setAuthInfo(authInfoBuilder.build());
         }
@@ -467,7 +466,7 @@ public class ProducerManager {
         ClientMaster.MasterCertificateInfo.Builder authInfoBuilder =
                 genMasterCertificateInfo(true);
         if (authInfoBuilder != null) {
-            builder.setAuthInfo(authInfoBuilder.build());
+            builder.setAuthInfo(authInfoBuilder);
         }
         return builder.build();
     }
@@ -577,6 +576,7 @@ public class ProducerManager {
         boolean needAdd = false;
         ClientMaster.MasterCertificateInfo.Builder authInfoBuilder = null;
         if (this.tubeClientConfig.isEnableUserAuthentic()) {
+            authInfoBuilder = ClientMaster.MasterCertificateInfo.newBuilder();
             if (force) {
                 needAdd = true;
                 nextWithAuthInfo2M.set(false);
@@ -585,12 +585,13 @@ public class ProducerManager {
                     needAdd = true;
                 }
             }
-        }
-        if (needAdd) {
-            authInfoBuilder = ClientMaster.MasterCertificateInfo.newBuilder();
-            authInfoBuilder.setAuthInfo(authenticateHandler
+            if (needAdd) {
+                authInfoBuilder.setAuthInfo(authenticateHandler
                     .genMasterAuthenticateToken(tubeClientConfig.getUsrName(),
-                            tubeClientConfig.getUsrPassWord()).build());
+                        tubeClientConfig.getUsrPassWord()));
+            } else {
+                authInfoBuilder.setAuthorizedToken(authAuthorizedTokenRef.get());
+            }
         }
         return authInfoBuilder;
     }