You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/09/25 02:40:08 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-359] TubeMQ consume speed dropped to 0 in some partitions, it is a very serious bug (#274)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new ca17c8f  [TUBEMQ-359] TubeMQ consume speed dropped to 0 in some partitions, it is a very serious bug (#274)
ca17c8f is described below

commit ca17c8fd9499df9147fea0b0bee489238299f30e
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri Sep 25 10:40:01 2020 +0800

    [TUBEMQ-359] TubeMQ consume speed dropped to 0 in some partitions, it is a very serious bug (#274)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../client/consumer/BaseMessageConsumer.java       |  7 ++----
 .../tubemq/client/consumer/RmtDataCache.java       | 27 ++++++++++++----------
 .../tubemq/client/consumer/RmtDataCacheTest.java   |  2 +-
 3 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index b76b5eb..380b519 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -1486,11 +1486,8 @@ public class BaseMessageConsumer implements MessageConsumer {
         public void run() {
             StringBuilder strBuffer = new StringBuilder(256);
             try {
-                if (isPullConsume) {
-                    // For pull consume, do timeout check on partitions pulled without confirm
-                    rmtDataCache.resumeTimeoutConsumePartitions(
-                            consumerConfig.getPullProtectConfirmTimeoutMs());
-                }
+                rmtDataCache.resumeTimeoutConsumePartitions(isPullConsume,
+                        consumerConfig.getPullProtectConfirmTimeoutMs());
                 // Fetch the rebalance result, construct message and return it.
                 ConsumerEvent event = rebalanceResults.poll();
                 List<SubscribeInfo> subInfoList = null;
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index a041042..745f127 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -681,18 +681,21 @@ public class RmtDataCache implements Closeable {
         return this.brokerPartitionConMap.get(brokerInfo);
     }
 
-    public void resumeTimeoutConsumePartitions(long allowedPeriodTimes) {
-        if (!partitionUsedMap.isEmpty()) {
-            List<String> partKeys = new ArrayList<>(partitionUsedMap.keySet());
-            for (String keyId : partKeys) {
-                Long oldTime = partitionUsedMap.get(keyId);
-                if (oldTime != null && System.currentTimeMillis() - oldTime > allowedPeriodTimes) {
-                    oldTime = partitionUsedMap.remove(keyId);
-                    if (oldTime != null) {
-                        PartitionExt partitionExt = partitionMap.get(keyId);
-                        if (partitionExt != null) {
-                            partitionExt.setLastPackConsumed(false);
-                            releaseIdlePartition(keyId);
+    public void resumeTimeoutConsumePartitions(boolean isPullConsume, long allowedPeriodTimes) {
+        if (isPullConsume) {
+            // For pull consume, do timeout check on partitions pulled without confirm
+            if (!partitionUsedMap.isEmpty()) {
+                List<String> partKeys = new ArrayList<>(partitionUsedMap.keySet());
+                for (String keyId : partKeys) {
+                    Long oldTime = partitionUsedMap.get(keyId);
+                    if (oldTime != null && System.currentTimeMillis() - oldTime > allowedPeriodTimes) {
+                        oldTime = partitionUsedMap.remove(keyId);
+                        if (oldTime != null) {
+                            PartitionExt partitionExt = partitionMap.get(keyId);
+                            if (partitionExt != null) {
+                                partitionExt.setLastPackConsumed(false);
+                                releaseIdlePartition(keyId);
+                            }
                         }
                     }
                 }
diff --git a/tubemq-client/src/test/java/org/apache/tubemq/client/consumer/RmtDataCacheTest.java b/tubemq-client/src/test/java/org/apache/tubemq/client/consumer/RmtDataCacheTest.java
index c68644e..eadbfa6 100644
--- a/tubemq-client/src/test/java/org/apache/tubemq/client/consumer/RmtDataCacheTest.java
+++ b/tubemq-client/src/test/java/org/apache/tubemq/client/consumer/RmtDataCacheTest.java
@@ -65,7 +65,7 @@ public class RmtDataCacheTest {
         assertEquals(1, cache.getBrokerPartitionList(brokerInfo).size());
         assertEquals(1, cache.getCurPartitionInfoMap().size());
         assertEquals(1, cache.getAllPartitionListWithStatus().size());
-        cache.resumeTimeoutConsumePartitions(1000);
+        cache.resumeTimeoutConsumePartitions(true, 1000);
         Map<BrokerInfo, List<Partition>> infoMap = new HashMap<>();
         cache.removeAndGetPartition(infoMap, new ArrayList<String>(), 1000, true);
         cache.getSubscribeInfoList("test", "test");