You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/04/19 02:20:55 UTC

[rocketmq] branch 5.0.0-beta updated: [RIP-37] Add new APIs for consumer (#4019)

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-beta by this push:
     new df5e885b3 [RIP-37] Add new APIs for consumer (#4019)
df5e885b3 is described below

commit df5e885b3f1f0d9e5466d145ad736bc57b25c2a3
Author: Zhongliang.Chen <ch...@gmail.com>
AuthorDate: Tue Apr 19 10:20:47 2022 +0800

    [RIP-37] Add new APIs for consumer (#4019)
    
    * Add new APIs for consumer
    
    * Change APIs for consumer
    
    * Update PullConsumer API
    
    * Update PullConsumer API comments
    
    * Update pull consumer, add rebalance listener
    
    * remove pullConsumer
    
    * remove subscriptionExpression define
    
    * remove batch consume from PushConsumer
    
    * fix check style error
    
    Co-authored-by: 陈仲良 <al...@Alvin.local>
---
 .../rocketmq/apis/ClientServiceProvider.java       |  17 +++
 .../rocketmq/apis/consumer/ConsumeStatus.java      |  30 ++++
 .../rocketmq/apis/consumer/FilterExpression.java   |  56 ++++++++
 .../apis/consumer/FilterExpressionType.java        |  32 +++++
 .../rocketmq/apis/consumer/MessageListener.java    |  37 +++++
 .../rocketmq/apis/consumer/PushConsumer.java       |  96 +++++++++++++
 .../apis/consumer/PushConsumerBuilder.java         |  88 ++++++++++++
 .../rocketmq/apis/consumer/SimpleConsumer.java     | 155 +++++++++++++++++++++
 .../apis/consumer/SimpleConsumerBuilder.java       |  67 +++++++++
 9 files changed, 578 insertions(+)

diff --git a/apis/src/main/java/org/apache/rocketmq/apis/ClientServiceProvider.java b/apis/src/main/java/org/apache/rocketmq/apis/ClientServiceProvider.java
index 075f9ea9b..76d921303 100644
--- a/apis/src/main/java/org/apache/rocketmq/apis/ClientServiceProvider.java
+++ b/apis/src/main/java/org/apache/rocketmq/apis/ClientServiceProvider.java
@@ -19,6 +19,9 @@ package org.apache.rocketmq.apis;
 
 import java.util.Iterator;
 import java.util.ServiceLoader;
+
+import org.apache.rocketmq.apis.consumer.PushConsumerBuilder;
+import org.apache.rocketmq.apis.consumer.SimpleConsumerBuilder;
 import org.apache.rocketmq.apis.message.MessageBuilder;
 import org.apache.rocketmq.apis.producer.ProducerBuilder;
 
@@ -43,6 +46,20 @@ public interface ClientServiceProvider {
      */
     ProducerBuilder newProducerBuilder();
 
+    /**
+     * Get the simple consumer builder by current provider.
+     *
+     * @return the simple consumer builder instance.
+     */
+    SimpleConsumerBuilder newSimpleConsumerBuilder();
+
+    /**
+     * Get the push consumer builder by current provider.
+     *
+     * @return the push consumer builder instance.
+     */
+    PushConsumerBuilder newPushConsumerBuilder();
+
     /**
      * Get the message builder by current provider.
      *
diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/ConsumeStatus.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/ConsumeStatus.java
new file mode 100644
index 000000000..945832ff9
--- /dev/null
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/ConsumeStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.apis.consumer;
+
+public enum ConsumeStatus {
+    /**
+     * The message was consumed successfully and should be committed.
+     */
+    SUCCESS,
+    /**
+     * Consuming the message failed and it should be reconsumed later.
+     */
+    FAILED
+}
diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/FilterExpression.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/FilterExpression.java
new file mode 100644
index 000000000..b5cf27429
--- /dev/null
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/FilterExpression.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.apis.consumer;
+
+import java.util.Objects;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class FilterExpression {
+    public static final String TAG_EXPRESSION_SUB_ALL = "*";
+    public static final String TAG_EXPRESSION_SPLITTER = "\\|\\|";
+    private final String expression;
+    private final FilterExpressionType filterExpressionType;
+
+    public FilterExpression(String expression, FilterExpressionType filterExpressionType) {
+        this.expression = checkNotNull(expression, "expression should not be null");
+        this.filterExpressionType = checkNotNull(filterExpressionType, "filterExpressionType should not be null");
+    }
+
+    public String getExpression() {
+        return expression;
+    }
+
+    public FilterExpressionType getFilterExpressionType() {
+        return filterExpressionType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        FilterExpression that = (FilterExpression) o;
+        return expression.equals(that.expression) && filterExpressionType == that.filterExpressionType;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(expression, filterExpressionType);
+    }
+}
diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/FilterExpressionType.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/FilterExpressionType.java
new file mode 100644
index 000000000..99bc48716
--- /dev/null
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/FilterExpressionType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.apis.consumer;
+
+public enum FilterExpressionType {
+    /**
+     * Follows SQL92 standard.
+     */
+    SQL92,
+    /**
+     * Only supports the OR operation, such as
+     * "tag1 || tag2 || tag3". <br>
+     * The expression "*" means subscribe to all; note that {@link FilterExpression} rejects a null expression.
+     */
+    TAG
+}
diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java
new file mode 100644
index 000000000..3bf17cce1
--- /dev/null
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/MessageListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.apis.consumer;
+
+import org.apache.rocketmq.apis.message.MessageView;
+
+/**
+ * MessageListener is used only for push consumer to process message consumption synchronously.
+ *
+ * <p> Refer to {@link PushConsumer}: the push consumer gets messages from the server
+ * and dispatches them to a backend thread pool, controlled by the threadCount parameter, to consume messages concurrently.
+ */
+public interface MessageListener {
+    /**
+     * The callback interface for consuming a message. You should process the messageView and return a consumeStatus.
+     * Push consumer will commit the message to the server when SUCCESS is returned, or reconsume it later when FAILED is returned.
+     * When the consume method throws an unexpected exception, the consumeStatus will be treated as FAILED.
+     * @param messageView the message that needs to be consumed.
+     * @return the consume result, as defined in {@link ConsumeStatus}
+     */
+    ConsumeStatus consume(MessageView messageView);
+}
diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java
new file mode 100644
index 000000000..cb89ff4e6
--- /dev/null
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.apis.consumer;
+
+import org.apache.rocketmq.apis.exception.ClientException;
+
+import java.io.Closeable;
+import java.util.Map;
+
+
+/**
+ * PushConsumer is a thread-safe rocketmq client which is used to consume message by group.
+ *
+ * <p>Push consumer is a fully-managed consumer; if you are unsure which consumer to choose, push consumer
+ * should be your first consideration.
+ *
+ * <p>Consumers belonging to the same consumer group share messages from the server,
+ * so consumers in the same group must have the same subscriptionExpressions, otherwise the behavior is
+ * undefined. If a new consumer group's consumer is started for the first time, it consumes from the latest position.
+ * Once a consumer is started, the server records its consumption progress and resumes from it in subsequent startups.
+ *
+ * <p>If you intend to maintain separate consumption progress for different consumers, different consumer groups
+ * should be set in this case.
+ *
+ * <p>To accelerate the message consumption, push consumer applies
+ * <a href="https://en.wikipedia.org/wiki/Reactive_Streams">reactive streams</a>.
+ * Messages received from the server are cached locally before consumption;
+ * {@link PushConsumerBuilder#setMaxCacheMessageCount(int)} and
+ * {@link PushConsumerBuilder#setMaxCacheMessageSizeInBytes(int)} can be used to set the cache thresholds in
+ * different dimensions.
+ */
+public interface PushConsumer extends Closeable {
+    /**
+     * Get the load balancing group for consumer.
+     *
+     * @return consumer load balancing group.
+     */
+    String getConsumerGroup();
+
+    /**
+     * List the existed subscription expressions in push consumer.
+     *
+     * @return map of topic to filter expression.
+     */
+    Map<String, FilterExpression> subscriptionExpressions();
+
+    /**
+     * Add subscription expression dynamically.
+     *
+     * <p>If a subscriptionExpression containing topicA and tag1 already exists in the consumer, and a second
+     * subscriptionExpression containing topicA and tag2 is added, <strong>the result is that the second one
+     * replaces the first one instead of being merged with it</strong>.
+     *
+     * @param topic  the topic to add or update.
+     * @param filterExpression the new filter expression to add or update.
+     * @return push consumer instance.
+     */
+    PushConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException;
+
+    /**
+     * Remove subscription expression dynamically by topic.
+     *
+     * <p>It stops the backend task to fetch message from remote, and besides that, the local cached message whose topic
+     * was removed before would not be delivered to {@link MessageListener} anymore.
+     *
+     * <p>Nothing occurs if the specified topic does not exist in subscription expressions of push consumer.
+     *
+     * @param topic the topic to remove subscription.
+     * @return push consumer instance.
+     */
+    PushConsumer unsubscribe(String topic) throws ClientException;
+
+    /**
+     * Close the push consumer and release all related resources.
+     *
+     * <p>Once push consumer is closed, <strong>it could not be started once again.</strong> We maintain an FSM
+     * (finite-state machine) to record the different states for each push consumer.
+     */
+    @Override
+    void close();
+}
diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumerBuilder.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumerBuilder.java
new file mode 100644
index 000000000..166d48f8e
--- /dev/null
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/PushConsumerBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.apis.consumer;
+
+import org.apache.rocketmq.apis.ClientConfiguration;
+import org.apache.rocketmq.apis.exception.ClientException;
+
+import java.util.Map;
+
+public interface PushConsumerBuilder {
+    /**
+     * Set the client configuration for consumer.
+     *
+     * @param clientConfiguration client's configuration.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
+
+    /**
+     * Set the load balancing group for consumer.
+     *
+     * @param consumerGroup consumer load balancing group.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setConsumerGroup(String consumerGroup);
+
+    /**
+     * Add subscriptionExpressions for consumer.
+     *
+     * @param subscriptionExpressions subscriptions to add which use the map of topic to filterExpression.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setSubscriptionExpressions(Map<String, FilterExpression> subscriptionExpressions);
+
+    /**
+     * Register the message listener; all messages that meet the subscription expression are delivered to this listener.
+     *
+     * @param listener message listener.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setMessageListener(MessageListener listener);
+
+    /**
+     * Set the maximum number of messages cached locally.
+     *
+     * @param count message count.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setMaxCacheMessageCount(int count);
+
+    /**
+     * Set the maximum bytes of messages cached locally.
+     *
+     * @param bytes message size.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setMaxCacheMessageSizeInBytes(int bytes);
+
+    /**
+     * Set the consumption thread count in parallel.
+     *
+     * @param count thread count.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setThreadCount(int count);
+
+    /**
+     * Finalize the build of {@link PushConsumer}.
+     *
+     * @return the push consumer instance.
+     */
+    PushConsumer build() throws ClientException;
+}
diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/SimpleConsumer.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/SimpleConsumer.java
new file mode 100644
index 000000000..0fd5d51c1
--- /dev/null
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/SimpleConsumer.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.apis.consumer;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.rocketmq.apis.exception.ClientException;
+import org.apache.rocketmq.apis.message.MessageView;
+
+/**
+ * SimpleConsumer is a thread-safe rocketmq client which is used to consume message by group.
+ *
+ * <p>Simple consumer is a lightweight consumer; if you want to fully control the message consumption yourself,
+ * simple consumer should be your first consideration.
+ *
+ * <p>Consumers belonging to the same consumer group share messages from the server,
+ * so consumers in the same group must have the same subscriptionExpressions, otherwise the behavior is
+ * undefined. If a new consumer group's consumer is started for the first time, it consumes from the latest position.
+ * Once a consumer is started, the server records its consumption progress and resumes from it in subsequent startups.
+ *
+ * <p>You may intend to maintain different consumption progress for different consumer, different consumer group
+ * should be set in this case.
+ *
+ * <p> Simple consumer divides message consumption into 3 parts.
+ * First, call the receive API to get messages from the server; then process the messages yourself; finally, you must call the ack API to commit each message.
+ * If an error occurs while processing a message, you can reconsume it later, which is controlled by the invisibleDuration parameter.
+ * Also, you can change the invisibleDuration by calling the changeInvisibleDuration API.
+ */
+public interface SimpleConsumer extends Closeable {
+    /**
+     * Get the load balancing group for simple consumer.
+     *
+     * @return consumer load balancing group.
+     */
+    String getConsumerGroup();
+
+    /**
+     * Add subscription expression dynamically.
+     *
+     * <p>If a subscriptionExpression containing topicA and tag1 already exists in the consumer, and a second
+     * subscriptionExpression containing topicA and tag2 is added, <strong>the result is that the second one
+     * replaces the first one instead of being merged with it</strong>.
+     *
+     * @param topic  the topic to add or update.
+     * @param filterExpression the new filter expression to add or update.
+     * @return simple consumer instance.
+     */
+    SimpleConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException;
+
+    /**
+     * Remove subscription expression dynamically by topic.
+     *
+     * <p>It stops the backend task to fetch message from remote, and besides that, the local cached message whose topic
+     * was removed before would not be delivered to the caller anymore.
+     *
+     * <p>Nothing occurs if the specified topic does not exist in subscription expressions of simple consumer.
+     *
+     * @param topic the topic to remove subscription.
+     * @return simple consumer instance.
+     */
+    SimpleConsumer unsubscribe(String topic) throws ClientException;
+
+    /**
+     * List the existed subscription expressions in simple consumer.
+     *
+     * @return map of topic to filter expression.
+     */
+    Map<String, FilterExpression> subscriptionExpressions();
+
+    /**
+     * Fetch messages from server synchronously.
+     * <p> This method returns immediately if there are messages available.
+     * Otherwise, it waits up to the await duration set by {@link SimpleConsumerBuilder#setAwaitDuration(Duration)}. If that timeout expires, an empty list will be returned.
+     * @param maxMessageNum the maximum number of messages the server returns.
+     * @param invisibleDuration the invisible duration of the messages returned from the server. These messages will be invisible to other consumers until it times out.
+     * @return list of messageView
+     */
+    List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException;
+
+    /**
+     * Fetch messages from server asynchronously.
+     * <p> This method returns immediately if there are messages available.
+     * Otherwise, it waits up to the await duration set by {@link SimpleConsumerBuilder#setAwaitDuration(Duration)}. If that timeout expires, an empty list will be returned.
+     * @param maxMessageNum the maximum number of messages the server returns.
+     * @param invisibleDuration the invisible duration of the messages returned from the server. These messages will be invisible to other consumers until it times out.
+     * @return CompletableFuture of the list of messageView
+     */
+    CompletableFuture<List<MessageView>> receiveAsync(int maxMessageNum, Duration invisibleDuration) throws ClientException;
+
+    /**
+     * Ack the message to the server synchronously; the server then commits this message.
+     *
+     * <p> A duplicate ack request does not take effect and throws an exception.
+     * @param messageView the specific messageView, with its handle, to ack.
+     */
+    void ack(MessageView messageView) throws ClientException;
+
+    /**
+     * Ack the message to the server asynchronously; the server then commits this message.
+     *
+     * <p> A duplicate ack request does not take effect and throws an exception.
+     * @param messageView the specific messageView, with its handle, to ack.
+     * @return CompletableFuture of this request.
+     */
+    CompletableFuture<Void> ackAsync(MessageView messageView);
+
+    /**
+     * Changes the invisible duration of a specified message synchronously.
+     *
+     * <p> The original invisible duration of a message is decided by the receive request.
+     *
+     * <p>You must call the change request before the original invisible duration times out.
+     * If the change request is called after the original invisible duration has elapsed, the request does not take effect and throws an exception.
+     * A duplicate change request will refresh the next visible time of this message to other consumers.
+     * @param messageView the specific messageView, with its handle, to change.
+     * @param invisibleDuration the new duration, counted from the current time, after which the message becomes visible and can be reconsumed.
+     */
+    void changeInvisibleDuration(MessageView messageView, Duration invisibleDuration) throws ClientException;
+
+    /**
+     * Changes the invisible duration of a specified message asynchronously.
+     *
+     * <p> The original invisible duration of a message is decided by the receive request.
+     *
+     * <p> You must call the change request before the original invisible duration times out.
+     * If the change request is called after the original invisible duration has elapsed, the request does not take effect and throws an exception.
+     * A duplicate change request will refresh the next visible time of this message to other consumers.
+     * @param messageView the specific messageView, with its handle, to change.
+     * @param invisibleDuration the new duration, counted from the current time, after which the message becomes visible and can be reconsumed.
+     * @return CompletableFuture of this request.
+     */
+    CompletableFuture<Void> changeInvisibleDurationAsync(MessageView messageView, Duration invisibleDuration);
+
+    @Override
+    void close();
+}
diff --git a/apis/src/main/java/org/apache/rocketmq/apis/consumer/SimpleConsumerBuilder.java b/apis/src/main/java/org/apache/rocketmq/apis/consumer/SimpleConsumerBuilder.java
new file mode 100644
index 000000000..e1ff6c76f
--- /dev/null
+++ b/apis/src/main/java/org/apache/rocketmq/apis/consumer/SimpleConsumerBuilder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.apis.consumer;
+
+import java.time.Duration;
+import java.util.Map;
+
+import org.apache.rocketmq.apis.ClientConfiguration;
+import org.apache.rocketmq.apis.exception.ClientException;
+
+public interface SimpleConsumerBuilder {
+    /**
+     * Set the client configuration for simple consumer.
+     *
+     * @param clientConfiguration client's configuration.
+     * @return the simple consumer builder instance.
+     */
+    SimpleConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
+
+    /**
+     * Set the load balancing group for simple consumer.
+     *
+     * @param consumerGroup consumer load balancing group.
+     * @return the consumer builder instance.
+     */
+    SimpleConsumerBuilder setConsumerGroup(String consumerGroup);
+
+    /**
+     * Add subscriptionExpressions for simple consumer.
+     *
+     * @param subscriptionExpressions subscriptions to add which use the map of topic to filterExpression.
+     * @return the consumer builder instance.
+     */
+    SimpleConsumerBuilder setSubscriptionExpressions(Map<String, FilterExpression> subscriptionExpressions);
+
+    /**
+     * Set the max await time when receiving messages from the server.
+     * The simple consumer will hold the long-polling receive request until a message is returned or a timeout occurs.
+     * @param awaitDuration the maximum time to block when no message is available.
+     * @return the consumer builder instance.
+     */
+    SimpleConsumerBuilder setAwaitDuration(Duration awaitDuration);
+
+    /**
+     * Finalize the build of the {@link SimpleConsumer} instance and start.
+     *
+     * <p>This method will block until simple consumer starts successfully.
+     *
+     * @return the simple consumer instance.
+     */
+    SimpleConsumer build() throws ClientException;
+}