You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/09/25 02:40:08 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-359] TubeMQ
consume speed dropped to 0 in some partitions,
it is a very serious bug (#274)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new ca17c8f [TUBEMQ-359] TubeMQ consume speed dropped to 0 in some partitions, it is a very serious bug (#274)
ca17c8f is described below
commit ca17c8fd9499df9147fea0b0bee489238299f30e
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri Sep 25 10:40:01 2020 +0800
[TUBEMQ-359] TubeMQ consume speed dropped to 0 in some partitions, it is a very serious bug (#274)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../client/consumer/BaseMessageConsumer.java | 7 ++----
.../tubemq/client/consumer/RmtDataCache.java | 27 ++++++++++++----------
.../tubemq/client/consumer/RmtDataCacheTest.java | 2 +-
3 files changed, 18 insertions(+), 18 deletions(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index b76b5eb..380b519 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -1486,11 +1486,8 @@ public class BaseMessageConsumer implements MessageConsumer {
public void run() {
StringBuilder strBuffer = new StringBuilder(256);
try {
- if (isPullConsume) {
- // For pull consume, do timeout check on partitions pulled without confirm
- rmtDataCache.resumeTimeoutConsumePartitions(
- consumerConfig.getPullProtectConfirmTimeoutMs());
- }
+ rmtDataCache.resumeTimeoutConsumePartitions(isPullConsume,
+ consumerConfig.getPullProtectConfirmTimeoutMs());
// Fetch the rebalance result, construct message adn return it.
ConsumerEvent event = rebalanceResults.poll();
List<SubscribeInfo> subInfoList = null;
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index a041042..745f127 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -681,18 +681,21 @@ public class RmtDataCache implements Closeable {
return this.brokerPartitionConMap.get(brokerInfo);
}
- public void resumeTimeoutConsumePartitions(long allowedPeriodTimes) {
- if (!partitionUsedMap.isEmpty()) {
- List<String> partKeys = new ArrayList<>(partitionUsedMap.keySet());
- for (String keyId : partKeys) {
- Long oldTime = partitionUsedMap.get(keyId);
- if (oldTime != null && System.currentTimeMillis() - oldTime > allowedPeriodTimes) {
- oldTime = partitionUsedMap.remove(keyId);
- if (oldTime != null) {
- PartitionExt partitionExt = partitionMap.get(keyId);
- if (partitionExt != null) {
- partitionExt.setLastPackConsumed(false);
- releaseIdlePartition(keyId);
+ public void resumeTimeoutConsumePartitions(boolean isPullConsume, long allowedPeriodTimes) {
+ if (isPullConsume) {
+ // For pull consume, do timeout check on partitions pulled without confirm
+ if (!partitionUsedMap.isEmpty()) {
+ List<String> partKeys = new ArrayList<>(partitionUsedMap.keySet());
+ for (String keyId : partKeys) {
+ Long oldTime = partitionUsedMap.get(keyId);
+ if (oldTime != null && System.currentTimeMillis() - oldTime > allowedPeriodTimes) {
+ oldTime = partitionUsedMap.remove(keyId);
+ if (oldTime != null) {
+ PartitionExt partitionExt = partitionMap.get(keyId);
+ if (partitionExt != null) {
+ partitionExt.setLastPackConsumed(false);
+ releaseIdlePartition(keyId);
+ }
}
}
}
diff --git a/tubemq-client/src/test/java/org/apache/tubemq/client/consumer/RmtDataCacheTest.java b/tubemq-client/src/test/java/org/apache/tubemq/client/consumer/RmtDataCacheTest.java
index c68644e..eadbfa6 100644
--- a/tubemq-client/src/test/java/org/apache/tubemq/client/consumer/RmtDataCacheTest.java
+++ b/tubemq-client/src/test/java/org/apache/tubemq/client/consumer/RmtDataCacheTest.java
@@ -65,7 +65,7 @@ public class RmtDataCacheTest {
assertEquals(1, cache.getBrokerPartitionList(brokerInfo).size());
assertEquals(1, cache.getCurPartitionInfoMap().size());
assertEquals(1, cache.getAllPartitionListWithStatus().size());
- cache.resumeTimeoutConsumePartitions(1000);
+ cache.resumeTimeoutConsumePartitions(true, 1000);
Map<BrokerInfo, List<Partition>> infoMap = new HashMap<>();
cache.removeAndGetPartition(infoMap, new ArrayList<String>(), 1000, true);
cache.getSubscribeInfoList("test", "test");