You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/09 02:00:37 UTC
[inlong] branch master updated: [INLONG-6096][TubeMQ] Optimize some code styles under the nodeconsumer module (#6097)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new abac81a62 [INLONG-6096][TubeMQ] Optimize some code styles under the nodeconsumer module (#6097)
abac81a62 is described below
commit abac81a628bcaa1e6258b1327928be8e99c89168
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sun Oct 9 10:00:32 2022 +0800
[INLONG-6096][TubeMQ] Optimize some code styles under the nodeconsumer module (#6097)
---
.../apache/inlong/tubemq/server/master/TMaster.java | 2 +-
.../nodemanage/nodeconsumer/ConsumeGroupInfo.java | 19 +++++++++++--------
.../nodemanage/nodeconsumer/ConsumerEventManager.java | 8 +++-----
3 files changed, 15 insertions(+), 14 deletions(-)
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 08f3f01f8..c18c54b22 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -817,7 +817,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
strBuffer.delete(0, strBuffer.length());
try {
consumeGroupInfo.settAllocated();
- consumerEventManager.removeFirst(clientId);
+ consumerEventManager.removeFirst(clientId, strBuffer);
} catch (Throwable e) {
logger.warn("Unknown exception for remove first event:", e);
}
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
index 65b828189..13a1bba85 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
@@ -96,7 +96,7 @@ public class ConsumeGroupInfo {
new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
/**
- * Initial a Consume group node information.
+ * Initial a consumer group node information.
*
* @param consumer the consumer of consume group.
*/
@@ -298,7 +298,7 @@ public class ConsumeGroupInfo {
return;
}
String newConfig;
- String curCOnfig;
+ String curConfig;
boolean isChanged = false;
Set<String> newTopics = newMetaInfoMap.keySet();
Set<String> curTopics = topicMetaInfoMap.keySet();
@@ -308,11 +308,11 @@ public class ConsumeGroupInfo {
} else {
for (String topicKey : newTopics) {
newConfig = newMetaInfoMap.get(topicKey);
- curCOnfig = topicMetaInfoMap.get(topicKey);
+ curConfig = topicMetaInfoMap.get(topicKey);
if (newConfig == null) {
continue;
}
- if (!newConfig.equals(curCOnfig)) {
+ if (!newConfig.equals(curConfig)) {
isChanged = true;
break;
}
@@ -724,7 +724,8 @@ public class ConsumeGroupInfo {
logger.warn(sBuffer.toString());
return false;
}
- boolean foundEqual = false;
+ boolean foundOccupied = false;
+ int occupiedNodeId = -1;
String occupiedConsumerId = null;
for (ConsumerInfo consumerInfo : consumerInfoMap.values()) {
if (consumerInfo == null) {
@@ -732,16 +733,18 @@ public class ConsumeGroupInfo {
}
if (consumerInfo.getNodeId() == inConsumer.getNodeId()
&& !consumerInfo.getConsumerId().equals(inConsumer.getConsumerId())) {
- foundEqual = true;
+ foundOccupied = true;
+ occupiedNodeId = consumerInfo.getNodeId();
occupiedConsumerId = consumerInfo.getConsumerId();
break;
}
}
- if (foundEqual) {
+ if (foundOccupied) {
sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
.append("'s nodeId value(").append(inConsumer.getNodeId())
.append(") is occupied by ").append(occupiedConsumerId)
- .append(" in the group!");
+ .append("'s nodeId value(").append(occupiedNodeId)
+ .append(") in the group!");
result.setCheckResult(false,
TErrCodeConstants.CLIENT_DUPLICATE_INDEXID, sBuffer.toString());
logger.warn(sBuffer.toString());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
index a669754fa..6a35d2f1b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
@@ -119,9 +119,8 @@ public class ConsumerEventManager {
* disconnect event have priority over connect event
*
* @param consumerId the consumer id that need removed
- * @return the first consumer removed from the event map
*/
- public ConsumerEvent removeFirst(String consumerId) {
+ public void removeFirst(String consumerId, StringBuilder strBuffer) {
ConsumerEvent event = null;
String group = consumerHolder.getGroupName(consumerId);
boolean selDisConnMap = hasDisconnectEvent(group);
@@ -144,12 +143,11 @@ public class ConsumerEventManager {
}
}
if (event != null) {
- logger.info(new StringBuilder(512)
- .append("[Event Removed] rebalanceId=")
+ logger.info(strBuffer.append("[Event Removed] rebalanceId=")
.append(event.getRebalanceId()).append(",clientId=")
.append(consumerId).toString());
+ strBuffer.delete(0, strBuffer.length());
}
- return event;
}
public int getUnfinishedCount(String groupName) {