You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:15 UTC

[incubator-tubemq] 17/49: [TUBEMQ-451] Replace ConsumeTupleInfo with Tuple2 (#349)

This is an automated email from the ASF dual-hosted git repository.

yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git

commit 9e184b1049f940b6b1955bd35271119d42dcad54
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Dec 15 14:08:54 2020 +0800

    [TUBEMQ-451] Replace ConsumeTupleInfo with Tuple2 (#349)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../org/apache/tubemq/server/master/TMaster.java   | 37 +++++++++++-----------
 .../nodeconsumer/ConsumerInfoHolder.java           | 14 ++------
 2 files changed, 22 insertions(+), 29 deletions(-)

diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index b1bf1dc..8811cb0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -71,6 +71,7 @@ import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
 import org.apache.tubemq.corebase.utils.DataConverterUtil;
 import org.apache.tubemq.corebase.utils.TStringUtils;
 import org.apache.tubemq.corebase.utils.ThreadUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
 import org.apache.tubemq.corerpc.RpcConfig;
 import org.apache.tubemq.corerpc.RpcConstants;
 import org.apache.tubemq.corerpc.RpcServiceFactory;
@@ -1883,14 +1884,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             if (consumerId == null) {
                 continue;
             }
-            ConsumerInfoHolder.ConsumeTupleInfo tupleInfo =
+            Tuple2<String, ConsumerInfo> tupleInfo =
                     consumerHolder.getConsumeTupleInfo(consumerId);
             if (tupleInfo == null
-                    || tupleInfo.groupName == null
-                    || tupleInfo.consumerInfo == null) {
+                    || tupleInfo.f0 == null
+                    || tupleInfo.f1 == null) {
                 continue;
             }
-            List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.groupName);
+            List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
             Map<String, List<Partition>> topicSubPartMap = entry.getValue();
             List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
             List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
@@ -1907,7 +1908,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                         currentPartMap = new HashMap<>();
                     }
                 }
-                if (tupleInfo.consumerInfo.isOverTLS()) {
+                if (tupleInfo.f1.isOverTLS()) {
                     for (Partition currentPart : currentPartMap.values()) {
                         if (!blackTopicList.contains(currentPart.getTopic())) {
                             boolean found = false;
@@ -1923,8 +1924,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                             }
                         }
                         deletedSubInfoList
-                                .add(new SubscribeInfo(consumerId, tupleInfo.groupName,
-                                        tupleInfo.consumerInfo.isOverTLS(), currentPart));
+                                .add(new SubscribeInfo(consumerId, tupleInfo.f0,
+                                        tupleInfo.f1.isOverTLS(), currentPart));
                     }
                     for (Partition finalPart : finalPartList) {
                         if (!blackTopicList.contains(finalPart.getTopic())) {
@@ -1940,7 +1941,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                                 continue;
                             }
                             addedSubInfoList.add(new SubscribeInfo(consumerId,
-                                    tupleInfo.groupName, true, finalPart));
+                                    tupleInfo.f0, true, finalPart));
                         }
                     }
                 } else {
@@ -1948,14 +1949,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                         if ((blackTopicList.contains(currentPart.getTopic()))
                                 || (!finalPartList.contains(currentPart))) {
                             deletedSubInfoList
-                                    .add(new SubscribeInfo(consumerId, tupleInfo.groupName, false, currentPart));
+                                    .add(new SubscribeInfo(consumerId, tupleInfo.f0, false, currentPart));
                         }
                     }
                     for (Partition finalPart : finalPartList) {
                         if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
                                 && (!blackTopicList.contains(finalPart.getTopic()))) {
                             addedSubInfoList.add(new SubscribeInfo(consumerId,
-                                    tupleInfo.groupName, false, finalPart));
+                                    tupleInfo.f0, false, finalPart));
                         }
                     }
                 }
@@ -2033,16 +2034,16 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             if (consumerId == null) {
                 continue;
             }
-            ConsumerInfoHolder.ConsumeTupleInfo tupleInfo =
+            Tuple2<String, ConsumerInfo> tupleInfo =
                     consumerHolder.getConsumeTupleInfo(consumerId);
             if (tupleInfo == null
-                    || tupleInfo.groupName == null
-                    || tupleInfo.consumerInfo == null) {
+                    || tupleInfo.f0 == null
+                    || tupleInfo.f1 == null) {
                 continue;
             }
 
             List<String> blackTopicList =
-                    this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.groupName);
+                    this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
             Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue();
             List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
             List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
@@ -2066,15 +2067,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                     if ((blackTopicList.contains(currentPart.getTopic()))
                             || (finalPartMap.get(currentPart.getPartitionKey()) == null)) {
                         deletedSubInfoList
-                                .add(new SubscribeInfo(consumerId, tupleInfo.groupName,
-                                        tupleInfo.consumerInfo.isOverTLS(), currentPart));
+                                .add(new SubscribeInfo(consumerId, tupleInfo.f0,
+                                        tupleInfo.f1.isOverTLS(), currentPart));
                     }
                 }
                 for (Partition finalPart : finalPartMap.values()) {
                     if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
                             && (!blackTopicList.contains(finalPart.getTopic()))) {
-                        addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.groupName,
-                                tupleInfo.consumerInfo.isOverTLS(), finalPart));
+                        addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.f0,
+                                tupleInfo.f1.isOverTLS(), finalPart));
                     }
                 }
             }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index 5c78858..fc5d932 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.tubemq.corebase.cluster.ConsumerInfo;
+import org.apache.tubemq.corebase.utils.Tuple2;
 
 
 public class ConsumerInfoHolder {
@@ -369,7 +370,7 @@ public class ConsumerInfoHolder {
         return consumer;
     }
 
-    public ConsumeTupleInfo getConsumeTupleInfo(String consumerId) {
+    public Tuple2<String, ConsumerInfo> getConsumeTupleInfo(String consumerId) {
         try {
             rwLock.readLock().lock();
             ConsumerInfo consumerInfo = null;
@@ -378,7 +379,7 @@ public class ConsumerInfoHolder {
             if (consumeBandInfo != null) {
                 consumerInfo = consumeBandInfo.getConsumerInfo(consumerId);
             }
-            return new ConsumeTupleInfo(groupName, consumerInfo);
+            return new Tuple2<String, ConsumerInfo>(groupName, consumerInfo);
         } finally {
             rwLock.readLock().unlock();
         }
@@ -424,13 +425,4 @@ public class ConsumerInfoHolder {
         groupInfoMap.clear();
     }
 
-    public class ConsumeTupleInfo {
-        public String groupName;
-        public ConsumerInfo consumerInfo;
-
-        public ConsumeTupleInfo(String groupName, ConsumerInfo consumerInfo) {
-            this.groupName = groupName;
-            this.consumerInfo = consumerInfo;
-        }
-    }
 }