You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/12/01 02:28:36 UTC
[incubator-tubemq] branch TUBEMQ-430 updated: [TUBEMQ-419]
SetMaxPartCheckPeriodMs() negative number,
getMessage() still lock the thread[addendum] (#332)
This is an automated email from the ASF dual-hosted git repository.
lamberliu pushed a commit to branch TUBEMQ-430
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-430 by this push:
new c70b6ea [TUBEMQ-419] SetMaxPartCheckPeriodMs() negative number, getMessage() still lock the thread[addendum] (#332)
c70b6ea is described below
commit c70b6eaba2beeaac8694b5f52ba5ffb3baf9e06f
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Dec 1 10:28:28 2020 +0800
[TUBEMQ-419] SetMaxPartCheckPeriodMs() negative number, getMessage() still lock the thread[addendum] (#332)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../java/org/apache/tubemq/client/config/ConsumerConfig.java | 12 ++++++++++++
.../apache/tubemq/client/consumer/PullMessageConsumer.java | 7 +++++++
2 files changed, 19 insertions(+)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
index d8b63fb..184da79 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
@@ -124,6 +124,18 @@ public class ConsumerConfig extends TubeClientConfig {
return pullConsumeReadyWaitPeriodMs;
}
+ // setPullConsumeReadyWaitPeriodMs() usage note:
+ // The value may be negative, 0, or positive, and it directly determines
+ // the behavior of the PullMessageConsumer.getMessage() function:
+ // 1. if it is set to a negative value, the getMessage() calling thread will be
+ // blocked indefinitely and will not return until the consumption conditions are met;
+ // 2. if it is set to 0, the getMessage() calling thread will block for only one
+ // ConsumerConfig.getPullConsumeReadyChkSliceMs() interval when the consumption
+ // conditions are not met, and then return;
+ // 3. if it is set to a positive number, then while the consumption conditions are not met
+ // (including no partitions being available, or partitions being allocated but not
+ // meeting the usage conditions), the getMessage() calling thread will be blocked until
+ // the total time of ConsumerConfig.getPullConsumeReadyWaitPeriodMs() expires
public void setPullConsumeReadyWaitPeriodMs(long pullConsumeReadyWaitPeriodMs) {
this.pullConsumeReadyWaitPeriodMs = pullConsumeReadyWaitPeriodMs;
}
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java
index af5d50f..d9c3baf 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/PullMessageConsumer.java
@@ -28,6 +28,13 @@ public interface PullMessageConsumer extends MessageConsumer {
PullMessageConsumer subscribe(String topic,
TreeSet<String> filterConds) throws TubeClientException;
+ // getMessage() usage note:
+ // This getMessage() call may block: when the current consumer's
+ // consumption conditions are not satisfied (including having
+ // no partitions to consume, or having allocated partitions that
+ // do not meet the consumption conditions),
+ // the call will sleep at intervals of ConsumerConfig.getPullConsumeReadyChkSliceMs()
+ // until the total time of ConsumerConfig.getPullConsumeReadyWaitPeriodMs() has elapsed
ConsumerResult getMessage() throws TubeClientException;
ConsumerResult confirmConsume(final String confirmContext,