You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by yu...@apache.org on 2021/01/29 10:07:15 UTC
[incubator-tubemq] 17/49: [TUBEMQ-451]Replace ConsumeTupleInfo with Tuple2 (#349)
This is an automated email from the ASF dual-hosted git repository.
yuanbo pushed a commit to branch TUBEMQ-421
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 9e184b1049f940b6b1955bd35271119d42dcad54
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Dec 15 14:08:54 2020 +0800
[TUBEMQ-451]Replace ConsumeTupleInfo with Tuple2 (#349)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../org/apache/tubemq/server/master/TMaster.java | 37 +++++++++++-----------
.../nodeconsumer/ConsumerInfoHolder.java | 14 ++------
2 files changed, 22 insertions(+), 29 deletions(-)
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index b1bf1dc..8811cb0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -71,6 +71,7 @@ import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
import org.apache.tubemq.corebase.utils.DataConverterUtil;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corebase.utils.ThreadUtils;
+import org.apache.tubemq.corebase.utils.Tuple2;
import org.apache.tubemq.corerpc.RpcConfig;
import org.apache.tubemq.corerpc.RpcConstants;
import org.apache.tubemq.corerpc.RpcServiceFactory;
@@ -1883,14 +1884,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (consumerId == null) {
continue;
}
- ConsumerInfoHolder.ConsumeTupleInfo tupleInfo =
+ Tuple2<String, ConsumerInfo> tupleInfo =
consumerHolder.getConsumeTupleInfo(consumerId);
if (tupleInfo == null
- || tupleInfo.groupName == null
- || tupleInfo.consumerInfo == null) {
+ || tupleInfo.f0 == null
+ || tupleInfo.f1 == null) {
continue;
}
- List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.groupName);
+ List<String> blackTopicList = this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
Map<String, List<Partition>> topicSubPartMap = entry.getValue();
List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
@@ -1907,7 +1908,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
currentPartMap = new HashMap<>();
}
}
- if (tupleInfo.consumerInfo.isOverTLS()) {
+ if (tupleInfo.f1.isOverTLS()) {
for (Partition currentPart : currentPartMap.values()) {
if (!blackTopicList.contains(currentPart.getTopic())) {
boolean found = false;
@@ -1923,8 +1924,8 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
}
}
deletedSubInfoList
- .add(new SubscribeInfo(consumerId, tupleInfo.groupName,
- tupleInfo.consumerInfo.isOverTLS(), currentPart));
+ .add(new SubscribeInfo(consumerId, tupleInfo.f0,
+ tupleInfo.f1.isOverTLS(), currentPart));
}
for (Partition finalPart : finalPartList) {
if (!blackTopicList.contains(finalPart.getTopic())) {
@@ -1940,7 +1941,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
continue;
}
addedSubInfoList.add(new SubscribeInfo(consumerId,
- tupleInfo.groupName, true, finalPart));
+ tupleInfo.f0, true, finalPart));
}
}
} else {
@@ -1948,14 +1949,14 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if ((blackTopicList.contains(currentPart.getTopic()))
|| (!finalPartList.contains(currentPart))) {
deletedSubInfoList
- .add(new SubscribeInfo(consumerId, tupleInfo.groupName, false, currentPart));
+ .add(new SubscribeInfo(consumerId, tupleInfo.f0, false, currentPart));
}
}
for (Partition finalPart : finalPartList) {
if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
&& (!blackTopicList.contains(finalPart.getTopic()))) {
addedSubInfoList.add(new SubscribeInfo(consumerId,
- tupleInfo.groupName, false, finalPart));
+ tupleInfo.f0, false, finalPart));
}
}
}
@@ -2033,16 +2034,16 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (consumerId == null) {
continue;
}
- ConsumerInfoHolder.ConsumeTupleInfo tupleInfo =
+ Tuple2<String, ConsumerInfo> tupleInfo =
consumerHolder.getConsumeTupleInfo(consumerId);
if (tupleInfo == null
- || tupleInfo.groupName == null
- || tupleInfo.consumerInfo == null) {
+ || tupleInfo.f0 == null
+ || tupleInfo.f1 == null) {
continue;
}
List<String> blackTopicList =
- this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.groupName);
+ this.defaultBrokerConfManager.getBdbBlackTopicList(tupleInfo.f0);
Map<String, Map<String, Partition>> topicSubPartMap = entry.getValue();
List<SubscribeInfo> deletedSubInfoList = new ArrayList<>();
List<SubscribeInfo> addedSubInfoList = new ArrayList<>();
@@ -2066,15 +2067,15 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if ((blackTopicList.contains(currentPart.getTopic()))
|| (finalPartMap.get(currentPart.getPartitionKey()) == null)) {
deletedSubInfoList
- .add(new SubscribeInfo(consumerId, tupleInfo.groupName,
- tupleInfo.consumerInfo.isOverTLS(), currentPart));
+ .add(new SubscribeInfo(consumerId, tupleInfo.f0,
+ tupleInfo.f1.isOverTLS(), currentPart));
}
}
for (Partition finalPart : finalPartMap.values()) {
if ((currentPartMap.get(finalPart.getPartitionKey()) == null)
&& (!blackTopicList.contains(finalPart.getTopic()))) {
- addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.groupName,
- tupleInfo.consumerInfo.isOverTLS(), finalPart));
+ addedSubInfoList.add(new SubscribeInfo(consumerId, tupleInfo.f0,
+ tupleInfo.f1.isOverTLS(), finalPart));
}
}
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
index 5c78858..fc5d932 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/nodemanage/nodeconsumer/ConsumerInfoHolder.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.tubemq.corebase.cluster.ConsumerInfo;
+import org.apache.tubemq.corebase.utils.Tuple2;
public class ConsumerInfoHolder {
@@ -369,7 +370,7 @@ public class ConsumerInfoHolder {
return consumer;
}
- public ConsumeTupleInfo getConsumeTupleInfo(String consumerId) {
+ public Tuple2<String, ConsumerInfo> getConsumeTupleInfo(String consumerId) {
try {
rwLock.readLock().lock();
ConsumerInfo consumerInfo = null;
@@ -378,7 +379,7 @@ public class ConsumerInfoHolder {
if (consumeBandInfo != null) {
consumerInfo = consumeBandInfo.getConsumerInfo(consumerId);
}
- return new ConsumeTupleInfo(groupName, consumerInfo);
+ return new Tuple2<String, ConsumerInfo>(groupName, consumerInfo);
} finally {
rwLock.readLock().unlock();
}
@@ -424,13 +425,4 @@ public class ConsumerInfoHolder {
groupInfoMap.clear();
}
- public class ConsumeTupleInfo {
- public String groupName;
- public ConsumerInfo consumerInfo;
-
- public ConsumeTupleInfo(String groupName, ConsumerInfo consumerInfo) {
- this.groupName = groupName;
- this.consumerInfo = consumerInfo;
- }
- }
}