Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/04/20 03:07:12 UTC

[rocketmq] branch 5.0.0-beta updated (62092e71d -> bd3059e76)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a change to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


 discard 62092e71d Minor fix
 discard 3d08eb2c1 Add thread-safety requirement and re-phrase PushConsumer class javadoc
     new bd3059e76 Add thread-safety requirement and re-phrase PushConsumer class javadoc

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (62092e71d)
            \
             N -- N -- N   refs/heads/5.0.0-beta (bd3059e76)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[rocketmq] 01/01: Add thread-safety requirement and re-phrase PushConsumer class javadoc

Posted by li...@apache.org.

lizhanhui pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit bd3059e76a00b0496ba0e2e433f6266f2ceb5683
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Wed Apr 20 10:59:24 2022 +0800

    Add thread-safety requirement and re-phrase PushConsumer class javadoc
---
 .../rocketmq/apis/consumer/MessageListener.java    |  9 ++++++--
 .../rocketmq/apis/consumer/PushConsumer.java       | 27 +++++++++++-----------
 2 files changed, 20 insertions(+), 16 deletions(-)
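
Illustrative note, not part of the commit: the thread-safety requirement
that this change adds to the MessageListener javadoc can be sketched as
follows. This is a minimal sketch under stated assumptions. ConsumeResult,
MessageView and MessageListener are simplified stand-ins declared locally,
not the real org.apache.rocketmq.apis types; CountingListener and
deliverConcurrently are hypothetical names; and the fixed thread pool only
simulates the concurrent dispatch described in the PushConsumer javadoc.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ThreadSafeListenerSketch {
    // Local stand-ins (assumptions), NOT the real RocketMQ API types.
    enum ConsumeResult { SUCCESS, FAILURE }

    interface MessageView {
        String body();
    }

    interface MessageListener {
        ConsumeResult onMessage(MessageView message);
    }

    // Hypothetical listener: its only shared state is an AtomicLong,
    // so it stays correct when called from several threads at once.
    static final class CountingListener implements MessageListener {
        private final AtomicLong processed = new AtomicLong();

        @Override
        public ConsumeResult onMessage(MessageView message) {
            String body = message.body();
            if (body == null || body.isEmpty()) {
                // Reject the message without touching the counter.
                return ConsumeResult.FAILURE;
            }
            processed.incrementAndGet();
            return ConsumeResult.SUCCESS;
        }

        long processed() {
            return processed.get();
        }
    }

    // Simulates concurrent delivery: each worker thread hands
    // messagesPerThread non-empty messages to one shared listener.
    static long deliverConcurrently(int threads, int messagesPerThread) {
        CountingListener listener = new CountingListener();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    final String body = "msg-" + i;
                    listener.onMessage(() -> body);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return listener.processed();
    }

    public static void main(String[] args) {
        // 4 threads x 1000 messages each; no update is lost.
        System.out.println(deliverConcurrently(4, 1000)); // prints 4000
    }
}
```

Replacing the AtomicLong with a plain long field would make the final
count nondeterministic under concurrent delivery, which is the kind of
defect the new javadoc paragraph warns implementers about.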

diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java
index 2042b96e4..a2aa96365 100644
--- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java
@@ -28,6 +28,11 @@ import org.apache.rocketmq.apis.message.MessageView;
  * desirable processing concurrency.
  *
  * <p>Refer to {@link PushConsumer} for more further specs.
+ *
+ * <p>
+ *     <strong>Thread Safety</strong>
+ *     This class may be called concurrently by multiple threads. Implementation should be thread safe.
+ * </p>
  */
 public interface MessageListener {
 
@@ -43,9 +48,9 @@ public interface MessageListener {
    * were raised, it will negatively acknowledge <code>message</code>, which
    * would potentially get re-delivered after the configured back off period.
    *
-   * @param message Message to process.
+   * @param message The message passed to the listener.
    * @return {@link ConsumeResult#SUCCESS} if <code>message</code> is properly processed; {@link ConsumeResult#FAILURE}
    *         otherwise.
    */
-  ConsumeResult consume(MessageView message);
+  ConsumeResult onMessage(MessageView message);
 }
diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java
index cb89ff4e6..dc59ff69b 100644
--- a/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java
@@ -24,25 +24,24 @@ import java.util.Map;
 
 
 /**
- * PushConsumer is a thread-safe rocketmq client which is used to consume message by group.
+ * PushConsumer is a managed client which delivers messages to application through {@link MessageListener}.
  *
- * <p>Push consumer is fully-managed consumer, if you are confused to choose your consumer, push consumer should be
- * your first consideration.
+ * <p>Consumers of the same group are designed to share messages from broker servers. As a result, consumers of the same
+ * group must have <strong>exactly identical subscription expressions</strong>, otherwise the behavior is undefined.
  *
- * <p>Consumers belong to the same consumer group share messages from server,
- * so consumer in the same group must have the same subscriptionExpressions, otherwise the behavior is
- * undefined. If a new consumer group's consumer is started first time, it consumes from the latest position. Once
- * consumer is started, server records its consumption progress and derives it in subsequent startup.
+ * <p>For a brand-new group, consumers consume messages from head of underlying queues, ignoring existing messages
+ * completely. In addition to delivering messages to clients, broker servers also maintain progress in perspective of
+ * group. Thus, consumers can safely restart and resume their progress automatically.</p>
  *
- * <p>You may intend to maintain different consumption progress for different consumer, different consumer group
- * should be set in this case.
+ * <p>There are scenarios where <a href="https://en.wikipedia.org/wiki/Fan-out_(software)">fan-out</a> is preferred,
+ * recommended solution is to use dedicated group of each client.
  *
- * <p>To accelerate the message consumption, push consumer applies
- * <a href="https://en.wikipedia.org/wiki/Reactive_Streams">reactive streams</a>
- * . Messages received from server is cached locally before consumption,
+ * <p>To mitigate latency, PushConsumer adopts
+ * <a href="https://en.wikipedia.org/wiki/Reactive_Streams">reactive streams</a> pattern. Namely,
+ * messages received from broker servers are first cached locally, amount of which is controlled by
  * {@link PushConsumerBuilder#setMaxCacheMessageCount(int)} and
- * {@link PushConsumerBuilder#setMaxCacheMessageSizeInBytes(int)} could be used to set the cache threshold in
- * different dimension.
+ * {@link PushConsumerBuilder#setMaxCacheMessageSizeInBytes(int)}, and then dispatched to thread pool to achieve
+ * desirable concurrency.
  */
 public interface PushConsumer extends Closeable {
     /**