You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/06/17 21:23:22 UTC

[rocketmq-clients] 02/02: [RIP-37] Add new APIs for consumer

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit f57efb82c2a7d960701ffd31746923d41738b865
Author: Zhongliang.Chen <ch...@gmail.com>
AuthorDate: Sat Jun 18 05:16:35 2022 +0800

    [RIP-37] Add new APIs for consumer
---
 .../main/java/client/apis/ClientConfiguration.java |  61 ++++++++
 .../client/apis/ClientConfigurationBuilder.java    |  72 ++++++++++
 .../java/client/apis/ClientServiceProvider.java    |  68 +++++++++
 .../src/main/java/client/apis/MessageQueue.java    |  24 ++++
 .../main/java/client/apis/SessionCredentials.java  |  58 ++++++++
 .../client/apis/SessionCredentialsProvider.java    |  30 ++++
 .../apis/StaticSessionCredentialsProvider.java     |  35 +++++
 .../java/client/apis/consumer/ConsumeResult.java   |  31 +++++
 .../client/apis/consumer/FilterExpression.java     |  56 ++++++++
 .../client/apis/consumer/FilterExpressionType.java |  32 +++++
 .../java/client/apis/consumer/MessageListener.java |  54 ++++++++
 .../java/client/apis/consumer/PushConsumer.java    |  94 +++++++++++++
 .../client/apis/consumer/PushConsumerBuilder.java  |  87 ++++++++++++
 .../java/client/apis/consumer/SimpleConsumer.java  | 154 +++++++++++++++++++++
 .../apis/consumer/SimpleConsumerBuilder.java       |  66 +++++++++
 .../apis/exception/AuthenticationException.java    |  31 +++++
 .../apis/exception/AuthorisationException.java     |  35 +++++
 .../client/apis/exception/ClientException.java     |  72 ++++++++++
 .../main/java/client/apis/exception/ErrorCode.java |  84 +++++++++++
 .../apis/exception/FlowControlException.java       |  25 ++++
 .../client/apis/exception/NetworkException.java    |  28 ++++
 .../exception/RemoteIllegalArgumentException.java  |  25 ++++
 .../apis/exception/ResourceNotFoundException.java  |  25 ++++
 .../apis/exception/ResourceNotMatchException.java  |  24 ++++
 .../client/apis/exception/TimeoutException.java    |  28 ++++
 .../src/main/java/client/apis/message/Message.java |  77 +++++++++++
 .../java/client/apis/message/MessageBuilder.java   |  91 ++++++++++++
 .../main/java/client/apis/message/MessageId.java   |  38 +++++
 .../java/client/apis/message/MessageIdVersion.java |  29 ++++
 .../main/java/client/apis/message/MessageView.java | 121 ++++++++++++++++
 .../main/java/client/apis/producer/Producer.java   | 100 +++++++++++++
 .../java/client/apis/producer/ProducerBuilder.java |  82 +++++++++++
 .../java/client/apis/producer/SendReceipt.java     |  29 ++++
 .../java/client/apis/producer/Transaction.java     |  47 +++++++
 .../client/apis/producer/TransactionChecker.java   |  41 ++++++
 .../apis/producer/TransactionResolution.java       |  34 +++++
 .../apis/retry/BackOffRetryPolicyBuilder.java      |  59 ++++++++
 .../java/client/apis/retry/BackoffRetryPolicy.java |  86 ++++++++++++
 .../main/java/client/apis/retry/RetryPolicy.java   |  40 ++++++
 java/{ => client-java}/pom.xml                     |  13 +-
 java/pom.xml                                       |   3 +-
 41 files changed, 2181 insertions(+), 8 deletions(-)

diff --git a/java/client-apis/src/main/java/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/client/apis/ClientConfiguration.java
new file mode 100644
index 0000000..56a1b56
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/ClientConfiguration.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+import java.time.Duration;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Common client configuration.
+ */
+public class ClientConfiguration {
+    private final String endpoints;
+    private final SessionCredentialsProvider sessionCredentialsProvider;
+    private final Duration requestTimeout;
+    private final boolean enableTracing;
+
+    public static ClientConfigurationBuilder newBuilder() {
+        return new ClientConfigurationBuilder();
+    }
+
+    public ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider,
+        Duration requestTimeout, boolean enableTracing) {
+        this.endpoints = checkNotNull(endpoints, "endpoints should not be null");
+        this.sessionCredentialsProvider = checkNotNull(sessionCredentialsProvider, "credentialsProvider should not be"
+            + " null");
+        this.requestTimeout = checkNotNull(requestTimeout, "requestTimeout should be not null");
+        this.enableTracing = enableTracing;
+    }
+
+    public String getEndpoints() {
+        return endpoints;
+    }
+
+    public SessionCredentialsProvider getCredentialsProvider() {
+        return sessionCredentialsProvider;
+    }
+
+    public Duration getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public boolean isEnableTracing() {
+        return enableTracing;
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/ClientConfigurationBuilder.java b/java/client-apis/src/main/java/client/apis/ClientConfigurationBuilder.java
new file mode 100644
index 0000000..94999bd
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/ClientConfigurationBuilder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+import java.time.Duration;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Builder to set {@link ClientConfiguration}.
+ */
+public class ClientConfigurationBuilder {
+    private String endpoints;
+    private SessionCredentialsProvider sessionCredentialsProvider;
+    private Duration requestTimeout;
+    private boolean enableTracing;
+
+    /**
+     * Configure the endpoints with which the SDK should communicate.
+     *
+     * <p>Endpoints here means address of service, complying with the following scheme(part square brackets is
+     * optional).
+     * <p>1. DNS scheme(default): dns:host[:port], host is the host to resolve via DNS, port is the port to return
+     * for each address. If not specified, 443 is used.
+     * <p>2. ipv4 scheme: ipv4:address:port[,address:port,...]
+     * <p>3. ipv6 scheme: ipv6:address:port[,address:port,...]
+     * <p>4. http/https scheme: http|https://host[:port], similar to DNS scheme, if port not specified, 443 is used.
+     *
+     * @param endpoints address of service.
+     * @return the client configuration builder instance.
+     */
+    public ClientConfigurationBuilder setEndpoints(String endpoints) {
+        checkNotNull(endpoints, "endpoints should not be not null");
+        this.endpoints = endpoints;
+        return this;
+    }
+
+    public ClientConfigurationBuilder setCredentialProvider(SessionCredentialsProvider sessionCredentialsProvider) {
+        this.sessionCredentialsProvider = checkNotNull(sessionCredentialsProvider, "credentialsProvider should not be "
+            + "null");
+        return this;
+    }
+
+    public ClientConfigurationBuilder setRequestTimeout(Duration requestTimeout) {
+        this.requestTimeout = checkNotNull(requestTimeout, "requestTimeout should not be not null");
+        return this;
+    }
+
+    public ClientConfigurationBuilder enableTracing(boolean enableTracing) {
+        this.enableTracing = enableTracing;
+        return this;
+    }
+
+    public ClientConfiguration build() {
+        return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, enableTracing);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/ClientServiceProvider.java b/java/client-apis/src/main/java/client/apis/ClientServiceProvider.java
new file mode 100644
index 0000000..1ec7522
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/ClientServiceProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+import org.apache.rocketmq.apis.consumer.PushConsumerBuilder;
+import org.apache.rocketmq.apis.consumer.SimpleConsumerBuilder;
+import org.apache.rocketmq.apis.message.MessageBuilder;
+import org.apache.rocketmq.apis.producer.ProducerBuilder;
+
+/**
+ * Service provider to seek client, which load client according to
+ * <a href="https://en.wikipedia.org/wiki/Service_provider_interface">Java SPI mechanism</a>.
+ */
+public interface ClientServiceProvider {
+    static ClientServiceProvider loadService() {
+        final ServiceLoader<ClientServiceProvider> loaders = ServiceLoader.load(ClientServiceProvider.class);
+        final Iterator<ClientServiceProvider> iterators = loaders.iterator();
+        if (iterators.hasNext()) {
+            return iterators.next();
+        }
+        throw new UnsupportedOperationException("Client service provider not found");
+    }
+
+    /**
+     * Get the producer builder by current provider.
+     *
+     * @return the producer builder instance.
+     */
+    ProducerBuilder newProducerBuilder();
+
+    /**
+     * Get the simple consumer builder by current provider.
+     *
+     * @return the simple consumer builder instance.
+     */
+    SimpleConsumerBuilder newSimpleConsumerBuilder();
+
+    /**
+     * Get the push consumer builder by current provider.
+     *
+     * @return the push consumer builder instance.
+     */
+    PushConsumerBuilder newPushConsumerBuilder();
+
+    /**
+     * Get the message builder by current provider.
+     *
+     * @return the message builder instance.
+     */
+    MessageBuilder newMessageBuilder();
+}
diff --git a/java/client-apis/src/main/java/client/apis/MessageQueue.java b/java/client-apis/src/main/java/client/apis/MessageQueue.java
new file mode 100644
index 0000000..8c722e2
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/MessageQueue.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+public interface MessageQueue {
+    String getTopic();
+
+    String getId();
+}
\ No newline at end of file
diff --git a/java/client-apis/src/main/java/client/apis/SessionCredentials.java b/java/client-apis/src/main/java/client/apis/SessionCredentials.java
new file mode 100644
index 0000000..adef427
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/SessionCredentials.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Session credentials used in service authentications.
+ */
+public class SessionCredentials {
+    private final String accessKey;
+    private final String accessSecret;
+    private final String securityToken;
+
+    public SessionCredentials(String accessKey, String accessSecret, String securityToken) {
+        this.accessKey = checkNotNull(accessKey, "accessKey should not be null");
+        this.accessSecret = checkNotNull(accessSecret, "accessSecret should not be null");
+        this.securityToken = checkNotNull(securityToken, "securityToken should not be null");
+    }
+
+    public SessionCredentials(String accessKey, String accessSecret) {
+        this.accessKey = checkNotNull(accessKey, "accessKey should not be null");
+        this.accessSecret = checkNotNull(accessSecret, "accessSecret should not be null");
+        this.securityToken = null;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public String getAccessSecret() {
+        return accessSecret;
+    }
+
+    public Optional<String> getSecurityToken() {
+        if (null == securityToken) {
+            return Optional.empty();
+        }
+        return Optional.of(securityToken);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/SessionCredentialsProvider.java b/java/client-apis/src/main/java/client/apis/SessionCredentialsProvider.java
new file mode 100644
index 0000000..52e6912
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/SessionCredentialsProvider.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+/**
+ * Abstract provider to provide {@link SessionCredentials}.
+ */
+public interface SessionCredentialsProvider {
+    /**
+     * Get the provided credentials.
+     *
+     * @return provided credentials.
+     */
+    SessionCredentials getCredentials();
+}
diff --git a/java/client-apis/src/main/java/client/apis/StaticSessionCredentialsProvider.java b/java/client-apis/src/main/java/client/apis/StaticSessionCredentialsProvider.java
new file mode 100644
index 0000000..5c0d46c
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/StaticSessionCredentialsProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+public class StaticSessionCredentialsProvider implements SessionCredentialsProvider {
+    private final SessionCredentials credentials;
+
+    public StaticSessionCredentialsProvider(String accessKey, String accessSecret) {
+        this.credentials = new SessionCredentials(accessKey, accessSecret);
+    }
+
+    public StaticSessionCredentialsProvider(String accessKey, String accessSecret, String securityToken) {
+        this.credentials = new SessionCredentials(accessKey, accessSecret, securityToken);
+    }
+
+    @Override
+    public SessionCredentials getCredentials() {
+        return credentials;
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/ConsumeResult.java b/java/client-apis/src/main/java/client/apis/consumer/ConsumeResult.java
new file mode 100644
index 0000000..b62939b
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/ConsumeResult.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package client.apis.consumer;
+
+public enum ConsumeResult {
+    /**
+     * Consume message success and need commit this message.
+     */
+    SUCCESS,
+
+    /**
+     * Failed to consume the message, expecting potential delivery after configured backoff.
+     */
+    FAILURE
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/FilterExpression.java b/java/client-apis/src/main/java/client/apis/consumer/FilterExpression.java
new file mode 100644
index 0000000..7f76ba7
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/FilterExpression.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package client.apis.consumer;
+
+import java.util.Objects;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class FilterExpression {
+    public static final String TAG_EXPRESSION_SUB_ALL = "*";
+    public static final String TAG_EXPRESSION_SPLITTER = "\\|\\|";
+    private final String expression;
+    private final FilterExpressionType filterExpressionType;
+
+    public FilterExpression(String expression, FilterExpressionType filterExpressionType) {
+        this.expression = checkNotNull(expression, "expression should not be null");
+        this.filterExpressionType = checkNotNull(filterExpressionType, "filterExpressionType should not be null");
+    }
+
+    public String getExpression() {
+        return expression;
+    }
+
+    public FilterExpressionType getFilterExpressionType() {
+        return filterExpressionType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        FilterExpression that = (FilterExpression) o;
+        return expression.equals(that.expression) && filterExpressionType == that.filterExpressionType;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(expression, filterExpressionType);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/FilterExpressionType.java b/java/client-apis/src/main/java/client/apis/consumer/FilterExpressionType.java
new file mode 100644
index 0000000..1284bfd
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/FilterExpressionType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package client.apis.consumer;
+
+public enum FilterExpressionType {
+    /**
+     * Follows SQL92 standard.
+     */
+    SQL92,
+    /**
+     * Only support or operation such as
+     * "tag1 || tag2 || tag3", <br>
+     * If null or * expression,meaning subscribe all.
+     */
+    TAG
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/MessageListener.java b/java/client-apis/src/main/java/client/apis/consumer/MessageListener.java
new file mode 100644
index 0000000..2e74a4d
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/MessageListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.consumer;
+
+import org.apache.rocketmq.apis.message.MessageView;
+
+/**
+ * <p>MessageListener is used only by PushConsumer to process messages
+ * synchronously.
+ *
+ * <p>PushConsumer will fetch messages from brokers and dispatch them to an
+ * embedded thread pool in form of <code>Runnable</code> tasks to achieve desirable processing concurrency.
+ *
+ * <p>Refer to {@link PushConsumer} for more further specs.
+ *
+ * <p>
+ * <strong>Thread Safety</strong>
+ * This class may be called concurrently by multiple threads. Implementation should be thread safe.
+ * </p>
+ */
+public interface MessageListener {
+
+    /**
+     * Callback interface to handle incoming messages.
+     *
+     * Application developers are expected to implement this interface to fulfill business requirements through
+     * processing <code>message</code> and return
+     * <code>ConsumeResult</code> accordingly.
+     *
+     * PushConsumer will, on behalf of its group, acknowledge the message to broker on success; In case of failure or
+     * unexpected exceptions were raised, it will negatively acknowledge <code>message</code>, which would potentially
+     * get re-delivered after the configured back off period.
+     *
+     * @param message The message passed to the listener.
+     * @return {@link ConsumeResult#SUCCESS} if <code>message</code> is properly processed; {@link
+     * ConsumeResult#FAILURE} otherwise.
+     */
+    ConsumeResult onMessage(MessageView message);
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/PushConsumer.java b/java/client-apis/src/main/java/client/apis/consumer/PushConsumer.java
new file mode 100644
index 0000000..6b7c1a4
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/PushConsumer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.consumer;
+
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.rocketmq.apis.exception.ClientException;
+
+
+/**
+ * PushConsumer is a managed client which delivers messages to application through {@link MessageListener}.
+ *
+ * <p>Consumers of the same group are designed to share messages from broker servers. As a result, consumers of the same
+ * group must have <strong>exactly identical subscription expressions</strong>, otherwise the behavior is undefined.
+ *
+ * <p>For a brand-new group, consumers consume messages from head of underlying queues, ignoring existing messages
+ * completely. In addition to delivering messages to clients, broker servers also maintain progress in perspective of
+ * group. Thus, consumers can safely restart and resume their progress automatically.</p>
+ *
+ * <p>There are scenarios where <a href="https://en.wikipedia.org/wiki/Fan-out_(software)">fan-out</a> is preferred,
+ * recommended solution is to use dedicated group of each client.
+ *
+ * <p>To mitigate latency, PushConsumer adopts
+ * <a href="https://en.wikipedia.org/wiki/Reactive_Streams">reactive streams</a> pattern. Namely,
+ * messages received from broker servers are first cached locally, amount of which is controlled by
+ * {@link PushConsumerBuilder#setMaxCacheMessageCount(int)} and
+ * {@link PushConsumerBuilder#setMaxCacheMessageSizeInBytes(int)}, and then dispatched to thread pool to achieve
+ * desirable concurrency.
+ */
+public interface PushConsumer extends Closeable {
+    /**
+     * Get the load balancing group for consumer.
+     *
+     * @return consumer load balancing group.
+     */
+    String getConsumerGroup();
+
+    /**
+     * List the existed subscription expressions in push consumer.
+     *
+     * @return map of topic to filter expression.
+     */
+    Map<String, FilterExpression> subscriptionExpressions();
+
+    /**
+     * Add subscription expression dynamically.
+     *
+     * <p>If first subscriptionExpression that contains topicA and tag1 is exists already in consumer, then
+     * second subscriptionExpression which contains topicA and tag2, <strong>the result is that the second one
+     * replaces the first one instead of integrating them</strong>.
+     *
+     * @param topic  new topic that need to add or update.
+     * @param filterExpression new filter expression to add or update.
+     * @return push consumer instance.
+     */
+    PushConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException;
+
+    /**
+     * Remove subscription expression dynamically by topic.
+     *
+     * <p>It stops the backend task to fetch message from remote, and besides that, the local cached message whose topic
+     * was removed before would not be delivered to {@link MessageListener} anymore.
+     *
+     * <p>Nothing occurs if the specified topic does not exist in subscription expressions of push consumer.
+     *
+     * @param topic the topic to remove subscription.
+     * @return push consumer instance.
+     */
+    PushConsumer unsubscribe(String topic) throws ClientException;
+
+    /**
+     * Close the push consumer and release all related resources.
+     *
+     * <p>Once push consumer is closed, <strong>it could not be started once again.</strong> we maintained an FSM
+     * (finite-state machine) to record the different states for each producer, which is similar to
+     */
+    @Override
+    void close();
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/PushConsumerBuilder.java b/java/client-apis/src/main/java/client/apis/consumer/PushConsumerBuilder.java
new file mode 100644
index 0000000..82bfd2f
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/PushConsumerBuilder.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.consumer;
+
+import java.util.Map;
+import org.apache.rocketmq.apis.ClientConfiguration;
+import org.apache.rocketmq.apis.exception.ClientException;
+
+public interface PushConsumerBuilder {
+    /**
+     * Set the client configuration for consumer.
+     *
+     * @param clientConfiguration client's configuration.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
+
+    /**
+     * Set the load balancing group for consumer.
+     *
+     * @param consumerGroup consumer load balancing group.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setConsumerGroup(String consumerGroup);
+
+    /**
+     * Add subscriptionExpressions for consumer.
+     *
+     * @param subscriptionExpressions subscriptions to add which use the map of topic to filterExpression.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setSubscriptionExpressions(Map<String, FilterExpression> subscriptionExpressions);
+
+    /**
+     * Register message listener, all messages meet the subscription expression would across listener here.
+     *
+     * @param listener message listener.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setMessageListener(MessageListener listener);
+
+    /**
+     * Set the maximum number of messages cached locally.
+     *
+     * @param count message count.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setMaxCacheMessageCount(int count);
+
+    /**
+     * Set the maximum bytes of messages cached locally.
+     *
+     * @param bytes message size.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setMaxCacheMessageSizeInBytes(int bytes);
+
+    /**
+     * Set the consumption thread count in parallel.
+     *
+     * @param count thread count.
+     * @return the consumer builder instance.
+     */
+    PushConsumerBuilder setThreadCount(int count);
+
+    /**
+     * Finalize the build of {@link PushConsumer}.
+     *
+     * @return the push consumer instance.
+     */
+    PushConsumer build() throws ClientException;
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumer.java b/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumer.java
new file mode 100644
index 0000000..4bc3827
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.consumer;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.apis.exception.ClientException;
+import org.apache.rocketmq.apis.message.MessageView;
+
+/**
+ * SimpleConsumer is a thread-safe rocketmq client which is used to consume message by group.
+ *
+ * <p>Simple consumer is lightweight consumer , if you want fully control the message consumption operation by yourself,
+ * simple consumer should be your first consideration.
+ *
+ * <p>Consumers belong to the same consumer group share messages from server,
+ * so consumer in the same group must have the same subscriptionExpressions, otherwise the behavior is
+ * undefined. If a new consumer group's consumer is started first time, it consumes from the latest position. Once
+ * consumer is started, server records its consumption progress and derives it in subsequent startup.
+ *
+ * <p>You may intend to maintain different consumption progress for different consumer, different consumer group
+ * should be set in this case.
+ *
+ * <p> Simple consumer divide message consumption to 3 parts.
+ * Firstly, call receive api get messages from server; Then process message by yourself; At last, your must call Ack api to commit this message.
+ * If there is error when process message ,your can reconsume the message later which control by the invisibleDuration parameter.
+ * Also, you can change the invisibleDuration by call changeInvisibleDuration api.
+ */
+public interface SimpleConsumer extends Closeable {
+    /**
+     * Get the load balancing group for simple consumer.
+     *
+     * @return consumer load balancing group.
+     */
+    String getConsumerGroup();
+
+    /**
+     * Add subscription expression dynamically.
+     *
+     * <p>If first subscriptionExpression that contains topicA and tag1 is exists already in consumer, then
+     * second subscriptionExpression which contains topicA and tag2, <strong>the result is that the second one
+     * replaces the first one instead of integrating them</strong>.
+     *
+     * @param topic  new topic that need to add or update.
+     * @param filterExpression new filter expression to add or update.
+     * @return simple consumer instance.
+     */
+    SimpleConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException;
+
+    /**
+     * Remove subscription expression dynamically by topic.
+     *
+     * <p>It stops the backend task to fetch message from remote, and besides that, the local cached message whose topic
+     * was removed before would not be delivered to {@link MessageListener} anymore.
+     *
+     * <p>Nothing occurs if the specified topic does not exist in subscription expressions of push consumer.
+     *
+     * @param topic the topic to remove subscription.
+     * @return simple consumer instance.
+     */
+    SimpleConsumer unsubscribe(String topic) throws ClientException;
+
+    /**
+     * List the existed subscription expressions in simple consumer.
+     *
+     * @return map of topic to filter expression.
+     */
+    Map<String, FilterExpression> subscriptionExpressions();
+
+    /**
+     * Fetch messages from server synchronously.
+     * <p> This method returns immediately if there are messages available.
+     * Otherwise, it will await the passed timeout. If the timeout expires, an empty map will be returned.
+     * @param maxMessageNum max message num when server returns.
+     * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be invisible to other consumer unless timout.
+     * @return list of messageView
+     */
+    List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException;
+
+    /**
+     * Fetch messages from server asynchronously.
+     * <p> This method returns immediately if there are messages available.
+     * Otherwise, it will await the passed timeout. If the timeout expires, an empty map will be returned.
+     * @param maxMessageNum max message num when server returns.
+     * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be invisible to other consumer unless timout.
+     * @return list of messageView
+     */
+    CompletableFuture<List<MessageView>> receiveAsync(int maxMessageNum, Duration invisibleDuration) throws ClientException;
+
+    /**
+     * Ack message to server synchronously, server commit this message.
+     *
+     * <p> Duplicate ack request does not take effect and throw exception.
+     * @param messageView special messageView with handle want to ack.
+     */
+    void ack(MessageView messageView) throws ClientException;
+
+    /**
+     * Ack message to server asynchronously, server commit this message.
+     *
+     * <p> Duplicate ack request does not take effect and throw exception.
+     * @param messageView special messageView with handle want to ack.
+     * @return CompletableFuture of this request.
+     */
+    CompletableFuture<Void> ackAsync(MessageView messageView);
+
+    /**
+     * Changes the invisible duration of a specified message synchronously.
+     *
+     * <p> The origin invisible duration for a message decide by ack request.
+     *
+     * <p>You must call change request before the origin invisible duration timeout.
+     * If called change request later than the origin invisible duration, this request does not take effect and throw exception.
+     * Duplicate change request will refresh the next visible time of this message to other consumers.
+     * @param messageView special messageView with handle want to change.
+     * @param invisibleDuration new timestamp the message could be visible and reconsume which start from current time.
+     */
+    void changeInvisibleDuration(MessageView messageView, Duration invisibleDuration) throws ClientException;
+
+    /**
+     * Changes the invisible duration of a specified message asynchronously.
+     *
+     * <p> The origin invisible duration for a message decide by ack request.
+     *
+     * <p> You must call change request before the origin invisible duration timeout.
+     * If called change request later than the origin invisible duration, this request does not take effect and throw exception.
+     * Duplicate change request will refresh the next visible time of this message to other consumers.
+     * @param messageView special messageView with handle want to change.
+     * @param invisibleDuration new timestamp the message could be visible and reconsume which start from current time.
+     * @return CompletableFuture of this request.
+     */
+    CompletableFuture<Void> changeInvisibleDurationAsync(MessageView messageView, Duration invisibleDuration);
+
+    @Override
+    void close();
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumerBuilder.java b/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumerBuilder.java
new file mode 100644
index 0000000..0760b0e
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumerBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.consumer;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.rocketmq.apis.ClientConfiguration;
+import org.apache.rocketmq.apis.exception.ClientException;
+
+public interface SimpleConsumerBuilder {
+    /**
+     * Set the client configuration for simple consumer.
+     *
+     * @param clientConfiguration client's configuration.
+     * @return the simple consumer builder instance.
+     */
+    SimpleConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
+
+    /**
+     * Set the load balancing group for simple consumer.
+     *
+     * @param consumerGroup consumer load balancing group.
+     * @return the consumer builder instance.
+     */
+    SimpleConsumerBuilder setConsumerGroup(String consumerGroup);
+
+    /**
+     * Add subscriptionExpressions for simple consumer.
+     *
+     * @param subscriptionExpressions subscriptions to add which use the map of topic to filterExpression.
+     * @return the consumer builder instance.
+     */
+    SimpleConsumerBuilder setSubscriptionExpressions(Map<String, FilterExpression> subscriptionExpressions);
+
+    /**
+     * Set the max await time when receive message from server.
+     * The simple consumer will hold this long-polling receive requests until  a message is returned or a timeout occurs.
+     * @param awaitDuration The maximum time to block when no message available.
+     * @return the consumer builder instance.
+     */
+    SimpleConsumerBuilder setAwaitDuration(Duration awaitDuration);
+
+    /**
+     * Finalize the build of the {@link SimpleConsumer} instance and start.
+     *
+     * <p>This method will block until simple consumer starts successfully.
+     *
+     * @return the simple consumer instance.
+     */
+    SimpleConsumer build() throws ClientException;
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/AuthenticationException.java b/java/client-apis/src/main/java/client/apis/exception/AuthenticationException.java
new file mode 100644
index 0000000..bfe7444
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/AuthenticationException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+/**
+ * The difference between {@link AuthorisationException} and {@link AuthenticationException} is that
+ * {@link AuthenticationException} here means current user's identity could not be recognized.
+ *
+ * <p>For example, {@link AuthenticationException} will be thrown if access key is invalid.
+ */
+public class AuthenticationException extends ClientException {
+    public AuthenticationException(ErrorCode code, String message, String requestId) {
+        super(code, message);
+        putMetadata(REQUEST_ID_KEY, requestId);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/AuthorisationException.java b/java/client-apis/src/main/java/client/apis/exception/AuthorisationException.java
new file mode 100644
index 0000000..cc3a93f
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/AuthorisationException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+import org.apache.rocketmq.apis.message.Message;
+import org.apache.rocketmq.apis.producer.Producer;
+
+/**
+ * The difference between {@link AuthenticationException} and {@link AuthorisationException} is that
+ * {@link AuthorisationException} here means current users don't have permission to do current operation.
+ *
+ * <p>For example, current user is forbidden to send message to this topic, {@link AuthorisationException} will be
+ * thrown in {@link Producer#send(Message)}.
+ */
+public class AuthorisationException extends ClientException {
+    public AuthorisationException(ErrorCode code, String message, String requestId) {
+        super(code, message);
+        putMetadata(REQUEST_ID_KEY, requestId);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/ClientException.java b/java/client-apis/src/main/java/client/apis/exception/ClientException.java
new file mode 100644
index 0000000..018b4da
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/ClientException.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Base exception for all exception raised in client, each exception should derive from current class.
+ * It should throw exception which is derived from {@link ClientException} rather than {@link ClientException} itself.
+ */
+public abstract class ClientException extends Exception {
+    /**
+     * For those {@link ClientException} along with a remote procedure call, request id could be used to track the
+     * request.
+     */
+    protected static final String REQUEST_ID_KEY = "request-id";
+
+    private final ErrorCode errorCode;
+    private final Map<String, String> context;
+
+    ClientException(ErrorCode errorCode, String message, Throwable cause) {
+        super(message, cause);
+        this.errorCode = errorCode;
+        this.context = new HashMap<>();
+    }
+
+    ClientException(ErrorCode errorCode, String message) {
+        super(message);
+        this.errorCode = errorCode;
+        this.context = new HashMap<>();
+    }
+
+    @SuppressWarnings("SameParameterValue")
+    protected void putMetadata(String key, String value) {
+        context.put(key, value);
+    }
+
+    public Optional<String> getRequestId() {
+        final String requestId = context.get(REQUEST_ID_KEY);
+        return null == requestId ? Optional.empty() : Optional.of(requestId);
+    }
+
+    public ErrorCode getErrorCode() {
+        return errorCode;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(super.toString())
+            .add("errorCode", errorCode)
+            .add("context", context)
+            .toString();
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/ErrorCode.java b/java/client-apis/src/main/java/client/apis/exception/ErrorCode.java
new file mode 100644
index 0000000..1d2895e
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/ErrorCode.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+/**
+ * Indicates the reason why the exception is thrown, it can be easily divided into the following categories.
+ *
+ * <blockquote>
+ *
+ * <table>
+ * <caption>Error Categories and Exceptions</caption>
+ * <tr>
+ *     <th>Category
+ *     <th>Exception
+ *     <th>Code range
+ * <tr>
+ *     <td>Illegal client argument
+ *     <td>{@link RemoteIllegalArgumentException}
+ *     <p>{@link IllegalArgumentException}
+ *     <td>{@code [101..199]}
+ * <tr>
+ *     <td>Authorisation failure
+ *     <td>{@link AuthorisationException}
+ *     <td>{@code [201..299]}
+ * <tr>
+ *     <td>Resource not found
+ *     <td>{@link ResourceNotFoundException}
+ *     <td>{@code [301..399]}
+ * </table>
+ *
+ * </blockquote>
+ */
+public enum ErrorCode {
+    /**
+     * Format of topic is illegal.
+     */
+    INVALID_TOPIC(101),
+    /**
+     * Format of consumer group is illegal.
+     */
+    INVALID_CONSUMER_GROUP(102),
+    /**
+     * Message is forbidden to publish.
+     */
+    MESSAGE_PUBLISH_FORBIDDEN(201),
+    /**
+     * Topic does not exist.
+     */
+    TOPIC_DOES_NOT_EXIST(301),
+    /**
+     * Consumer group does not exist.
+     */
+    CONSUMER_GROUP_DOES_NOT_EXIST(302);
+
+    private final int code;
+
+    ErrorCode(int code) {
+        this.code = code;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    @Override
+    public String toString() {
+        return String.valueOf(code);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/FlowControlException.java b/java/client-apis/src/main/java/client/apis/exception/FlowControlException.java
new file mode 100644
index 0000000..ed177c0
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/FlowControlException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+public class FlowControlException extends ClientException {
+    public FlowControlException(ErrorCode code, String message, String requestId) {
+        super(code, message);
+        putMetadata(REQUEST_ID_KEY, requestId);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/NetworkException.java b/java/client-apis/src/main/java/client/apis/exception/NetworkException.java
new file mode 100644
index 0000000..aa5af7b
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/NetworkException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+public class NetworkException extends ClientException {
+    public NetworkException(ErrorCode code, String message, Throwable cause) {
+        super(code, message, cause);
+    }
+
+    public NetworkException(ErrorCode code, String message) {
+        super(code, message);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/RemoteIllegalArgumentException.java b/java/client-apis/src/main/java/client/apis/exception/RemoteIllegalArgumentException.java
new file mode 100644
index 0000000..3d456d7
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/RemoteIllegalArgumentException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+public class RemoteIllegalArgumentException extends ClientException {
+    public RemoteIllegalArgumentException(ErrorCode code, String message, String requestId) {
+        super(code, message);
+        putMetadata(REQUEST_ID_KEY, requestId);
+    }
+}
\ No newline at end of file
diff --git a/java/client-apis/src/main/java/client/apis/exception/ResourceNotFoundException.java b/java/client-apis/src/main/java/client/apis/exception/ResourceNotFoundException.java
new file mode 100644
index 0000000..19d032a
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/ResourceNotFoundException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+public class ResourceNotFoundException extends ClientException {
+    public ResourceNotFoundException(ErrorCode code, String message, String requestId) {
+        super(code, message);
+        putMetadata(REQUEST_ID_KEY, requestId);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/ResourceNotMatchException.java b/java/client-apis/src/main/java/client/apis/exception/ResourceNotMatchException.java
new file mode 100644
index 0000000..54aa66a
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/ResourceNotMatchException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+public class ResourceNotMatchException extends ClientException {
+    public ResourceNotMatchException(ErrorCode code, String message) {
+        super(code, message);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/TimeoutException.java b/java/client-apis/src/main/java/client/apis/exception/TimeoutException.java
new file mode 100644
index 0000000..6c16b1a
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/TimeoutException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+public class TimeoutException extends ClientException {
+    public TimeoutException(ErrorCode code, String message, Throwable cause) {
+        super(code, message, cause);
+    }
+
+    public TimeoutException(ErrorCode code, String message) {
+        super(code, message);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/message/Message.java b/java/client-apis/src/main/java/client/apis/message/Message.java
new file mode 100644
index 0000000..7504de9
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/message/Message.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.message;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.rocketmq.apis.producer.Producer;
+
+/**
+ * Abstract message only used for {@link Producer}.
+ */
+public interface Message {
+    /**
+     * Get the topic of message, which is the first classifier for message.
+     *
+     * @return topic of message.
+     */
+    String getTopic();
+
+    /**
+     * Get the <strong>deep copy</strong> of message body.
+     *
+     * @return the <strong>deep copy</strong> of message body.
+     */
+    byte[] getBody();
+
+    /**
+     * Get the <strong>deep copy</strong> of message properties.
+     *
+     * @return the <strong>deep copy</strong> of message properties.
+     */
+    Map<String, String> getProperties();
+
+    /**
+     * Get the tag of message, which is the second classifier besides topic.
+     *
+     * @return the tag of message.
+     */
+    Optional<String> getTag();
+
+    /**
+     * Get the key collection of message.
+     *
+     * @return <strong>the key collection</strong> of message.
+     */
+    Collection<String> getKeys();
+
+    /**
+     * Get the message group, which make sense only when topic type is fifo.
+     *
+     * @return message group, which is optional.
+     */
+    Optional<String> getMessageGroup();
+
+    /**
+     * Get the expected delivery timestamp, which make sense only when topic type is delay.
+     *
+     * @return message expected delivery timestamp, which is optional.
+     */
+    Optional<Long> getDeliveryTimestamp();
+}
diff --git a/java/client-apis/src/main/java/client/apis/message/MessageBuilder.java b/java/client-apis/src/main/java/client/apis/message/MessageBuilder.java
new file mode 100644
index 0000000..4db9805
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/message/MessageBuilder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.message;
+
+import java.util.Collection;
+
+/**
+ * Builder to config {@link Message}.
+ */
+public interface MessageBuilder {
+    /**
+     * Set the topic for message.
+     *
+     * @param topic topic for the message.
+     * @return the message builder instance.
+     */
+    MessageBuilder setTopic(String topic);
+
+    /**
+     * Set the body for message.
+     *
+     * @param body body for the message.
+     * @return the message builder instance.
+     */
+    MessageBuilder setBody(byte[] body);
+
+    /**
+     * Set the tag for message.
+     *
+     * @param tag tag for the message.
+     * @return the message builder instance.
+     */
+    MessageBuilder setTag(String tag);
+
+    /**
+     * Set the key for message.
+     *
+     * @param key key for the message.
+     * @return the message builder instance.
+     */
+    MessageBuilder setKey(String key);
+
+    /**
+     * Set the key collection for message.
+     *
+     * @param keys key collection for the message.
+     * @return the message builder instance.
+     */
+    MessageBuilder setKeys(Collection<String> keys);
+
+    /**
+     * Set the group for message.
+     *
+     * @param messageGroup group for the message.
+     * @return the message builder instance.
+     */
+    MessageBuilder setMessageGroup(String messageGroup);
+
+    /**
+     * Add user property for message.
+     *
+     * @param key   single property key.
+     * @param value single property value.
+     * @return the message builder instance.
+     */
+    MessageBuilder addProperty(String key, String value);
+
+    /**
+     * Finalize the build of the {@link Message} instance.
+     *
+     * <p>Unique {@link MessageId} is generated after message building.</p>
+     *
+     * @return the message instance.
+     */
+    Message build();
+}
diff --git a/java/client-apis/src/main/java/client/apis/message/MessageId.java b/java/client-apis/src/main/java/client/apis/message/MessageId.java
new file mode 100644
index 0000000..7b396ba
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/message/MessageId.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.message;
+
+/**
+ * Abstract message id, the implement must override {@link Object#toString()}, which indicates the message id using
+ * string form.
+ */
+public interface MessageId {
+    /**
+     * Get the version of message id.
+     *
+     * @return the version of message id.
+     */
+    MessageIdVersion getVersion();
+
+    /**
+     * The implementation <strong>must</strong> override this method, which indicates the message id using string form.
+     *
+     * @return string-formed string id.
+     */
+    String toString();
+}
diff --git a/java/client-apis/src/main/java/client/apis/message/MessageIdVersion.java b/java/client-apis/src/main/java/client/apis/message/MessageIdVersion.java
new file mode 100644
index 0000000..ab3fc57
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/message/MessageIdVersion.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.message;
+
+public enum MessageIdVersion {
+    /**
+     * V0 version, whose length is 32.
+     */
+    V0,
+    /**
+     * V1 version, whose length is 34 and begins with "01".
+     */
+    V1
+}
diff --git a/java/client-apis/src/main/java/client/apis/message/MessageView.java b/java/client-apis/src/main/java/client/apis/message/MessageView.java
new file mode 100644
index 0000000..3f576a4
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/message/MessageView.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.message;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.rocketmq.apis.MessageQueue;
+
+/**
+ * {@link MessageView} provides a read-only view for message, that's why setters do not exist here. In addition,
+ * it only makes sense when {@link Message} is sent successfully, or it could be considered as a return receipt
+ * for producer/consumer.
+ */
+public interface MessageView {
+    /**
+     * Get the unique id of message.
+     *
+     * @return unique id.
+     */
+    MessageId getMessageId();
+
+    /**
+     * Get the topic of message.
+     *
+     * @return topic of message.
+     */
+    String getTopic();
+
+    /**
+     * Get the <strong>deep copy</strong> of message body, which makes the modification of return value does not
+     * affect the message itself.
+     *
+     * @return the <strong>deep copy</strong> of message body.
+     */
+    byte[] getBody();
+
+    /**
+     * Get the <strong>deep copy</strong> of message properties, which makes the modification of return value does
+     * not affect the message itself.
+     *
+     * @return the <strong>deep copy</strong> of message properties.
+     */
+    Map<String, String> getProperties();
+
+    /**
+     * Get the tag of message, which is optional.
+     *
+     * @return the tag of message, which is optional.
+     */
+    Optional<String> getTag();
+
+    /**
+     * Get the key collection of message.
+     *
+     * @return <strong>the key collection</strong> of message.
+     */
+    Collection<String> getKeys();
+
+    /**
+     * Get the message group, which is optional and only make sense only when topic type is fifo.
+     *
+     * @return message group, which is optional.
+     */
+    Optional<String> getMessageGroup();
+
+    /**
+     * Get the expected delivery timestamp, which make sense only when topic type is delay.
+     *
+     * @return message expected delivery timestamp, which is optional.
+     */
+    Optional<Long> getDeliveryTimestamp();
+
+    /**
+     * Get the born host of message.
+     *
+     * @return born host of message.
+     */
+    String getBornHost();
+
+    /**
+     * Get the born timestamp of message.
+     *
+     * @return born timestamp of message.
+     */
+    long getBornTimestamp();
+
+    /**
+     * Get the delivery attempt for message.
+     *
+     * @return delivery attempt.
+     */
+    int getDeliveryAttempt();
+
+    /**
+     * Get the {@link MessageQueue} of message.
+     *
+     * @return message queue.
+     */
+    MessageQueue getMessageQueue();
+
+    /**
+     * Get the position of message in {@link MessageQueue}.
+     */
+    long getOffset();
+}
diff --git a/java/client-apis/src/main/java/client/apis/producer/Producer.java b/java/client-apis/src/main/java/client/apis/producer/Producer.java
new file mode 100644
index 0000000..a973635
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/producer/Producer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.producer;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.apis.exception.ClientException;
+import org.apache.rocketmq.apis.message.Message;
+
+/**
+ * Producer is a thread-safe rocketmq client which is used to publish messages.
+ *
+ * <p>On account of network timeout or other reasons, rocketmq producer only promised the at-least-once semantics.
+ * For producer, at-least-once semantics means potentially attempts are made at sending it, messages may be
+ * duplicated but not lost.
+ */
+public interface Producer extends Closeable {
+    /**
+     * Sends a message synchronously.
+     *
+     * <p>This method does not return until it gets the definitive result.
+     *
+     * @param message message to send.
+     */
+    SendReceipt send(Message message) throws ClientException;
+
+    /**
+     * Sends a transactional message synchronously.
+     *
+     * @param message     message to send.
+     * @param transaction transaction to bind.
+     * @return the message id assigned to the appointed message.
+     */
+    SendReceipt send(Message message, Transaction transaction) throws ClientException;
+
+    /**
+     * Sends a message asynchronously.
+     *
+     * <p>This method returns immediately, the result is included in the {@link CompletableFuture};
+     *
+     * @param message message to send.
+     * @return a future that indicates the result.
+     */
+    CompletableFuture<SendReceipt> sendAsync(Message message);
+
+    /**
+     * Sends batch messages synchronously.
+     *
+     * <p>This method does not return until it gets the definitive result.
+     *
+     * <p>All messages to send should have the same topic.
+     *
+     * @param messages batch messages to send.
+     * @return collection indicates the message id assigned to the appointed message, which keep the same order
+     * messages collection.
+     */
+    List<SendReceipt> send(List<Message> messages) throws ClientException;
+
+    /**
+     * Begins a transaction.
+     *
+     * <p>For example:
+     *
+     * <pre>{@code
+     * Transaction transaction = producer.beginTransaction();
+     * SendReceipt receipt1 = producer.send(message1, transaction);
+     * SendReceipt receipt2 = producer.send(message2, transaction);
+     * transaction.commit();
+     * }</pre>
+     *
+     * @return a transaction entity to execute commit/rollback operation.
+     */
+    Transaction beginTransaction() throws ClientException;
+
+    /**
+     * Close the producer and release all related resources.
+     *
+     * <p>This method does not return until all related resource is released. Once producer is closed, <strong>it could
+     * not be started once again.</strong> we maintained an FSM (finite-state machine) to record the different states
+     * for each producer.
+     */
+    @Override
+    void close();
+}
diff --git a/java/client-apis/src/main/java/client/apis/producer/ProducerBuilder.java b/java/client-apis/src/main/java/client/apis/producer/ProducerBuilder.java
new file mode 100644
index 0000000..31a79db
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/producer/ProducerBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.producer;
+
+import org.apache.rocketmq.apis.ClientConfiguration;
+import org.apache.rocketmq.apis.exception.ClientException;
+import org.apache.rocketmq.apis.message.Message;
+import org.apache.rocketmq.apis.retry.BackoffRetryPolicy;
+
+/**
+ * Builder to config and start {@link Producer}.
+ */
+public interface ProducerBuilder {
+    /**
+     * Set the client configuration for producer.
+     *
+     * @param clientConfiguration client's configuration.
+     * @return the producer builder instance.
+     */
+    ProducerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
+
+    /**
+     * Declare topics ahead of message sending/preparation.
+     *
+     * <p>Even though the declaration is not essential, we <strong>highly recommend</strong> to declare the topics in
+     * advance, which could help to discover potential mistakes.
+     *
+     * @param topics topics to send/prepare.
+     * @return the producer builder instance.
+     */
+    ProducerBuilder setTopics(String... topics);
+
+    /**
+     * Set the threads count for {@link Producer#sendAsync(Message)}.
+     *
+     * @return the producer builder instance.
+     */
+    ProducerBuilder setAsyncThreadCount(int count);
+
+    /**
+     * Set the retry policy to send message.
+     *
+     * @param retryPolicy policy to re-send message when failure encountered.
+     * @return the producer builder instance.
+     */
+    ProducerBuilder setRetryPolicy(BackoffRetryPolicy retryPolicy);
+
+    /**
+     * Set the transaction checker for producer.
+     *
+     * @param checker transaction checker.
+     * @return the produce builder instance.
+     */
+    ProducerBuilder setTransactionChecker(TransactionChecker checker);
+
+    /**
+     * Finalize the build of {@link Producer} instance and start.
+     *
+     * <p>The producer does a series of preparatory work during startup, which could help to identify more unexpected
+     * error earlier.
+     *
+     * <p>Especially, if this method is invoked more than once, different producer will be created and started.
+     *
+     * @return the producer instance.
+     */
+    Producer build() throws ClientException;
+}
diff --git a/java/client-apis/src/main/java/client/apis/producer/SendReceipt.java b/java/client-apis/src/main/java/client/apis/producer/SendReceipt.java
new file mode 100644
index 0000000..8340ab2
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/producer/SendReceipt.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.producer;
+
+import org.apache.rocketmq.apis.MessageQueue;
+import org.apache.rocketmq.apis.message.MessageId;
+
+public interface SendReceipt {
+    MessageId getMessageId();
+
+    MessageQueue getMessageQueue();
+
+    long getOffset();
+}
diff --git a/java/client-apis/src/main/java/client/apis/producer/Transaction.java b/java/client-apis/src/main/java/client/apis/producer/Transaction.java
new file mode 100644
index 0000000..e2df718
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/producer/Transaction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.producer;
+
+import org.apache.rocketmq.apis.exception.ClientException;
+
+/**
+ * An entity to describe an independent transaction.
+ *
+ * <p>Once request of commit of roll-back reached server, subsequent arrived commit or roll-back request in
+ * {@link Transaction} would be ignored by server.
+ *
+ * <p>If transaction is not commit/roll-back in time, it is suspended until it is solved by {@link TransactionChecker}
+ * or reach the end of life.
+ */
+public interface Transaction {
+    /**
+     * Try to commit the transaction, which would expose the message before the transaction is closed if no exception
+     * thrown.
+     *
+     * <p>What you should pay more attention is that commit may be successful even exception thrown.
+     */
+    void commit() throws ClientException;
+
+    /**
+     * Try to roll back the transaction, which would expose the message before the transaction is closed if no exception
+     * thrown.
+     *
+     * <p>What you should pay more attention is that roll-back may be successful even exception thrown.
+     */
+    void rollback() throws ClientException;
+}
\ No newline at end of file
diff --git a/java/client-apis/src/main/java/client/apis/producer/TransactionChecker.java b/java/client-apis/src/main/java/client/apis/producer/TransactionChecker.java
new file mode 100644
index 0000000..5050469
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/producer/TransactionChecker.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.producer;
+
+import org.apache.rocketmq.apis.message.MessageView;
+
+/**
+ * Used to determine {@link TransactionResolution} when {@link Transaction} is not committed or roll-backed in time.
+ * {@link Transaction#commit()} and {@link Transaction#rollback()} does not promise that it would be applied
+ * successfully, so that checker here is necessary.
+ *
+ * <p>If {@link TransactionChecker#check(MessageView)} returns {@link TransactionResolution#UNKNOWN} or exception
+ * raised during the invocation of {@link TransactionChecker#check(MessageView)}, the examination from server will be
+ * performed periodically.
+ */
+public interface TransactionChecker {
+    /**
+     * Server will solve the suspended transactional message by this method.
+     *
+     * <p>If exception was thrown in this method, which equals {@link TransactionResolution#UNKNOWN} is returned.
+     *
+     * @param messageView message to determine {@link TransactionResolution}.
+     * @return the transaction resolution.
+     */
+    TransactionResolution check(MessageView messageView);
+}
diff --git a/java/client-apis/src/main/java/client/apis/producer/TransactionResolution.java b/java/client-apis/src/main/java/client/apis/producer/TransactionResolution.java
new file mode 100644
index 0000000..afaa4ff
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/producer/TransactionResolution.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.producer;
+
+public enum TransactionResolution {
+    /**
+     * Notify server that current transaction should be committed.
+     */
+    COMMIT,
+    /**
+     * Notify server that current transaction should be roll-backed.
+     */
+    ROLLBACK,
+    /**
+     * Notify server that the state of this transaction is not sure. You should be cautions before return unknown
+     * because the examination from server will be performed periodically.
+     */
+    UNKNOWN;
+}
diff --git a/java/client-apis/src/main/java/client/apis/retry/BackOffRetryPolicyBuilder.java b/java/client-apis/src/main/java/client/apis/retry/BackOffRetryPolicyBuilder.java
new file mode 100644
index 0000000..8f2fd02
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/retry/BackOffRetryPolicyBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.retry;
+
+import java.time.Duration;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class BackOffRetryPolicyBuilder {
+    private int maxAttempts = 3;
+    private Duration initialBackoff = Duration.ofMillis(100);
+    private Duration maxBackoff = Duration.ofSeconds(1);
+    private int backoffMultiplier = 2;
+
+    public BackOffRetryPolicyBuilder() {
+    }
+
+    BackOffRetryPolicyBuilder setMaxAttempts(int maxAttempts) {
+        checkArgument(maxAttempts > 0, "maxAttempts must be positive");
+        this.maxAttempts = maxAttempts;
+        return this;
+    }
+
+    BackOffRetryPolicyBuilder setInitialBackoff(Duration initialBackoff) {
+        this.initialBackoff = checkNotNull(initialBackoff, "initialBackoff should not be null");
+        return this;
+    }
+
+    BackOffRetryPolicyBuilder setMaxBackoff(Duration maxBackoff) {
+        this.maxBackoff = checkNotNull(maxBackoff, "maxBackoff should not be null");
+        return this;
+    }
+
+    BackOffRetryPolicyBuilder setBackoffMultiplier(int backoffMultiplier) {
+        checkArgument(backoffMultiplier > 0, "backoffMultiplier must be positive");
+        this.backoffMultiplier = backoffMultiplier;
+        return this;
+    }
+
+    BackoffRetryPolicy build() {
+        return new BackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/retry/BackoffRetryPolicy.java b/java/client-apis/src/main/java/client/apis/retry/BackoffRetryPolicy.java
new file mode 100644
index 0000000..2c5e0ff
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/retry/BackoffRetryPolicy.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.retry;
+
+import com.google.common.base.MoreObjects;
+import java.time.Duration;
+import java.util.Random;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The {@link BackoffRetryPolicy} defines a policy to do more attempts when failure is encountered, mainly refer to
+ * <a href="https://github.com/grpc/proposal/blob/master/A6-client-retries.md">gRPC Retry Design</a>.
+ */
+public class BackoffRetryPolicy implements RetryPolicy {
+    public static BackOffRetryPolicyBuilder newBuilder() {
+        return new BackOffRetryPolicyBuilder();
+    }
+
+    private final Random random;
+    private final int maxAttempts;
+    private final Duration initialBackoff;
+    private final Duration maxBackoff;
+    private final int backoffMultiplier;
+
+    public BackoffRetryPolicy(int maxAttempts, Duration initialBackoff, Duration maxBackoff, int backoffMultiplier) {
+        checkArgument(maxBackoff.compareTo(initialBackoff) <= 0, "initialBackoff should not be minor than maxBackoff");
+        checkArgument(maxAttempts > 0, "maxAttempts must be positive");
+        this.random = new Random();
+        this.maxAttempts = maxAttempts;
+        this.initialBackoff = checkNotNull(initialBackoff, "initialBackoff should not be null");
+        this.maxBackoff = maxBackoff;
+        this.backoffMultiplier = backoffMultiplier;
+    }
+
+    @Override
+    public int getMaxAttempts() {
+        return maxAttempts;
+    }
+
+    @Override
+    public Duration getNextAttemptDelay(int attempt) {
+        checkArgument(attempt > 0, "attempt must be positive");
+        int randomNumberBound = Math.min(initialBackoff.getNano() * (backoffMultiplier ^ (attempt - 1)),
+            maxBackoff.getNano());
+        return Duration.ofNanos(random.nextInt(randomNumberBound));
+    }
+
+    public Duration getInitialBackoff() {
+        return initialBackoff;
+    }
+
+    public Duration getMaxBackoff() {
+        return maxBackoff;
+    }
+
+    public int getBackoffMultiplier() {
+        return backoffMultiplier;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("maxAttempts", maxAttempts)
+            .add("initialBackoff", initialBackoff)
+            .add("maxBackoff", maxBackoff)
+            .add("backoffMultiplier", backoffMultiplier)
+            .toString();
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/retry/RetryPolicy.java b/java/client-apis/src/main/java/client/apis/retry/RetryPolicy.java
new file mode 100644
index 0000000..c890dcf
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/retry/RetryPolicy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.retry;
+
+import java.time.Duration;
+
+/**
+ * Internal interface for retry policy.
+ */
+interface RetryPolicy {
+    /**
+     * Get the max attempt times for retry.
+     *
+     * @return max attempt times.
+     */
+    int getMaxAttempts();
+
+    /**
+     * Get await time after current attempts, the attempt index starts at 1.
+     *
+     * @param attempt current attempt.
+     * @return await time.
+     */
+    Duration getNextAttemptDelay(int attempt);
+}
diff --git a/java/pom.xml b/java/client-java/pom.xml
similarity index 62%
copy from java/pom.xml
copy to java/client-java/pom.xml
index 049a500..a5be176 100644
--- a/java/pom.xml
+++ b/java/client-java/pom.xml
@@ -2,15 +2,14 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-client-java-parent</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>5.0.0-SNAPSHOT</version>
+    </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <groupId>org.apache.rocketmq</groupId>
-    <artifactId>rocketmq-client-java-parent</artifactId>
-    <packaging>pom</packaging>
-    <version>5.0.0-SNAPSHOT</version>
-    <modules>
-        <module>apis</module>
-    </modules>
+    <artifactId>rocketmq-client-java</artifactId>
 
     <properties>
         <maven.compiler.release>8</maven.compiler.release>
diff --git a/java/pom.xml b/java/pom.xml
index 049a500..39886f4 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -9,7 +9,8 @@
     <packaging>pom</packaging>
     <version>5.0.0-SNAPSHOT</version>
     <modules>
-        <module>apis</module>
+        <module>client-apis</module>
+        <module>client-java</module>
     </modules>
 
     <properties>