You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/05/22 06:05:09 UTC

[incubator-tubemq] branch release-0.3.0 updated: [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others (#91)

This is an automated email from the ASF dual-hosted git repository.

lamberliu pushed a commit to branch release-0.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/release-0.3.0 by this push:
     new 595ef3d  [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others (#91)
595ef3d is described below

commit 595ef3de97b8c01ff8b948cdb3b6e2ac3879ca00
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri May 22 06:05:00 2020 +0000

    [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others (#91)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../tubemq/client/consumer/RmtDataCache.java       | 50 ++++++++++++++--------
 1 file changed, 33 insertions(+), 17 deletions(-)

diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index ac643af..ded1d99 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -23,11 +23,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,8 +55,8 @@ public class RmtDataCache implements Closeable {
     private final AtomicInteger waitCont = new AtomicInteger(0);
     private final ConcurrentHashMap<String, Timeout> timeouts =
             new ConcurrentHashMap<String, Timeout>();
-    private final BlockingQueue<String> indexPartition =
-            new LinkedBlockingQueue<String>();
+    private final ConcurrentLinkedQueue<String> indexPartition =
+            new ConcurrentLinkedQueue<String>();
     private final ConcurrentHashMap<String /* index */, PartitionExt> partitionMap =
             new ConcurrentHashMap<String, PartitionExt>();
     private final ConcurrentHashMap<String /* index */, Long> partitionUsedMap =
@@ -251,7 +249,23 @@ public class RmtDataCache implements Closeable {
             if (this.isClosed.get()) {
                 return null;
             }
-            String key = indexPartition.take();
+            String key = null;
+            do {
+                key = indexPartition.poll();
+                if (key != null) {
+                    break;
+                }
+                if (this.isClosed.get()) {
+                    break;
+                }
+                if (!partitionMap.isEmpty()) {
+                    break;
+                }
+                ThreadUtils.sleep(200);
+            } while(true);
+            if (key == null) {
+                return null;
+            }
             PartitionExt partitionExt = partitionMap.get(key);
             if (partitionExt == null) {
                 return null;
@@ -271,7 +285,8 @@ public class RmtDataCache implements Closeable {
     }
 
     protected boolean isPartitionInUse(String partitionKey, long usedToken) {
-        if (partitionMap.containsKey(partitionKey)) {
+        PartitionExt partitionExt = partitionMap.get(partitionKey);
+        if (partitionExt != null) {
             Long curToken = partitionUsedMap.get(partitionKey);
             if (curToken != null && curToken == usedToken) {
                 return true;
@@ -326,7 +341,7 @@ public class RmtDataCache implements Closeable {
                     partitionUsedMap.remove(partitionKey);
                     partitionExt.setLastPackConsumed(isLastPackConsumed);
                     try {
-                        indexPartition.put(partitionKey);
+                        indexPartition.offer(partitionKey);
                     } catch (Throwable e) {
                         //
                     }
@@ -355,7 +370,7 @@ public class RmtDataCache implements Closeable {
                                 timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
                     } else {
                         try {
-                            indexPartition.put(partitionKey);
+                            indexPartition.offer(partitionKey);
                         } catch (Throwable e) {
                             //
                         }
@@ -388,7 +403,7 @@ public class RmtDataCache implements Closeable {
                                 timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
                     } else {
                         try {
-                            indexPartition.put(partitionKey);
+                            indexPartition.offer(partitionKey);
                         } catch (Throwable e) {
                             //
                         }
@@ -413,7 +428,7 @@ public class RmtDataCache implements Closeable {
             }
             for (int i = this.waitCont.get() + 1; i > 0; i--) {
                 try {
-                    indexPartition.put("------");
+                    indexPartition.offer("------");
                 } catch (Throwable e) {
                     //
                 }
@@ -634,12 +649,12 @@ public class RmtDataCache implements Closeable {
                 Long oldTime = partitionUsedMap.get(keyId);
                 if (oldTime != null && System.currentTimeMillis() - oldTime > allowedPeriodTimes) {
                     partitionUsedMap.remove(keyId);
-                    if (partitionMap.containsKey(keyId)) {
-                        PartitionExt partitionExt = partitionMap.get(keyId);
+                    PartitionExt partitionExt = partitionMap.get(keyId);
+                    if (partitionExt != null) {
                         partitionExt.setLastPackConsumed(false);
                         if (!indexPartition.contains(keyId)) {
                             try {
-                                indexPartition.put(keyId);
+                                indexPartition.offer(keyId);
                             } catch (Throwable e) {
                                 //
                             }
@@ -720,7 +735,7 @@ public class RmtDataCache implements Closeable {
             partitionUsedMap.remove(partition.getPartitionKey());
             if (!indexPartition.contains(partition.getPartitionKey())) {
                 try {
-                    indexPartition.put(partition.getPartitionKey());
+                    indexPartition.offer(partition.getPartitionKey());
                 } catch (Throwable e) {
                     //
                 }
@@ -762,7 +777,7 @@ public class RmtDataCache implements Closeable {
     }
 
     private boolean isTimeWait(String indexId) {
-        return this.timeouts.containsKey(indexId);
+        return (this.timeouts.get(indexId) != null);
     }
 
     private boolean hasPartitionWait() {
@@ -781,10 +796,11 @@ public class RmtDataCache implements Closeable {
         public void run(Timeout timeout) throws Exception {
             Timeout timeout1 = timeouts.remove(indexId);
             if (timeout1 != null) {
-                if (partitionMap.containsKey(indexId)) {
+                PartitionExt partitionExt = partitionMap.get(indexId);
+                if (partitionExt != null) {
                     if (!indexPartition.contains(this.indexId)) {
                         try {
-                            indexPartition.put(this.indexId);
+                            indexPartition.offer(this.indexId);
                         } catch (Throwable e) {
                             //
                         }