You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/05/25 03:11:34 UTC

[incubator-tubemq] branch release-0.3.0 updated: [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others[addendum] (#96)

This is an automated email from the ASF dual-hosted git repository.

lamberliu pushed a commit to branch release-0.3.0
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/release-0.3.0 by this push:
     new a44773e  [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others[addendum] (#96)
a44773e is described below

commit a44773e954512c533a6bb6bc0bcba2278698fef8
Author: gosonzhang <46...@qq.com>
AuthorDate: Mon May 25 03:11:26 2020 +0000

    [TUBEMQ-149]Some of the consumers stop consuming their corresponding partitions and never release the partition to others[addendum] (#96)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../tubemq/client/consumer/RmtDataCache.java       | 79 ++++++++++++++--------
 1 file changed, 49 insertions(+), 30 deletions(-)

diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index ded1d99..e138d2a 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -340,11 +340,7 @@ public class RmtDataCache implements Closeable {
                 if (oldUsedToken != null && oldUsedToken == usedToken) {
                     partitionUsedMap.remove(partitionKey);
                     partitionExt.setLastPackConsumed(isLastPackConsumed);
-                    try {
-                        indexPartition.offer(partitionKey);
-                    } catch (Throwable e) {
-                        //
-                    }
+                    releaseIdlePartition(-1, partitionKey);
                 }
             }
         }
@@ -365,16 +361,7 @@ public class RmtDataCache implements Closeable {
                     partitionExt.setLastPackConsumed(isLastPackConsumed);
                     long waitDlt =
                             partitionExt.procConsumeResult(isFilterConsume);
-                    if (waitDlt > 10) {
-                        timeouts.put(partitionKey,
-                                timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
-                    } else {
-                        try {
-                            indexPartition.offer(partitionKey);
-                        } catch (Throwable e) {
-                            //
-                        }
-                    }
+                    releaseIdlePartition(waitDlt, partitionKey);
                 }
             }
         }
@@ -398,21 +385,26 @@ public class RmtDataCache implements Closeable {
                     long waitDlt =
                             partitionExt.procConsumeResult(isFilterConsume, reqProcType,
                                     errCode, msgSize, isEscLimit, limitDlt, curDataDlt, false);
-                    if (waitDlt > 10) {
-                        timeouts.put(partitionKey,
-                                timer.newTimeout(new TimeoutTask(partitionKey), waitDlt, TimeUnit.MILLISECONDS));
-                    } else {
-                        try {
-                            indexPartition.offer(partitionKey);
-                        } catch (Throwable e) {
-                            //
-                        }
-                    }
+                    releaseIdlePartition(waitDlt, partitionKey);
                 }
             }
         }
     }
 
+    private void releaseIdlePartition(long waitDlt, String partitionKey) { // return partitionKey to indexPartition now, or schedule a timer task after waitDlt ms
+        if (waitDlt > 10) { // a wait longer than 10 ms was requested: go through the timer
+            TimeoutTask timeoutTask = new TimeoutTask(partitionKey); // NOTE(review): presumably re-offers the key when it fires; its run() is not shown here
+            timeouts.put(partitionKey, // remember the Timeout handle under the partition key
+                timer.newTimeout(timeoutTask, waitDlt, TimeUnit.MILLISECONDS));
+        } else { // waitDlt <= 10 (callers pass -1 to request no delay): offer the key immediately
+            try {
+                indexPartition.offer(partitionKey);
+            } catch (Throwable e) {
+                // any failure of offer() is swallowed here; nothing is logged or rethrown
+            }
+        }
+    }
+
     /**
      * Close the remote data cache
      */
@@ -427,11 +419,7 @@ public class RmtDataCache implements Closeable {
                 timer = null;
             }
             for (int i = this.waitCont.get() + 1; i > 0; i--) {
-                try {
-                    indexPartition.offer("------");
-                } catch (Throwable e) {
-                    //
-                }
+                releaseIdlePartition(-1, "------");
             }
         }
     }
@@ -663,6 +651,31 @@ public class RmtDataCache implements Closeable {
                 }
             }
         }
+        // add timeout expired check
+        if (!timeouts.isEmpty()) {
+            List<String> partKeys = new ArrayList<String>();
+            partKeys.addAll(timeouts.keySet());
+            Timeout timeout1 = null;
+            for (String keyId : partKeys) {
+                timeout1 = timeouts.get(keyId);
+                if (timeout1 != null && timeout1.isExpired()) {
+                    timeout1 = timeouts.remove(keyId);
+                    if (timeout1 != null) {
+                        PartitionExt partitionExt = partitionMap.get(keyId);
+                        if (partitionExt != null) {
+                            if (!indexPartition.contains(keyId)) {
+                                try {
+                                    indexPartition.offer(keyId);
+                                } catch (Throwable e) {
+                                    //
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+
+        }
     }
 
     private void waitPartitions(List<String> partitionKeys, long inUseWaitPeriodMs) {
@@ -787,9 +800,15 @@ public class RmtDataCache implements Closeable {
     public class TimeoutTask implements TimerTask {
 
         private String indexId;
+        private long createTime = 0L;
 
         public TimeoutTask(final String indexId) {
             this.indexId = indexId;
+            this.createTime = System.currentTimeMillis();
+        }
+
+        public long getCreateTime() {
+            return this.createTime;
         }
 
         @Override