Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/04/20 02:59:35 UTC

[rocketmq] branch 5.0.0-beta updated: Add thread-safety requirement and re-phrase PushConsumer class javadoc

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta by this push:
     new 3d08eb2c1 Add thread-safety requirement and re-phrase PushConsumer class javadoc
3d08eb2c1 is described below

commit 3d08eb2c1d338935b168f9b8ad6add2902c3d3ef
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Wed Apr 20 10:59:24 2022 +0800

    Add thread-safety requirement and re-phrase PushConsumer class javadoc
---
 .../rocketmq/apis/consumer/MessageListener.java    |  9 +++++--
 .../rocketmq/apis/consumer/PushConsumer.java       | 28 +++++++++++-----------
 2 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java
index 2042b96e4..a2aa96365 100644
--- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java
@@ -28,6 +28,11 @@ import org.apache.rocketmq.apis.message.MessageView;
  * desirable processing concurrency.
  *
  * <p>Refer to {@link PushConsumer} for more further specs.
+ *
+ * <p>
+ *     <strong>Thread Safety</strong>
+ *     This listener may be invoked concurrently by multiple threads. Implementations should be thread-safe.
+ * </p>
  */
 public interface MessageListener {
 
@@ -43,9 +48,9 @@ public interface MessageListener {
    * were raised, it will negatively acknowledge <code>message</code>, which
    * would potentially get re-delivered after the configured back off period.
    *
-   * @param message Message to process.
+   * @param message The message passed to the listener.
    * @return {@link ConsumeResult#SUCCESS} if <code>message</code> is properly processed; {@link ConsumeResult#FAILURE}
    *         otherwise.
    */
-  ConsumeResult consume(MessageView message);
+  ConsumeResult onMessage(MessageView message);
 }
diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java
index cb89ff4e6..de8da0b38 100644
--- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java
@@ -24,25 +24,25 @@ import java.util.Map;
 
 
 /**
- * PushConsumer is a thread-safe rocketmq client which is used to consume message by group.
+ * PushConsumer is a managed client which delivers messages to the application through {@link MessageListener}.
  *
- * <p>Push consumer is fully-managed consumer, if you are confused to choose your consumer, push consumer should be
- * your first consideration.
+ * <p>Consumers of the same group are designed to share messages from server. As a result, consumers of the same group
+ * must have <strong>exactly identical subscription expressions</strong>, otherwise the behavior is undefined.
  *
- * <p>Consumers belong to the same consumer group share messages from server,
- * so consumer in the same group must have the same subscriptionExpressions, otherwise the behavior is
- * undefined. If a new consumer group's consumer is started first time, it consumes from the latest position. Once
- * consumer is started, server records its consumption progress and derives it in subsequent startup.
  *
- * <p>You may intend to maintain different consumption progress for different consumer, different consumer group
- * should be set in this case.
+ * <p>For a brand-new group, consumers start from the latest position of the underlying queues, ignoring existing
+ * messages completely. In addition to delivering messages to clients, broker servers also maintain consumption
+ * progress per group. Thus, consumers can safely restart and resume their progress automatically.</p>
  *
- * <p>To accelerate the message consumption, push consumer applies
- * <a href="https://en.wikipedia.org/wiki/Reactive_Streams">reactive streams</a>
- * . Messages received from server is cached locally before consumption,
+ * <p>There are scenarios where <a href="https://en.wikipedia.org/wiki/Fan-out_(software)">fan-out</a> is preferred;
+ * the recommended solution is to use a dedicated group for each client.
+ *
+ * <p>To mitigate latency, PushConsumer adopts the
+ * <a href="https://en.wikipedia.org/wiki/Reactive_Streams">reactive streams</a> pattern. Namely,
+ * messages received from the server are first cached locally, the amount of which is controlled by
  * {@link PushConsumerBuilder#setMaxCacheMessageCount(int)} and
- * {@link PushConsumerBuilder#setMaxCacheMessageSizeInBytes(int)} could be used to set the cache threshold in
- * different dimension.
+ * {@link PushConsumerBuilder#setMaxCacheMessageSizeInBytes(int)}, and then dispatched to thread pool to achieve
+ * desirable concurrency.
  */
 public interface PushConsumer extends Closeable {
     /**