You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/05/22 06:05:09 UTC
[incubator-tubemq] branch release-0.3.0 updated: [TUBEMQ-149]Some
of the consumers stop consuming their corresponding partitions and never
release the partition to others (#91)
This is an automated email from the ASF dual-hosted git repository.
lamberliu pushed a commit to branch release-0.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/release-0.3.0 by this push:
new 595ef3d [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others (#91)
595ef3d is described below
commit 595ef3de97b8c01ff8b948cdb3b6e2ac3879ca00
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri May 22 06:05:00 2020 +0000
[TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others (#91)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../tubemq/client/consumer/RmtDataCache.java | 50 ++++++++++++++--------
1 file changed, 33 insertions(+), 17 deletions(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index ac643af..ded1d99 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -23,11 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -57,8 +55,8 @@ public class RmtDataCache implements Closeable {
private final AtomicInteger waitCont = new AtomicInteger(0);
private final ConcurrentHashMap<String, Timeout> timeouts =
new ConcurrentHashMap<String, Timeout>();
- private final BlockingQueue<String> indexPartition =
- new LinkedBlockingQueue<String>();
+ private final ConcurrentLinkedQueue<String> indexPartition =
+ new ConcurrentLinkedQueue<String>();
private final ConcurrentHashMap<String /* index */, PartitionExt> partitionMap =
new ConcurrentHashMap<String, PartitionExt>();
private final ConcurrentHashMap<String /* index */, Long> partitionUsedMap =
@@ -251,7 +249,23 @@ public class RmtDataCache implements Closeable {
if (this.isClosed.get()) {
return null;
}
- String key = indexPartition.take();
+ String key = null;
+ do {
+ key = indexPartition.poll();
+ if (key != null) {
+ break;
+ }
+ if (this.isClosed.get()) {
+ break;
+ }
+ if (!partitionMap.isEmpty()) {
+ break;
+ }
+ ThreadUtils.sleep(200);
+ } while(true);
+ if (key == null) {
+ return null;
+ }
PartitionExt partitionExt = partitionMap.get(key);
if (partitionExt == null) {
return null;
@@ -271,7 +285,8 @@ public class RmtDataCache implements Closeable {
}
protected boolean isPartitionInUse(String partitionKey, long usedToken) {
- if (partitionMap.containsKey(partitionKey)) {
+ PartitionExt partitionExt = partitionMap.get(partitionKey);
+ if (partitionExt != null) {
Long curToken = partitionUsedMap.get(partitionKey);
if (curToken != null && curToken == usedToken) {
return true;
@@ -326,7 +341,7 @@ public class RmtDataCache implements Closeable {
partitionUsedMap.remove(partitionKey);
partitionExt.setLastPackConsumed(isLastPackConsumed);
try {
- indexPartition.put(partitionKey);
+ indexPartition.offer(partitionKey);
} catch (Throwable e) {
//
}
@@ -355,7 +370,7 @@ public class RmtDataCache implements Closeable {
timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
} else {
try {
- indexPartition.put(partitionKey);
+ indexPartition.offer(partitionKey);
} catch (Throwable e) {
//
}
@@ -388,7 +403,7 @@ public class RmtDataCache implements Closeable {
timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
} else {
try {
- indexPartition.put(partitionKey);
+ indexPartition.offer(partitionKey);
} catch (Throwable e) {
//
}
@@ -413,7 +428,7 @@ public class RmtDataCache implements Closeable {
}
for (int i = this.waitCont.get() + 1; i > 0; i--) {
try {
- indexPartition.put("------");
+ indexPartition.offer("------");
} catch (Throwable e) {
//
}
@@ -634,12 +649,12 @@ public class RmtDataCache implements Closeable {
Long oldTime = partitionUsedMap.get(keyId);
if (oldTime != null && System.currentTimeMillis() - oldTime > allowedPeriodTimes) {
partitionUsedMap.remove(keyId);
- if (partitionMap.containsKey(keyId)) {
- PartitionExt partitionExt = partitionMap.get(keyId);
+ PartitionExt partitionExt = partitionMap.get(keyId);
+ if (partitionExt != null) {
partitionExt.setLastPackConsumed(false);
if (!indexPartition.contains(keyId)) {
try {
- indexPartition.put(keyId);
+ indexPartition.offer(keyId);
} catch (Throwable e) {
//
}
@@ -720,7 +735,7 @@ public class RmtDataCache implements Closeable {
partitionUsedMap.remove(partition.getPartitionKey());
if (!indexPartition.contains(partition.getPartitionKey())) {
try {
- indexPartition.put(partition.getPartitionKey());
+ indexPartition.offer(partition.getPartitionKey());
} catch (Throwable e) {
//
}
@@ -762,7 +777,7 @@ public class RmtDataCache implements Closeable {
}
private boolean isTimeWait(String indexId) {
- return this.timeouts.containsKey(indexId);
+ return (this.timeouts.get(indexId) != null);
}
private boolean hasPartitionWait() {
@@ -781,10 +796,11 @@ public class RmtDataCache implements Closeable {
public void run(Timeout timeout) throws Exception {
Timeout timeout1 = timeouts.remove(indexId);
if (timeout1 != null) {
- if (partitionMap.containsKey(indexId)) {
+ PartitionExt partitionExt = partitionMap.get(indexId);
+ if (partitionExt != null) {
if (!indexPartition.contains(this.indexId)) {
try {
- indexPartition.put(this.indexId);
+ indexPartition.offer(this.indexId);
} catch (Throwable e) {
//
}