You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/05/22 04:57:20 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-149]Some of the
consumers stop consuming their corresponding partitions and never release
the partition to others (#90)
This is an automated email from the ASF dual-hosted git repository.
lamberliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 4893e28 [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others (#90)
4893e28 is described below
commit 4893e283df3b8c18187c8eb637743e6ff89504ed
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri May 22 04:57:11 2020 +0000
[TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others (#90)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../tubemq/client/consumer/RmtDataCache.java | 54 ++++++++++++++--------
1 file changed, 36 insertions(+), 18 deletions(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index 1d1dbfe..36545bd 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -23,11 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -57,8 +55,8 @@ public class RmtDataCache implements Closeable {
private final AtomicInteger waitCont = new AtomicInteger(0);
private final ConcurrentHashMap<String, Timeout> timeouts =
new ConcurrentHashMap<>();
- private final BlockingQueue<String> indexPartition =
- new LinkedBlockingQueue<>();
+ private final ConcurrentLinkedQueue<String> indexPartition =
+ new ConcurrentLinkedQueue<String>();
private final ConcurrentHashMap<String /* index */, PartitionExt> partitionMap =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String /* index */, Long> partitionUsedMap =
@@ -251,7 +249,23 @@ public class RmtDataCache implements Closeable {
if (this.isClosed.get()) {
return null;
}
- String key = indexPartition.take();
+ String key = null;
+ do {
+ key = indexPartition.poll();
+ if (key != null) {
+ break;
+ }
+ if (this.isClosed.get()) {
+ break;
+ }
+ if (!partitionMap.isEmpty()) {
+ break;
+ }
+ ThreadUtils.sleep(200);
+ } while(true);
+ if (key == null) {
+ return null;
+ }
PartitionExt partitionExt = partitionMap.get(key);
if (partitionExt == null) {
return null;
@@ -271,9 +285,12 @@ public class RmtDataCache implements Closeable {
}
protected boolean isPartitionInUse(String partitionKey, long usedToken) {
- if (partitionMap.containsKey(partitionKey)) {
+ PartitionExt partitionExt = partitionMap.get(partitionKey);
+ if (partitionExt != null) {
Long curToken = partitionUsedMap.get(partitionKey);
- return curToken != null && curToken == usedToken;
+ if (curToken != null && curToken == usedToken) {
+ return true;
+ }
}
return false;
}
@@ -322,7 +339,7 @@ public class RmtDataCache implements Closeable {
partitionUsedMap.remove(partitionKey);
partitionExt.setLastPackConsumed(isLastPackConsumed);
try {
- indexPartition.put(partitionKey);
+ indexPartition.offer(partitionKey);
} catch (Throwable e) {
//
}
@@ -351,7 +368,7 @@ public class RmtDataCache implements Closeable {
timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
} else {
try {
- indexPartition.put(partitionKey);
+ indexPartition.offer(partitionKey);
} catch (Throwable e) {
//
}
@@ -384,7 +401,7 @@ public class RmtDataCache implements Closeable {
timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
} else {
try {
- indexPartition.put(partitionKey);
+ indexPartition.offer(partitionKey);
} catch (Throwable e) {
//
}
@@ -409,7 +426,7 @@ public class RmtDataCache implements Closeable {
}
for (int i = this.waitCont.get() + 1; i > 0; i--) {
try {
- indexPartition.put("------");
+ indexPartition.offer("------");
} catch (Throwable e) {
//
}
@@ -629,12 +646,12 @@ public class RmtDataCache implements Closeable {
Long oldTime = partitionUsedMap.get(keyId);
if (oldTime != null && System.currentTimeMillis() - oldTime > allowedPeriodTimes) {
partitionUsedMap.remove(keyId);
- if (partitionMap.containsKey(keyId)) {
- PartitionExt partitionExt = partitionMap.get(keyId);
+ PartitionExt partitionExt = partitionMap.get(keyId);
+ if (partitionExt != null) {
partitionExt.setLastPackConsumed(false);
if (!indexPartition.contains(keyId)) {
try {
- indexPartition.put(keyId);
+ indexPartition.offer(keyId);
} catch (Throwable e) {
//
}
@@ -715,7 +732,7 @@ public class RmtDataCache implements Closeable {
partitionUsedMap.remove(partition.getPartitionKey());
if (!indexPartition.contains(partition.getPartitionKey())) {
try {
- indexPartition.put(partition.getPartitionKey());
+ indexPartition.offer(partition.getPartitionKey());
} catch (Throwable e) {
//
}
@@ -757,7 +774,7 @@ public class RmtDataCache implements Closeable {
}
private boolean isTimeWait(String indexId) {
- return this.timeouts.containsKey(indexId);
+ return (timeouts.get(indexId) != null);
}
private boolean hasPartitionWait() {
@@ -776,10 +793,11 @@ public class RmtDataCache implements Closeable {
public void run(Timeout timeout) throws Exception {
Timeout timeout1 = timeouts.remove(indexId);
if (timeout1 != null) {
- if (partitionMap.containsKey(indexId)) {
+ PartitionExt partitionExt = partitionMap.get(indexId);
+ if (partitionExt != null) {
if (!indexPartition.contains(this.indexId)) {
try {
- indexPartition.put(this.indexId);
+ indexPartition.offer(this.indexId);
} catch (Throwable e) {
//
}