You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/03/28 11:56:28 UTC

[GitHub] [rocketmq] ni-ze commented on a change in pull request #4019: [RIP-37] Add new APIs for consumer

ni-ze commented on a change in pull request #4019:
URL: https://github.com/apache/rocketmq/pull/4019#discussion_r836347205



##########
File path: apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.apis.consumer;
+
+import com.google.common.util.concurrent.Service;
+import java.io.Closeable;
+import java.util.Collection;
+
+import org.apache.rocketmq.apis.exception.*;
+
+/**
+ * PushConsumer is a thread-safe rocketmq client which is used to consume message by group.
+ *
+ * <p>Push consumer is fully-managed consumer, if you are confused to choose your consumer, push consumer should be
+ * your first consideration.
+ *
+ * <p>Consumers belong to the same consumer group share messages from server,
+ * so consumer in the same group must have the same {@link SubscriptionExpression}s, otherwise the behavior is
+ * undefined. If a new consumer group's consumer is started first time, it consumes from the latest position. Once
+ * consumer is started, server records its consumption progress and derives it in subsequent startup.
+ *
+ * <p>You may intend to maintain different consumption progress for different consumer, different consumer group
+ * should be set in this case.
+ *
+ * <p>To accelerate the message consumption, push consumer applies
+ * <a href="https://en.wikipedia.org/wiki/Reactive_Streams">reactive streams</a>
+ * . Messages received from server is cached locally before consumption,
+ * {@link PushConsumerBuilder#setMaxCacheMessageCount(int)} and
+ * {@link PushConsumerBuilder#setMaxCacheMessageSizeInBytes(int)} could be used to set the cache threshold in
+ * different dimension.
+ */
+public interface PushConsumer extends Closeable {
+    /**
+     * Get the load balancing group for consumer.
+     *
+     * @return consumer load balancing group.
+     */
+    String getConsumerGroup();
+
+    /**
+     * Get the existed subscription expression in push consumer.
+     *
+     * @return collections of subscription expression.
+     */
+    Collection<SubscriptionExpression> listSubscriptionExpression();
+
+    /**
+     * Add subscription expression dynamically.
+     *
+     * <p>If first {@link SubscriptionExpression} that contains topicA and tag1 is exists already in consumer, then
+     * second {@link SubscriptionExpression} which contains topicA and tag2, <strong>the result is that the second one
+     * replaces the first one instead of integrating them</strong>.
+     *
+     * @param subscriptionExpression new subscription expression to add.
+     * @return push consumer instance.
+     */
+    PushConsumer subscribe(SubscriptionExpression subscriptionExpression) throws ClientException;
+
+    /**
+     * Remove subscription expression dynamically by topic.
+     *
+     * <p>It stops the backend task to fetch message from remote, and besides that, the local cached message whose topic
+     * was removed before would not be delivered to {@link MessageListener} anymore.
+     *
+     * <p>Nothing occurs if the specified topic does not exist in subscription expressions of push consumer.
+     *
+     * @param topic the topic to remove subscription.
+     * @return push consumer instance.
+     */
+    PushConsumer unsubscribe(String topic) throws ClientException;
+
+    /**
+     * Close the push consumer and release all related resources.
+     *
+     * <p>Once push consumer is closed, <strong>it could not be started once again.</strong> we maintained an FSM
+     * (finite-state machine) to record the different states for each producer, which is similar to
+     * {@link Service.State}.
+     */

Review comment:
       When we use subscribe mode, client will rebalance and poll messages from messageQueue into local buffer. MessageListener will be invoked after this relationship change, and start task to pull message.
   On the other hand,  app wants to be notify once the queue it consumes is determined, init it use messageQueue, messageListener will be used in this scene.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org