You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/05/25 03:11:34 UTC
[incubator-tubemq] branch release-0.3.0 updated: [TUBEMQ-149]Some
of the consumers stop consuming their corresponding partitions and never
release the partition to others[addendum] (#96)
This is an automated email from the ASF dual-hosted git repository.
lamberliu pushed a commit to branch release-0.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/release-0.3.0 by this push:
new a44773e [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others[addendum] (#96)
a44773e is described below
commit a44773e954512c533a6bb6bc0bcba2278698fef8
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon May 25 03:11:26 2020 +0000
[TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others[addendum] (#96)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../tubemq/client/consumer/RmtDataCache.java | 79 ++++++++++++++--------
1 file changed, 49 insertions(+), 30 deletions(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index ded1d99..e138d2a 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -340,11 +340,7 @@ public class RmtDataCache implements Closeable {
if (oldUsedToken != null && oldUsedToken == usedToken) {
partitionUsedMap.remove(partitionKey);
partitionExt.setLastPackConsumed(isLastPackConsumed);
- try {
- indexPartition.offer(partitionKey);
- } catch (Throwable e) {
- //
- }
+ releaseIdlePartition(-1, partitionKey);
}
}
}
@@ -365,16 +361,7 @@ public class RmtDataCache implements Closeable {
partitionExt.setLastPackConsumed(isLastPackConsumed);
long waitDlt =
partitionExt.procConsumeResult(isFilterConsume);
- if (waitDlt > 10) {
- timeouts.put(partitionKey,
- timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
- } else {
- try {
- indexPartition.offer(partitionKey);
- } catch (Throwable e) {
- //
- }
- }
+ releaseIdlePartition(waitDlt, partitionKey);
}
}
}
@@ -398,21 +385,26 @@ public class RmtDataCache implements Closeable {
long waitDlt =
partitionExt.procConsumeResult(isFilterConsume, reqProcType,
errCode, msgSize, isEscLimit, limitDlt, curDataDlt, false);
- if (waitDlt > 10) {
- timeouts.put(partitionKey,
- timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
- } else {
- try {
- indexPartition.offer(partitionKey);
- } catch (Throwable e) {
- //
- }
- }
+ releaseIdlePartition(waitDlt, partitionKey);
}
}
}
}
+ private void releaseIdlePartition(long waitDlt, String partitionKey) {
+ if (waitDlt > 10) {
+ TimeoutTask timeoutTask = new TimeoutTask(partitionKey);
+ timeouts.put(partitionKey,
+ timer.newTimeout(timeoutTask, waitDlt, TimeUnit.MILLISECONDS));
+ } else {
+ try {
+ indexPartition.offer(partitionKey);
+ } catch (Throwable e) {
+ //
+ }
+ }
+ }
+
/**
* Close the remote data cache
*/
@@ -427,11 +419,7 @@ public class RmtDataCache implements Closeable {
timer = null;
}
for (int i = this.waitCont.get() + 1; i > 0; i--) {
- try {
- indexPartition.offer("------");
- } catch (Throwable e) {
- //
- }
+ releaseIdlePartition(-1, "------");
}
}
}
@@ -663,6 +651,31 @@ public class RmtDataCache implements Closeable {
}
}
}
+ // add timeout expired check
+ if (!timeouts.isEmpty()) {
+ List<String> partKeys = new ArrayList<String>();
+ partKeys.addAll(timeouts.keySet());
+ Timeout timeout1 = null;
+ for (String keyId : partKeys) {
+ timeout1 = timeouts.get(keyId);
+ if (timeout1 != null && timeout1.isExpired()) {
+ timeout1 = timeouts.remove(keyId);
+ if (timeout1 != null) {
+ PartitionExt partitionExt = partitionMap.get(keyId);
+ if (partitionExt != null) {
+ if (!indexPartition.contains(keyId)) {
+ try {
+ indexPartition.offer(keyId);
+ } catch (Throwable e) {
+ //
+ }
+ }
+ }
+ }
+ }
+ }
+
+ }
}
private void waitPartitions(List<String> partitionKeys, long inUseWaitPeriodMs) {
@@ -787,9 +800,15 @@ public class RmtDataCache implements Closeable {
public class TimeoutTask implements TimerTask {
private String indexId;
+ private long createTime = 0L;
public TimeoutTask(final String indexId) {
this.indexId = indexId;
+ this.createTime = System.currentTimeMillis();
+ }
+
+ public long getCreateTime() {
+ return this.createTime;
}
@Override