Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/04/20 02:59:35 UTC
[rocketmq] branch 5.0.0-beta updated: Add thread-safety requirement and re-phrase PushConsumer class javadoc
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta by this push:
new 3d08eb2c1 Add thread-safety requirement and re-phrase PushConsumer class javadoc
3d08eb2c1 is described below
commit 3d08eb2c1d338935b168f9b8ad6add2902c3d3ef
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Wed Apr 20 10:59:24 2022 +0800
Add thread-safety requirement and re-phrase PushConsumer class javadoc
---
.../rocketmq/apis/consumer/MessageListener.java | 9 +++++--
.../rocketmq/apis/consumer/PushConsumer.java | 28 +++++++++++-----------
2 files changed, 21 insertions(+), 16 deletions(-)
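Note: the thread-safety requirement this commit adds to MessageListener can be illustrated with a minimal sketch. It is not code from this commit. ConsumeResult, MessageView and MessageListener below are simplified stand-ins declared locally so the example compiles on its own; in particular, the body() accessor, CountingListener and deliverConcurrently are hypothetical names that do not come from the RocketMQ API. The sketch assumes that a listener keeps its shared state in an atomic counter, so that concurrent onMessage calls do not lose updates.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadSafeListenerSketch {
    // Simplified stand-ins for the org.apache.rocketmq.apis types (assumptions, not the real API).
    enum ConsumeResult { SUCCESS, FAILURE }

    interface MessageView {
        String body(); // hypothetical accessor used only by this sketch
    }

    interface MessageListener {
        ConsumeResult onMessage(MessageView message);
    }

    // A listener whose only shared state is an AtomicLong, so concurrent calls are safe.
    static final class CountingListener implements MessageListener {
        private final AtomicLong processed = new AtomicLong();

        @Override
        public ConsumeResult onMessage(MessageView message) {
            if (message == null || message.body() == null) {
                return ConsumeResult.FAILURE; // nothing to process
            }
            processed.incrementAndGet(); // atomic update, no lost increments
            return ConsumeResult.SUCCESS;
        }

        long processedCount() {
            return processed.get();
        }
    }

    // Simulates a consumer thread pool invoking the same listener from several threads.
    static long deliverConcurrently(CountingListener listener, int threads, int perThread)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    listener.onMessage(() -> "payload");
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("deliveries did not finish in time");
        }
        return listener.processedCount();
    }

    public static void main(String[] args) throws InterruptedException {
        long total = deliverConcurrently(new CountingListener(), 8, 10_000);
        System.out.println("processed=" + total);
    }
}
```

Replacing the AtomicLong with a plain long field would make the count unreliable under concurrent delivery, which is the kind of defect the new javadoc paragraph warns implementers about.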
diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java
index 2042b96e4..a2aa96365 100644
--- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java
@@ -28,6 +28,11 @@ import org.apache.rocketmq.apis.message.MessageView;
* desirable processing concurrency.
*
* <p>Refer to {@link PushConsumer} for more further specs.
+ *
+ * <p>
+ * <strong>Thread Safety</strong>
+ * Implementations of this interface may be invoked concurrently by multiple threads and should be thread-safe.
+ * </p>
*/
public interface MessageListener {
@@ -43,9 +48,9 @@ public interface MessageListener {
* were raised, it will negatively acknowledge <code>message</code>, which
* would potentially get re-delivered after the configured back off period.
*
- * @param message Message to process.
+ * @param message The message passed to the listener.
* @return {@link ConsumeResult#SUCCESS} if <code>message</code> is properly processed; {@link ConsumeResult#FAILURE}
* otherwise.
*/
- ConsumeResult consume(MessageView message);
+ ConsumeResult onMessage(MessageView message);
}
diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java
index cb89ff4e6..de8da0b38 100644
--- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java
@@ -24,25 +24,25 @@ import java.util.Map;
/**
- * PushConsumer is a thread-safe rocketmq client which is used to consume message by group.
+ * PushConsumer is a managed client which delivers messages to the application through {@link MessageListener}.
*
- * <p>Push consumer is fully-managed consumer, if you are confused to choose your consumer, push consumer should be
- * your first consideration.
+ * <p>Consumers of the same group are designed to share messages from the server. As a result, consumers of the same group
+ * must have <strong>exactly identical subscription expressions</strong>; otherwise the behavior is undefined.
*
- * <p>Consumers belong to the same consumer group share messages from server,
- * so consumer in the same group must have the same subscriptionExpressions, otherwise the behavior is
- * undefined. If a new consumer group's consumer is started first time, it consumes from the latest position. Once
- * consumer is started, server records its consumption progress and derives it in subsequent startup.
*
- * <p>You may intend to maintain different consumption progress for different consumer, different consumer group
- * should be set in this case.
+ * <p>For a brand-new group, consumers start from the latest position of the underlying queues, ignoring existing
+ * messages completely. In addition to delivering messages to clients, broker servers also maintain consumption
+ * progress per group. Thus, consumers can safely restart and resume their progress automatically.</p>
*
- * <p>To accelerate the message consumption, push consumer applies
- * <a href="https://en.wikipedia.org/wiki/Reactive_Streams">reactive streams</a>
- * . Messages received from server is cached locally before consumption,
+ * <p>There are scenarios where <a href="https://en.wikipedia.org/wiki/Fan-out_(software)">fan-out</a> is preferred;
+ * the recommended solution is to use a dedicated group for each client.
+ *
+ * <p>To mitigate latency, PushConsumer adopts the
+ * <a href="https://en.wikipedia.org/wiki/Reactive_Streams">reactive streams</a> pattern. Namely,
+ * messages received from the server are first cached locally, the amount of which is controlled by
* {@link PushConsumerBuilder#setMaxCacheMessageCount(int)} and
- * {@link PushConsumerBuilder#setMaxCacheMessageSizeInBytes(int)} could be used to set the cache threshold in
- * different dimension.
+ * {@link PushConsumerBuilder#setMaxCacheMessageSizeInBytes(int)}, and then dispatched to a thread pool to achieve
+ * the desired concurrency.
*/
public interface PushConsumer extends Closeable {
/**