You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/10/09 02:00:37 UTC

[inlong] branch master updated: [INLONG-6096][TubeMQ] Optimize some code styles under the nodeconsumer module (#6097)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new abac81a62 [INLONG-6096][TubeMQ] Optimize some code styles under the nodeconsumer module (#6097)
abac81a62 is described below

commit abac81a628bcaa1e6258b1327928be8e99c89168
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sun Oct 9 10:00:32 2022 +0800

    [INLONG-6096][TubeMQ] Optimize some code styles under the nodeconsumer module (#6097)
---
 .../apache/inlong/tubemq/server/master/TMaster.java   |  2 +-
 .../nodemanage/nodeconsumer/ConsumeGroupInfo.java     | 19 +++++++++++--------
 .../nodemanage/nodeconsumer/ConsumerEventManager.java |  8 +++-----
 3 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
index 08f3f01f8..c18c54b22 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/TMaster.java
@@ -817,7 +817,7 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             strBuffer.delete(0, strBuffer.length());
             try {
                 consumeGroupInfo.settAllocated();
-                consumerEventManager.removeFirst(clientId);
+                consumerEventManager.removeFirst(clientId, strBuffer);
             } catch (Throwable e) {
                 logger.warn("Unknown exception for remove first event:", e);
             }
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
index 65b828189..13a1bba85 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumeGroupInfo.java
@@ -96,7 +96,7 @@ public class ConsumeGroupInfo {
             new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
 
     /**
-     *  Initial a Consume group node information.
+     *  Initial a consumer group node information.
      *
      * @param consumer   the consumer of consume group.
      */
@@ -298,7 +298,7 @@ public class ConsumeGroupInfo {
             return;
         }
         String newConfig;
-        String curCOnfig;
+        String curConfig;
         boolean isChanged = false;
         Set<String> newTopics = newMetaInfoMap.keySet();
         Set<String> curTopics = topicMetaInfoMap.keySet();
@@ -308,11 +308,11 @@ public class ConsumeGroupInfo {
         } else {
             for (String topicKey : newTopics) {
                 newConfig = newMetaInfoMap.get(topicKey);
-                curCOnfig = topicMetaInfoMap.get(topicKey);
+                curConfig = topicMetaInfoMap.get(topicKey);
                 if (newConfig == null) {
                     continue;
                 }
-                if (!newConfig.equals(curCOnfig)) {
+                if (!newConfig.equals(curConfig)) {
                     isChanged = true;
                     break;
                 }
@@ -724,7 +724,8 @@ public class ConsumeGroupInfo {
                     logger.warn(sBuffer.toString());
                     return false;
                 }
-                boolean foundEqual = false;
+                boolean foundOccupied = false;
+                int occupiedNodeId = -1;
                 String occupiedConsumerId = null;
                 for (ConsumerInfo consumerInfo : consumerInfoMap.values()) {
                     if (consumerInfo == null) {
@@ -732,16 +733,18 @@ public class ConsumeGroupInfo {
                     }
                     if (consumerInfo.getNodeId() == inConsumer.getNodeId()
                             && !consumerInfo.getConsumerId().equals(inConsumer.getConsumerId())) {
-                        foundEqual = true;
+                        foundOccupied = true;
+                        occupiedNodeId = consumerInfo.getNodeId();
                         occupiedConsumerId = consumerInfo.getConsumerId();
                         break;
                     }
                 }
-                if (foundEqual) {
+                if (foundOccupied) {
                     sBuffer.append("[Inconsistency subscribe] ").append(inConsumer.getConsumerId())
                             .append("'s nodeId value(").append(inConsumer.getNodeId())
                             .append(") is occupied by ").append(occupiedConsumerId)
-                            .append(" in the group!");
+                            .append("'s nodeId value(").append(occupiedNodeId)
+                            .append(") in the group!");
                     result.setCheckResult(false,
                             TErrCodeConstants.CLIENT_DUPLICATE_INDEXID, sBuffer.toString());
                     logger.warn(sBuffer.toString());
diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
index a669754fa..6a35d2f1b 100644
--- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
+++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/nodemanage/nodeconsumer/ConsumerEventManager.java
@@ -119,9 +119,8 @@ public class ConsumerEventManager {
      * disconnect event have priority over connect event
      *
      * @param consumerId    the consumer id that need removed
-     * @return              the first consumer removed from the event map
      */
-    public ConsumerEvent removeFirst(String consumerId) {
+    public void removeFirst(String consumerId, StringBuilder strBuffer) {
         ConsumerEvent event = null;
         String group = consumerHolder.getGroupName(consumerId);
         boolean selDisConnMap = hasDisconnectEvent(group);
@@ -144,12 +143,11 @@ public class ConsumerEventManager {
             }
         }
         if (event != null) {
-            logger.info(new StringBuilder(512)
-                    .append("[Event Removed] rebalanceId=")
+            logger.info(strBuffer.append("[Event Removed] rebalanceId=")
                     .append(event.getRebalanceId()).append(",clientId=")
                     .append(consumerId).toString());
+            strBuffer.delete(0, strBuffer.length());
         }
-        return event;
     }
 
     public int getUnfinishedCount(String groupName) {