You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/05/22 04:57:20 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others (#90)

This is an automated email from the ASF dual-hosted git repository.

lamberliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 4893e28  [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others (#90)
4893e28 is described below

commit 4893e283df3b8c18187c8eb637743e6ff89504ed
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri May 22 04:57:11 2020 +0000

    [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others (#90)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../tubemq/client/consumer/RmtDataCache.java       | 54 ++++++++++++++--------
 1 file changed, 36 insertions(+), 18 deletions(-)

diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index 1d1dbfe..36545bd 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -23,11 +23,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -57,8 +55,8 @@ public class RmtDataCache implements Closeable {
     private final AtomicInteger waitCont = new AtomicInteger(0);
     private final ConcurrentHashMap<String, Timeout> timeouts =
             new ConcurrentHashMap<>();
-    private final BlockingQueue<String> indexPartition =
-            new LinkedBlockingQueue<>();
+    private final ConcurrentLinkedQueue<String> indexPartition =
+            new ConcurrentLinkedQueue<String>();
     private final ConcurrentHashMap<String /* index */, PartitionExt> partitionMap =
             new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String /* index */, Long> partitionUsedMap =
@@ -251,7 +249,23 @@ public class RmtDataCache implements Closeable {
             if (this.isClosed.get()) {
                 return null;
             }
-            String key = indexPartition.take();
+            String key = null;
+            do {
+                key = indexPartition.poll();
+                if (key != null) {
+                    break;
+                }
+                if (this.isClosed.get()) {
+                    break;
+                }
+                if (!partitionMap.isEmpty()) {
+                    break;
+                }
+                ThreadUtils.sleep(200);
+            } while(true);
+            if (key == null) {
+                return null;
+            }
             PartitionExt partitionExt = partitionMap.get(key);
             if (partitionExt == null) {
                 return null;
@@ -271,9 +285,12 @@ public class RmtDataCache implements Closeable {
     }
 
     protected boolean isPartitionInUse(String partitionKey, long usedToken) {
-        if (partitionMap.containsKey(partitionKey)) {
+        PartitionExt partitionExt = partitionMap.get(partitionKey);
+        if (partitionExt != null) {
             Long curToken = partitionUsedMap.get(partitionKey);
-            return curToken != null && curToken == usedToken;
+            if (curToken != null && curToken == usedToken) {
+                return true;
+            }
         }
         return false;
     }
@@ -322,7 +339,7 @@ public class RmtDataCache implements Closeable {
                     partitionUsedMap.remove(partitionKey);
                     partitionExt.setLastPackConsumed(isLastPackConsumed);
                     try {
-                        indexPartition.put(partitionKey);
+                        indexPartition.offer(partitionKey);
                     } catch (Throwable e) {
                         //
                     }
@@ -351,7 +368,7 @@ public class RmtDataCache implements Closeable {
                                 timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
                     } else {
                         try {
-                            indexPartition.put(partitionKey);
+                            indexPartition.offer(partitionKey);
                         } catch (Throwable e) {
                             //
                         }
@@ -384,7 +401,7 @@ public class RmtDataCache implements Closeable {
                                 timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
                     } else {
                         try {
-                            indexPartition.put(partitionKey);
+                            indexPartition.offer(partitionKey);
                         } catch (Throwable e) {
                             //
                         }
@@ -409,7 +426,7 @@ public class RmtDataCache implements Closeable {
             }
             for (int i = this.waitCont.get() + 1; i > 0; i--) {
                 try {
-                    indexPartition.put("------");
+                    indexPartition.offer("------");
                 } catch (Throwable e) {
                     //
                 }
@@ -629,12 +646,12 @@ public class RmtDataCache implements Closeable {
                 Long oldTime = partitionUsedMap.get(keyId);
                 if (oldTime != null && System.currentTimeMillis() - oldTime > allowedPeriodTimes) {
                     partitionUsedMap.remove(keyId);
-                    if (partitionMap.containsKey(keyId)) {
-                        PartitionExt partitionExt = partitionMap.get(keyId);
+                    PartitionExt partitionExt = partitionMap.get(keyId);
+                    if (partitionExt != null) {
                         partitionExt.setLastPackConsumed(false);
                         if (!indexPartition.contains(keyId)) {
                             try {
-                                indexPartition.put(keyId);
+                                indexPartition.offer(keyId);
                             } catch (Throwable e) {
                                 //
                             }
@@ -715,7 +732,7 @@ public class RmtDataCache implements Closeable {
             partitionUsedMap.remove(partition.getPartitionKey());
             if (!indexPartition.contains(partition.getPartitionKey())) {
                 try {
-                    indexPartition.put(partition.getPartitionKey());
+                    indexPartition.offer(partition.getPartitionKey());
                 } catch (Throwable e) {
                     //
                 }
@@ -757,7 +774,7 @@ public class RmtDataCache implements Closeable {
     }
 
     private boolean isTimeWait(String indexId) {
-        return this.timeouts.containsKey(indexId);
+        return (timeouts.get(indexId) != null);
     }
 
     private boolean hasPartitionWait() {
@@ -776,10 +793,11 @@ public class RmtDataCache implements Closeable {
         public void run(Timeout timeout) throws Exception {
             Timeout timeout1 = timeouts.remove(indexId);
             if (timeout1 != null) {
-                if (partitionMap.containsKey(indexId)) {
+                PartitionExt partitionExt = partitionMap.get(indexId);
+                if (partitionExt != null) {
                     if (!indexPartition.contains(this.indexId)) {
                         try {
-                            indexPartition.put(this.indexId);
+                            indexPartition.offer(this.indexId);
                         } catch (Throwable e) {
                             //
                         }