You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/06/17 21:23:22 UTC
[rocketmq-clients] 02/02: [RIP-37] Add new APIs for consumer
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit f57efb82c2a7d960701ffd31746923d41738b865
Author: Zhongliang.Chen <ch...@gmail.com>
AuthorDate: Sat Jun 18 05:16:35 2022 +0800
[RIP-37] Add new APIs for consumer
---
.../main/java/client/apis/ClientConfiguration.java | 61 ++++++++
.../client/apis/ClientConfigurationBuilder.java | 72 ++++++++++
.../java/client/apis/ClientServiceProvider.java | 68 +++++++++
.../src/main/java/client/apis/MessageQueue.java | 24 ++++
.../main/java/client/apis/SessionCredentials.java | 58 ++++++++
.../client/apis/SessionCredentialsProvider.java | 30 ++++
.../apis/StaticSessionCredentialsProvider.java | 35 +++++
.../java/client/apis/consumer/ConsumeResult.java | 31 +++++
.../client/apis/consumer/FilterExpression.java | 56 ++++++++
.../client/apis/consumer/FilterExpressionType.java | 32 +++++
.../java/client/apis/consumer/MessageListener.java | 54 ++++++++
.../java/client/apis/consumer/PushConsumer.java | 94 +++++++++++++
.../client/apis/consumer/PushConsumerBuilder.java | 87 ++++++++++++
.../java/client/apis/consumer/SimpleConsumer.java | 154 +++++++++++++++++++++
.../apis/consumer/SimpleConsumerBuilder.java | 66 +++++++++
.../apis/exception/AuthenticationException.java | 31 +++++
.../apis/exception/AuthorisationException.java | 35 +++++
.../client/apis/exception/ClientException.java | 72 ++++++++++
.../main/java/client/apis/exception/ErrorCode.java | 84 +++++++++++
.../apis/exception/FlowControlException.java | 25 ++++
.../client/apis/exception/NetworkException.java | 28 ++++
.../exception/RemoteIllegalArgumentException.java | 25 ++++
.../apis/exception/ResourceNotFoundException.java | 25 ++++
.../apis/exception/ResourceNotMatchException.java | 24 ++++
.../client/apis/exception/TimeoutException.java | 28 ++++
.../src/main/java/client/apis/message/Message.java | 77 +++++++++++
.../java/client/apis/message/MessageBuilder.java | 91 ++++++++++++
.../main/java/client/apis/message/MessageId.java | 38 +++++
.../java/client/apis/message/MessageIdVersion.java | 29 ++++
.../main/java/client/apis/message/MessageView.java | 121 ++++++++++++++++
.../main/java/client/apis/producer/Producer.java | 100 +++++++++++++
.../java/client/apis/producer/ProducerBuilder.java | 82 +++++++++++
.../java/client/apis/producer/SendReceipt.java | 29 ++++
.../java/client/apis/producer/Transaction.java | 47 +++++++
.../client/apis/producer/TransactionChecker.java | 41 ++++++
.../apis/producer/TransactionResolution.java | 34 +++++
.../apis/retry/BackOffRetryPolicyBuilder.java | 59 ++++++++
.../java/client/apis/retry/BackoffRetryPolicy.java | 86 ++++++++++++
.../main/java/client/apis/retry/RetryPolicy.java | 40 ++++++
java/{ => client-java}/pom.xml | 13 +-
java/pom.xml | 3 +-
41 files changed, 2181 insertions(+), 8 deletions(-)
diff --git a/java/client-apis/src/main/java/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/client/apis/ClientConfiguration.java
new file mode 100644
index 0000000..56a1b56
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/ClientConfiguration.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+import java.time.Duration;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Common client configuration.
+ *
+ * <p>Holds the service endpoints, the credentials provider, the request timeout and the tracing switch. All
+ * fields are final and assigned exactly once in the constructor.
+ */
+public class ClientConfiguration {
+    private final String endpoints;
+    private final SessionCredentialsProvider sessionCredentialsProvider;
+    private final Duration requestTimeout;
+    private final boolean enableTracing;
+
+    /**
+     * Create a builder used to assemble a {@link ClientConfiguration}.
+     *
+     * @return a new builder instance.
+     */
+    public static ClientConfigurationBuilder newBuilder() {
+        return new ClientConfigurationBuilder();
+    }
+
+    /**
+     * Create the configuration from explicit values.
+     *
+     * @param endpoints                  address of service, must not be null.
+     * @param sessionCredentialsProvider provider of the session credentials, must not be null.
+     * @param requestTimeout             timeout applied to requests, must not be null.
+     * @param enableTracing              whether tracing is enabled.
+     * @throws NullPointerException if any of the reference arguments is null.
+     */
+    public ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider,
+        Duration requestTimeout, boolean enableTracing) {
+        this.endpoints = checkNotNull(endpoints, "endpoints should not be null");
+        this.sessionCredentialsProvider = checkNotNull(sessionCredentialsProvider,
+            "credentialsProvider should not be null");
+        // Message wording aligned with the other precondition messages ("should not be null").
+        this.requestTimeout = checkNotNull(requestTimeout, "requestTimeout should not be null");
+        this.enableTracing = enableTracing;
+    }
+
+    /**
+     * Get the configured service endpoints.
+     *
+     * @return the endpoints string passed to the constructor.
+     */
+    public String getEndpoints() {
+        return endpoints;
+    }
+
+    /**
+     * Get the provider of session credentials.
+     *
+     * @return the credentials provider passed to the constructor.
+     */
+    public SessionCredentialsProvider getCredentialsProvider() {
+        return sessionCredentialsProvider;
+    }
+
+    /**
+     * Get the request timeout.
+     *
+     * @return the timeout passed to the constructor.
+     */
+    public Duration getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    /**
+     * Check whether tracing is enabled.
+     *
+     * @return true if tracing was enabled at construction time.
+     */
+    public boolean isEnableTracing() {
+        return enableTracing;
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/ClientConfigurationBuilder.java b/java/client-apis/src/main/java/client/apis/ClientConfigurationBuilder.java
new file mode 100644
index 0000000..94999bd
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/ClientConfigurationBuilder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+import java.time.Duration;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Builder to set {@link ClientConfiguration}.
+ *
+ * <p>Each setter stores its argument and returns this builder so that calls can be chained; {@link #build()}
+ * passes the collected values to the {@link ClientConfiguration} constructor.
+ */
+public class ClientConfigurationBuilder {
+    private String endpoints;
+    private SessionCredentialsProvider sessionCredentialsProvider;
+    private Duration requestTimeout;
+    private boolean enableTracing;
+
+    /**
+     * Configure the endpoints with which the SDK should communicate.
+     *
+     * <p>Endpoints here means address of service, complying with the following scheme (the part in square
+     * brackets is optional).
+     * <p>1. DNS scheme(default): dns:host[:port], host is the host to resolve via DNS, port is the port to return
+     * for each address. If not specified, 443 is used.
+     * <p>2. ipv4 scheme: ipv4:address:port[,address:port,...]
+     * <p>3. ipv6 scheme: ipv6:address:port[,address:port,...]
+     * <p>4. http/https scheme: http|https://host[:port], similar to DNS scheme, if port not specified, 443 is used.
+     *
+     * @param endpoints address of service.
+     * @return the client configuration builder instance.
+     * @throws NullPointerException if endpoints is null.
+     */
+    public ClientConfigurationBuilder setEndpoints(String endpoints) {
+        // The previous message read "should not be not null", a double negative stating the opposite.
+        this.endpoints = checkNotNull(endpoints, "endpoints should not be null");
+        return this;
+    }
+
+    /**
+     * Configure the provider of session credentials.
+     *
+     * @param sessionCredentialsProvider provider of the session credentials.
+     * @return the client configuration builder instance.
+     * @throws NullPointerException if sessionCredentialsProvider is null.
+     */
+    public ClientConfigurationBuilder setCredentialProvider(SessionCredentialsProvider sessionCredentialsProvider) {
+        this.sessionCredentialsProvider = checkNotNull(sessionCredentialsProvider,
+            "credentialsProvider should not be null");
+        return this;
+    }
+
+    /**
+     * Configure the request timeout.
+     *
+     * @param requestTimeout timeout applied to requests.
+     * @return the client configuration builder instance.
+     * @throws NullPointerException if requestTimeout is null.
+     */
+    public ClientConfigurationBuilder setRequestTimeout(Duration requestTimeout) {
+        // The previous message read "should not be not null", a double negative stating the opposite.
+        this.requestTimeout = checkNotNull(requestTimeout, "requestTimeout should not be null");
+        return this;
+    }
+
+    /**
+     * Turn tracing on or off. Tracing stays disabled when this method is never called, because the backing
+     * field keeps its default value of {@code false}.
+     *
+     * @param enableTracing whether tracing is enabled.
+     * @return the client configuration builder instance.
+     */
+    public ClientConfigurationBuilder enableTracing(boolean enableTracing) {
+        this.enableTracing = enableTracing;
+        return this;
+    }
+
+    /**
+     * Finalize the build of the {@link ClientConfiguration}.
+     *
+     * @return the client configuration instance.
+     * @throws NullPointerException if the endpoints, the credentials provider or the request timeout has not
+     *                              been set, since the {@link ClientConfiguration} constructor rejects null.
+     */
+    public ClientConfiguration build() {
+        return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, enableTracing);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/ClientServiceProvider.java b/java/client-apis/src/main/java/client/apis/ClientServiceProvider.java
new file mode 100644
index 0000000..1ec7522
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/ClientServiceProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+import org.apache.rocketmq.apis.consumer.PushConsumerBuilder;
+import org.apache.rocketmq.apis.consumer.SimpleConsumerBuilder;
+import org.apache.rocketmq.apis.message.MessageBuilder;
+import org.apache.rocketmq.apis.producer.ProducerBuilder;
+
+/**
+ * Service provider used to obtain client builders. Implementations are discovered through the
+ * <a href="https://en.wikipedia.org/wiki/Service_provider_interface">Java SPI mechanism</a>.
+ */
+public interface ClientServiceProvider {
+    /**
+     * Look up a provider implementation through {@link ServiceLoader}.
+     *
+     * @return the first provider yielded by the service loader.
+     * @throws UnsupportedOperationException if the service loader yields no provider.
+     */
+    static ClientServiceProvider loadService() {
+        final Iterator<ClientServiceProvider> candidates =
+            ServiceLoader.load(ClientServiceProvider.class).iterator();
+        if (!candidates.hasNext()) {
+            throw new UnsupportedOperationException("Client service provider not found");
+        }
+        return candidates.next();
+    }
+
+    /**
+     * Get the producer builder by current provider.
+     *
+     * @return the producer builder instance.
+     */
+    ProducerBuilder newProducerBuilder();
+
+    /**
+     * Get the simple consumer builder by current provider.
+     *
+     * @return the simple consumer builder instance.
+     */
+    SimpleConsumerBuilder newSimpleConsumerBuilder();
+
+    /**
+     * Get the push consumer builder by current provider.
+     *
+     * @return the push consumer builder instance.
+     */
+    PushConsumerBuilder newPushConsumerBuilder();
+
+    /**
+     * Get the message builder by current provider.
+     *
+     * @return the message builder instance.
+     */
+    MessageBuilder newMessageBuilder();
+}
diff --git a/java/client-apis/src/main/java/client/apis/MessageQueue.java b/java/client-apis/src/main/java/client/apis/MessageQueue.java
new file mode 100644
index 0000000..8c722e2
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/MessageQueue.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+/**
+ * Abstraction of a message queue, exposing its topic and its identifier.
+ */
+public interface MessageQueue {
+ /**
+ * Get the topic of this message queue.
+ *
+ * @return the topic name; presumably the topic this queue belongs to (confirm against implementations).
+ */
+ String getTopic();
+
+ /**
+ * Get the identifier of this message queue.
+ *
+ * @return the queue id.
+ */
+ String getId();
+}
\ No newline at end of file
diff --git a/java/client-apis/src/main/java/client/apis/SessionCredentials.java b/java/client-apis/src/main/java/client/apis/SessionCredentials.java
new file mode 100644
index 0000000..adef427
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/SessionCredentials.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Session credentials used in service authentications: an access key, an access secret and an optional
+ * security token.
+ */
+public class SessionCredentials {
+    private final String accessKey;
+    private final String accessSecret;
+    private final String securityToken;
+
+    /**
+     * Create credentials that carry a security token.
+     *
+     * @param accessKey     access key, must not be null.
+     * @param accessSecret  access secret, must not be null.
+     * @param securityToken security token, must not be null.
+     * @throws NullPointerException if any argument is null.
+     */
+    public SessionCredentials(String accessKey, String accessSecret, String securityToken) {
+        checkNotNull(accessKey, "accessKey should not be null");
+        checkNotNull(accessSecret, "accessSecret should not be null");
+        checkNotNull(securityToken, "securityToken should not be null");
+        this.accessKey = accessKey;
+        this.accessSecret = accessSecret;
+        this.securityToken = securityToken;
+    }
+
+    /**
+     * Create credentials without a security token.
+     *
+     * @param accessKey    access key, must not be null.
+     * @param accessSecret access secret, must not be null.
+     * @throws NullPointerException if any argument is null.
+     */
+    public SessionCredentials(String accessKey, String accessSecret) {
+        checkNotNull(accessKey, "accessKey should not be null");
+        checkNotNull(accessSecret, "accessSecret should not be null");
+        this.accessKey = accessKey;
+        this.accessSecret = accessSecret;
+        this.securityToken = null;
+    }
+
+    /**
+     * Get the access key.
+     *
+     * @return the access key.
+     */
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    /**
+     * Get the access secret.
+     *
+     * @return the access secret.
+     */
+    public String getAccessSecret() {
+        return accessSecret;
+    }
+
+    /**
+     * Get the security token, if one was supplied.
+     *
+     * @return the security token, or an empty optional when the two-argument constructor was used.
+     */
+    public Optional<String> getSecurityToken() {
+        return Optional.ofNullable(securityToken);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/SessionCredentialsProvider.java b/java/client-apis/src/main/java/client/apis/SessionCredentialsProvider.java
new file mode 100644
index 0000000..52e6912
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/SessionCredentialsProvider.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+/**
+ * Provider of {@link SessionCredentials}.
+ */
+public interface SessionCredentialsProvider {
+ /**
+ * Get the credentials supplied by this provider.
+ *
+ * @return the provided credentials.
+ */
+ SessionCredentials getCredentials();
+}
diff --git a/java/client-apis/src/main/java/client/apis/StaticSessionCredentialsProvider.java b/java/client-apis/src/main/java/client/apis/StaticSessionCredentialsProvider.java
new file mode 100644
index 0000000..5c0d46c
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/StaticSessionCredentialsProvider.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis;
+
+/**
+ * {@link SessionCredentialsProvider} that always returns the single {@link SessionCredentials} instance
+ * created when the provider was constructed.
+ */
+public class StaticSessionCredentialsProvider implements SessionCredentialsProvider {
+    private final SessionCredentials sessionCredentials;
+
+    /**
+     * Create a provider for credentials without a security token.
+     *
+     * @param accessKey    access key.
+     * @param accessSecret access secret.
+     */
+    public StaticSessionCredentialsProvider(String accessKey, String accessSecret) {
+        sessionCredentials = new SessionCredentials(accessKey, accessSecret);
+    }
+
+    /**
+     * Create a provider for credentials that carry a security token.
+     *
+     * @param accessKey     access key.
+     * @param accessSecret  access secret.
+     * @param securityToken security token.
+     */
+    public StaticSessionCredentialsProvider(String accessKey, String accessSecret, String securityToken) {
+        sessionCredentials = new SessionCredentials(accessKey, accessSecret, securityToken);
+    }
+
+    /**
+     * Get the credentials created at construction time.
+     *
+     * @return the fixed credentials instance.
+     */
+    @Override
+    public SessionCredentials getCredentials() {
+        return sessionCredentials;
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/ConsumeResult.java b/java/client-apis/src/main/java/client/apis/consumer/ConsumeResult.java
new file mode 100644
index 0000000..b62939b
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/ConsumeResult.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package client.apis.consumer;
+
+/**
+ * Outcome reported by a message listener after handling a message.
+ */
+public enum ConsumeResult {
+ /**
+ * The message was consumed successfully and should be committed.
+ */
+ SUCCESS,
+
+ /**
+ * Failed to consume the message; it may be delivered again after the configured backoff.
+ */
+ FAILURE
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/FilterExpression.java b/java/client-apis/src/main/java/client/apis/consumer/FilterExpression.java
new file mode 100644
index 0000000..7f76ba7
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/FilterExpression.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package client.apis.consumer;
+
+import java.util.Objects;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Pairing of a filter expression string with the {@link FilterExpressionType} that tells how to interpret it.
+ *
+ * <p>Two instances are equal when both the expression and the type are equal.
+ */
+public class FilterExpression {
+    /**
+     * Tag expression meaning "subscribe all".
+     */
+    public static final String TAG_EXPRESSION_SUB_ALL = "*";
+    /**
+     * Separator {@code ||} between tags, escaped so that it can be used as a regular expression
+     * (presumably with {@link String#split(String)}; the call site is not visible here).
+     */
+    public static final String TAG_EXPRESSION_SPLITTER = "\\|\\|";
+    private final String expression;
+    private final FilterExpressionType filterExpressionType;
+
+    /**
+     * Create a filter expression.
+     *
+     * @param expression           expression text, must not be null.
+     * @param filterExpressionType type of the expression, must not be null.
+     * @throws NullPointerException if any argument is null.
+     */
+    public FilterExpression(String expression, FilterExpressionType filterExpressionType) {
+        checkNotNull(expression, "expression should not be null");
+        checkNotNull(filterExpressionType, "filterExpressionType should not be null");
+        this.expression = expression;
+        this.filterExpressionType = filterExpressionType;
+    }
+
+    /**
+     * Get the expression text.
+     *
+     * @return the expression passed to the constructor.
+     */
+    public String getExpression() {
+        return expression;
+    }
+
+    /**
+     * Get the type of the expression.
+     *
+     * @return the expression type passed to the constructor.
+     */
+    public FilterExpressionType getFilterExpressionType() {
+        return filterExpressionType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final FilterExpression other = (FilterExpression) o;
+        return filterExpressionType == other.filterExpressionType && expression.equals(other.expression);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(expression, filterExpressionType);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/FilterExpressionType.java b/java/client-apis/src/main/java/client/apis/consumer/FilterExpressionType.java
new file mode 100644
index 0000000..1284bfd
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/FilterExpressionType.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package client.apis.consumer;
+
+/**
+ * Kinds of filter expression.
+ */
+public enum FilterExpressionType {
+ /**
+ * Follows SQL92 standard.
+ */
+ SQL92,
+ /**
+ * Only supports the "or" operation, such as
+ * "tag1 || tag2 || tag3". <br>
+ * A null or {@code *} expression means subscribe all.
+ * NOTE(review): the FilterExpression constructor rejects a null expression, so the null case may not be
+ * reachable through that class; confirm.
+ */
+ TAG
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/MessageListener.java b/java/client-apis/src/main/java/client/apis/consumer/MessageListener.java
new file mode 100644
index 0000000..2e74a4d
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/MessageListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.consumer;
+
+import org.apache.rocketmq.apis.message.MessageView;
+
+/**
+ * <p>MessageListener is used only by PushConsumer to process messages
+ * synchronously.
+ *
+ * <p>PushConsumer will fetch messages from brokers and dispatch them to an
+ * embedded thread pool in form of <code>Runnable</code> tasks to achieve desirable processing concurrency.
+ *
+ * <p>Refer to {@link PushConsumer} for further details.
+ *
+ * <p>
+ * <strong>Thread Safety</strong>
+ * This class may be called concurrently by multiple threads. Implementations should be thread safe.
+ * </p>
+ */
+public interface MessageListener {
+
+ /**
+ * Callback interface to handle incoming messages.
+ *
+ * <p>Application developers are expected to implement this interface to fulfill business requirements by
+ * processing <code>message</code> and returning
+ * <code>ConsumeResult</code> accordingly.
+ *
+ * <p>PushConsumer will, on behalf of its group, acknowledge the message to the broker on success; in case of
+ * failure or if an unexpected exception is raised, it will negatively acknowledge <code>message</code>, which
+ * would potentially get re-delivered after the configured back off period.
+ *
+ * @param message The message passed to the listener.
+ * @return {@link ConsumeResult#SUCCESS} if <code>message</code> is properly processed; {@link
+ * ConsumeResult#FAILURE} otherwise.
+ */
+ ConsumeResult onMessage(MessageView message);
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/PushConsumer.java b/java/client-apis/src/main/java/client/apis/consumer/PushConsumer.java
new file mode 100644
index 0000000..6b7c1a4
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/PushConsumer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.consumer;
+
+import java.io.Closeable;
+import java.util.Map;
+import org.apache.rocketmq.apis.exception.ClientException;
+
+
+/**
+ * PushConsumer is a managed client which delivers messages to application through {@link MessageListener}.
+ *
+ * <p>Consumers of the same group are designed to share messages from broker servers. As a result, consumers of the same
+ * group must have <strong>exactly identical subscription expressions</strong>, otherwise the behavior is undefined.
+ *
+ * <p>For a brand-new group, consumers consume messages from head of underlying queues, ignoring existing messages
+ * completely. In addition to delivering messages to clients, broker servers also maintain progress in perspective of
+ * group. Thus, consumers can safely restart and resume their progress automatically.</p>
+ *
+ * <p>There are scenarios where <a href="https://en.wikipedia.org/wiki/Fan-out_(software)">fan-out</a> is preferred;
+ * the recommended solution is to use a dedicated group for each client.
+ *
+ * <p>To mitigate latency, PushConsumer adopts
+ * <a href="https://en.wikipedia.org/wiki/Reactive_Streams">reactive streams</a> pattern. Namely,
+ * messages received from broker servers are first cached locally, amount of which is controlled by
+ * {@link PushConsumerBuilder#setMaxCacheMessageCount(int)} and
+ * {@link PushConsumerBuilder#setMaxCacheMessageSizeInBytes(int)}, and then dispatched to thread pool to achieve
+ * desirable concurrency.
+ */
+public interface PushConsumer extends Closeable {
+ /**
+ * Get the load balancing group for consumer.
+ *
+ * @return consumer load balancing group.
+ */
+ String getConsumerGroup();
+
+ /**
+ * List the existing subscription expressions in push consumer.
+ *
+ * @return map of topic to filter expression.
+ */
+ Map<String, FilterExpression> subscriptionExpressions();
+
+ /**
+ * Add subscription expression dynamically.
+ *
+ * <p>If a first subscriptionExpression that contains topicA and tag1 already exists in the consumer, and a
+ * second subscriptionExpression which contains topicA and tag2 is added, <strong>the result is that the second
+ * one replaces the first one instead of integrating them</strong>.
+ *
+ * @param topic new topic that need to add or update.
+ * @param filterExpression new filter expression to add or update.
+ * @return push consumer instance.
+ * @throws ClientException if the subscription could not be added or updated.
+ */
+ PushConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException;
+
+ /**
+ * Remove subscription expression dynamically by topic.
+ *
+ * <p>It stops the backend task to fetch message from remote, and besides that, the local cached message whose topic
+ * was removed before would not be delivered to {@link MessageListener} anymore.
+ *
+ * <p>Nothing occurs if the specified topic does not exist in subscription expressions of push consumer.
+ *
+ * @param topic the topic to remove subscription.
+ * @return push consumer instance.
+ * @throws ClientException if the subscription could not be removed.
+ */
+ PushConsumer unsubscribe(String topic) throws ClientException;
+
+ /**
+ * Close the push consumer and release all related resources.
+ *
+ * <p>Once the push consumer is closed, <strong>it cannot be started again.</strong> An FSM
+ * (finite-state machine) is maintained to record the different states of each push consumer.
+ * NOTE(review): the original sentence said "each producer" and ended with a dangling "which is similar to";
+ * the intended comparison is unknown.
+ */
+ @Override
+ void close();
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/PushConsumerBuilder.java b/java/client-apis/src/main/java/client/apis/consumer/PushConsumerBuilder.java
new file mode 100644
index 0000000..82bfd2f
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/PushConsumerBuilder.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.consumer;
+
+import java.util.Map;
+import org.apache.rocketmq.apis.ClientConfiguration;
+import org.apache.rocketmq.apis.exception.ClientException;
+
+public interface PushConsumerBuilder {
+ /**
+ * Set the client configuration for consumer.
+ *
+ * @param clientConfiguration client's configuration.
+ * @return the consumer builder instance.
+ */
+ PushConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
+
+ /**
+ * Set the load balancing group for consumer.
+ *
+ * @param consumerGroup consumer load balancing group.
+ * @return the consumer builder instance.
+ */
+ PushConsumerBuilder setConsumerGroup(String consumerGroup);
+
+ /**
+ * Set the subscriptionExpressions for consumer.
+ *
+ * @param subscriptionExpressions subscriptions to set, given as a map of topic to filterExpression.
+ * @return the consumer builder instance.
+ */
+ PushConsumerBuilder setSubscriptionExpressions(Map<String, FilterExpression> subscriptionExpressions);
+
+ /**
+ * Register the message listener; all messages that meet the subscription expressions are delivered to this listener.
+ *
+ * @param listener message listener.
+ * @return the consumer builder instance.
+ */
+ PushConsumerBuilder setMessageListener(MessageListener listener);
+
+ /**
+ * Set the maximum number of messages cached locally.
+ *
+ * @param count message count.
+ * @return the consumer builder instance.
+ */
+ PushConsumerBuilder setMaxCacheMessageCount(int count);
+
+ /**
+ * Set the maximum bytes of messages cached locally.
+ *
+ * @param bytes message size.
+ * @return the consumer builder instance.
+ */
+ PushConsumerBuilder setMaxCacheMessageSizeInBytes(int bytes);
+
+ /**
+ * Set the consumption thread count in parallel.
+ *
+ * @param count thread count.
+ * @return the consumer builder instance.
+ */
+ PushConsumerBuilder setThreadCount(int count);
+
+ /**
+ * Finalize the build of {@link PushConsumer}.
+ *
+ * @return the push consumer instance.
+ */
+ PushConsumer build() throws ClientException;
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumer.java b/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumer.java
new file mode 100644
index 0000000..4bc3827
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.consumer;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.apis.exception.ClientException;
+import org.apache.rocketmq.apis.message.MessageView;
+
+/**
+ * SimpleConsumer is a thread-safe rocketmq client which is used to consume message by group.
+ *
+ * <p>Simple consumer is a lightweight consumer. If you want to fully control the message consumption operation by
+ * yourself, simple consumer should be your first consideration.
+ *
+ * <p>Consumers that belong to the same consumer group share messages from the server, so consumers in the same
+ * group must have the same subscriptionExpressions, otherwise the behavior is undefined. If a consumer of a new
+ * consumer group is started for the first time, it consumes from the latest position. Once the consumer is started,
+ * the server records its consumption progress and resumes from it in subsequent startups.
+ *
+ * <p>If you intend to maintain different consumption progress for different consumers, different consumer groups
+ * should be set in this case.
+ *
+ * <p> Simple consumer divides message consumption into 3 parts.
+ * Firstly, call the receive api to get messages from the server; then process the messages by yourself; at last, you must call the ack api to commit each message.
+ * If there is an error when processing a message, you can reconsume the message later, which is controlled by the invisibleDuration parameter.
+ * Also, you can change the invisibleDuration by calling the changeInvisibleDuration api.
+ */
+public interface SimpleConsumer extends Closeable {
+ /**
+ * Get the load balancing group for simple consumer.
+ *
+ * @return consumer load balancing group.
+ */
+ String getConsumerGroup();
+
+ /**
+ * Add subscription expression dynamically.
+ *
+ * <p>If a first subscriptionExpression that contains topicA and tag1 already exists in the consumer, and a
+ * second subscriptionExpression that contains topicA and tag2 is then added, <strong>the result is that the second
+ * one replaces the first one instead of integrating them</strong>.
+ *
+ * @param topic new topic that need to add or update.
+ * @param filterExpression new filter expression to add or update.
+ * @return simple consumer instance.
+ */
+ SimpleConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException;
+
+ /**
+ * Remove subscription expression dynamically by topic.
+ *
+ * <p>It stops the backend task to fetch message from remote, and besides that, the local cached message whose topic
+ * was removed before would not be delivered to {@link MessageListener} anymore.
+ *
+ * <p>Nothing occurs if the specified topic does not exist in subscription expressions of simple consumer.
+ *
+ * @param topic the topic to remove subscription.
+ * @return simple consumer instance.
+ */
+ SimpleConsumer unsubscribe(String topic) throws ClientException;
+
+ /**
+ * List the existing subscription expressions in simple consumer.
+ *
+ * @return map of topic to filter expression.
+ */
+ Map<String, FilterExpression> subscriptionExpressions();
+
+ /**
+ * Fetch messages from server synchronously.
+ * <p> This method returns immediately if there are messages available.
+ * Otherwise, it will await up to the duration set by {@link SimpleConsumerBuilder#setAwaitDuration(Duration)}. If that duration expires, an empty list will be returned.
+ * @param maxMessageNum max number of messages that the server returns.
+ * @param invisibleDuration set the invisibleDuration of messages returned from server. These messages will be invisible to other consumers until this duration elapses.
+ * @return list of messageView
+ */
+ List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException;
+
+ /**
+ * Fetch messages from server asynchronously.
+ * <p> The request is answered immediately if there are messages available.
+ * Otherwise, it will await up to the duration set by {@link SimpleConsumerBuilder#setAwaitDuration(Duration)}. If that duration expires, an empty list will be returned.
+ * @param maxMessageNum max number of messages that the server returns.
+ * @param invisibleDuration set the invisibleDuration of messages returned from server. These messages will be invisible to other consumers until this duration elapses.
+ * @return CompletableFuture of the list of messageView
+ */
+ CompletableFuture<List<MessageView>> receiveAsync(int maxMessageNum, Duration invisibleDuration) throws ClientException;
+
+ /**
+ * Ack message to server synchronously, so that the server commits this message.
+ *
+ * <p> A duplicate ack request does not take effect and throws an exception.
+ * @param messageView the specific messageView, with its handle, to ack.
+ */
+ void ack(MessageView messageView) throws ClientException;
+
+ /**
+ * Ack message to server asynchronously, so that the server commits this message.
+ *
+ * <p> A duplicate ack request does not take effect and results in an exception.
+ * @param messageView the specific messageView, with its handle, to ack.
+ * @return CompletableFuture of this request.
+ */
+ CompletableFuture<Void> ackAsync(MessageView messageView);
+
+ /**
+ * Changes the invisible duration of a specified message synchronously.
+ *
+ * <p> The original invisible duration of a message is decided by the receive request that returned it.
+ *
+ * <p>You must send the change request before the original invisible duration times out.
+ * If the change request is sent later than the original invisible duration, it does not take effect and throws an exception.
+ * A duplicate change request will refresh the next visible time of this message to other consumers.
+ * @param messageView the specific messageView, with its handle, to change.
+ * @param invisibleDuration new duration, counted from the current time, after which the message becomes visible and can be consumed again.
+ */
+ void changeInvisibleDuration(MessageView messageView, Duration invisibleDuration) throws ClientException;
+
+ /**
+ * Changes the invisible duration of a specified message asynchronously.
+ *
+ * <p> The original invisible duration of a message is decided by the receive request that returned it.
+ *
+ * <p> You must send the change request before the original invisible duration times out.
+ * If the change request is sent later than the original invisible duration, it does not take effect and results in an exception.
+ * A duplicate change request will refresh the next visible time of this message to other consumers.
+ * @param messageView the specific messageView, with its handle, to change.
+ * @param invisibleDuration new duration, counted from the current time, after which the message becomes visible and can be consumed again.
+ * @return CompletableFuture of this request.
+ */
+ CompletableFuture<Void> changeInvisibleDurationAsync(MessageView messageView, Duration invisibleDuration);
+
+ @Override
+ void close();
+}
diff --git a/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumerBuilder.java b/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumerBuilder.java
new file mode 100644
index 0000000..0760b0e
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/consumer/SimpleConsumerBuilder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.consumer;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.rocketmq.apis.ClientConfiguration;
+import org.apache.rocketmq.apis.exception.ClientException;
+
+public interface SimpleConsumerBuilder {
+ /**
+ * Set the client configuration for simple consumer.
+ *
+ * @param clientConfiguration client's configuration.
+ * @return the simple consumer builder instance.
+ */
+ SimpleConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
+
+ /**
+ * Set the load balancing group for simple consumer.
+ *
+ * @param consumerGroup consumer load balancing group.
+ * @return the consumer builder instance.
+ */
+ SimpleConsumerBuilder setConsumerGroup(String consumerGroup);
+
+ /**
+ * Set the subscriptionExpressions for simple consumer.
+ *
+ * @param subscriptionExpressions subscriptions to set, given as a map of topic to filterExpression.
+ * @return the consumer builder instance.
+ */
+ SimpleConsumerBuilder setSubscriptionExpressions(Map<String, FilterExpression> subscriptionExpressions);
+
+ /**
+ * Set the max await time when receiving messages from the server.
+ * The simple consumer will hold the long-polling receive request until a message is returned or a timeout occurs.
+ * @param awaitDuration The maximum time to block when no message is available.
+ * @return the consumer builder instance.
+ */
+ SimpleConsumerBuilder setAwaitDuration(Duration awaitDuration);
+
+ /**
+ * Finalize the build of the {@link SimpleConsumer} instance and start.
+ *
+ * <p>This method will block until simple consumer starts successfully.
+ *
+ * @return the simple consumer instance.
+ */
+ SimpleConsumer build() throws ClientException;
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/AuthenticationException.java b/java/client-apis/src/main/java/client/apis/exception/AuthenticationException.java
new file mode 100644
index 0000000..bfe7444
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/AuthenticationException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+/**
+ * The difference between {@link AuthorisationException} and {@link AuthenticationException} is that
+ * {@link AuthenticationException} here means current user's identity could not be recognized.
+ *
+ * <p>For example, {@link AuthenticationException} will be thrown if access key is invalid.
+ */
+public class AuthenticationException extends ClientException {
+ public AuthenticationException(ErrorCode code, String message, String requestId) {
+ super(code, message);
+ putMetadata(REQUEST_ID_KEY, requestId);
+ }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/AuthorisationException.java b/java/client-apis/src/main/java/client/apis/exception/AuthorisationException.java
new file mode 100644
index 0000000..cc3a93f
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/AuthorisationException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+import org.apache.rocketmq.apis.message.Message;
+import org.apache.rocketmq.apis.producer.Producer;
+
+/**
+ * The difference between {@link AuthenticationException} and {@link AuthorisationException} is that
+ * {@link AuthorisationException} here means current users don't have permission to do current operation.
+ *
+ * <p>For example, current user is forbidden to send message to this topic, {@link AuthorisationException} will be
+ * thrown in {@link Producer#send(Message)}.
+ */
+public class AuthorisationException extends ClientException {
+ public AuthorisationException(ErrorCode code, String message, String requestId) {
+ super(code, message);
+ putMetadata(REQUEST_ID_KEY, requestId);
+ }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/ClientException.java b/java/client-apis/src/main/java/client/apis/exception/ClientException.java
new file mode 100644
index 0000000..018b4da
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/ClientException.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Base exception for all exceptions raised in the client; each exception should derive from this class.
+ * Code should throw an exception derived from {@link ClientException} rather than {@link ClientException} itself.
+ */
+public abstract class ClientException extends Exception {
+ /**
+ * For those {@link ClientException} along with a remote procedure call, request id could be used to track the
+ * request.
+ */
+ protected static final String REQUEST_ID_KEY = "request-id";
+
+ private final ErrorCode errorCode;
+ private final Map<String, String> context;
+
+ ClientException(ErrorCode errorCode, String message, Throwable cause) {
+ super(message, cause);
+ this.errorCode = errorCode;
+ this.context = new HashMap<>();
+ }
+
+ ClientException(ErrorCode errorCode, String message) {
+ super(message);
+ this.errorCode = errorCode;
+ this.context = new HashMap<>();
+ }
+
+ @SuppressWarnings("SameParameterValue")
+ protected void putMetadata(String key, String value) {
+ context.put(key, value);
+ }
+
+ public Optional<String> getRequestId() {
+ final String requestId = context.get(REQUEST_ID_KEY);
+ return null == requestId ? Optional.empty() : Optional.of(requestId);
+ }
+
+ public ErrorCode getErrorCode() {
+ return errorCode;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(super.toString())
+ .add("errorCode", errorCode)
+ .add("context", context)
+ .toString();
+ }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/ErrorCode.java b/java/client-apis/src/main/java/client/apis/exception/ErrorCode.java
new file mode 100644
index 0000000..1d2895e
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/ErrorCode.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+/**
+ * Indicates the reason why the exception is thrown, it can be easily divided into the following categories.
+ *
+ * <blockquote>
+ *
+ * <table>
+ * <caption>Error Categories and Exceptions</caption>
+ * <tr>
+ * <th>Category
+ * <th>Exception
+ * <th>Code range
+ * <tr>
+ * <td>Illegal client argument
+ * <td>{@link RemoteIllegalArgumentException}
+ * <p>{@link IllegalArgumentException}
+ * <td>{@code [101..199]}
+ * <tr>
+ * <td>Authorisation failure
+ * <td>{@link AuthorisationException}
+ * <td>{@code [201..299]}
+ * <tr>
+ * <td>Resource not found
+ * <td>{@link ResourceNotFoundException}
+ * <td>{@code [301..399]}
+ * </table>
+ *
+ * </blockquote>
+ */
+public enum ErrorCode {
+ /**
+ * Format of topic is illegal.
+ */
+ INVALID_TOPIC(101),
+ /**
+ * Format of consumer group is illegal.
+ */
+ INVALID_CONSUMER_GROUP(102),
+ /**
+ * Message is forbidden to publish.
+ */
+ MESSAGE_PUBLISH_FORBIDDEN(201),
+ /**
+ * Topic does not exist.
+ */
+ TOPIC_DOES_NOT_EXIST(301),
+ /**
+ * Consumer group does not exist.
+ */
+ CONSUMER_GROUP_DOES_NOT_EXIST(302);
+
+ private final int code;
+
+ ErrorCode(int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(code);
+ }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/FlowControlException.java b/java/client-apis/src/main/java/client/apis/exception/FlowControlException.java
new file mode 100644
index 0000000..ed177c0
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/FlowControlException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+public class FlowControlException extends ClientException {
+ public FlowControlException(ErrorCode code, String message, String requestId) {
+ super(code, message);
+ putMetadata(REQUEST_ID_KEY, requestId);
+ }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/NetworkException.java b/java/client-apis/src/main/java/client/apis/exception/NetworkException.java
new file mode 100644
index 0000000..aa5af7b
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/NetworkException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+public class NetworkException extends ClientException {
+ public NetworkException(ErrorCode code, String message, Throwable cause) {
+ super(code, message, cause);
+ }
+
+ public NetworkException(ErrorCode code, String message) {
+ super(code, message);
+ }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/RemoteIllegalArgumentException.java b/java/client-apis/src/main/java/client/apis/exception/RemoteIllegalArgumentException.java
new file mode 100644
index 0000000..3d456d7
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/RemoteIllegalArgumentException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+public class RemoteIllegalArgumentException extends ClientException {
+ public RemoteIllegalArgumentException(ErrorCode code, String message, String requestId) {
+ super(code, message);
+ putMetadata(REQUEST_ID_KEY, requestId);
+ }
+}
\ No newline at end of file
diff --git a/java/client-apis/src/main/java/client/apis/exception/ResourceNotFoundException.java b/java/client-apis/src/main/java/client/apis/exception/ResourceNotFoundException.java
new file mode 100644
index 0000000..19d032a
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/ResourceNotFoundException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+public class ResourceNotFoundException extends ClientException {
+ public ResourceNotFoundException(ErrorCode code, String message, String requestId) {
+ super(code, message);
+ putMetadata(REQUEST_ID_KEY, requestId);
+ }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/ResourceNotMatchException.java b/java/client-apis/src/main/java/client/apis/exception/ResourceNotMatchException.java
new file mode 100644
index 0000000..54aa66a
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/ResourceNotMatchException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+public class ResourceNotMatchException extends ClientException {
+ public ResourceNotMatchException(ErrorCode code, String message) {
+ super(code, message);
+ }
+}
diff --git a/java/client-apis/src/main/java/client/apis/exception/TimeoutException.java b/java/client-apis/src/main/java/client/apis/exception/TimeoutException.java
new file mode 100644
index 0000000..6c16b1a
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/exception/TimeoutException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.exception;
+
+public class TimeoutException extends ClientException {
+ public TimeoutException(ErrorCode code, String message, Throwable cause) {
+ super(code, message, cause);
+ }
+
+ public TimeoutException(ErrorCode code, String message) {
+ super(code, message);
+ }
+}
diff --git a/java/client-apis/src/main/java/client/apis/message/Message.java b/java/client-apis/src/main/java/client/apis/message/Message.java
new file mode 100644
index 0000000..7504de9
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/message/Message.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.message;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.rocketmq.apis.producer.Producer;
+
+/**
+ * Abstract message only used for {@link Producer}.
+ *
+ * <p>This interface declares read accessors only; a message is configured through {@link MessageBuilder}.
+ */
+public interface Message {
+ /**
+ * Get the topic of the message, which is the first classifier for the message.
+ *
+ * @return topic of the message.
+ */
+ String getTopic();
+
+ /**
+ * Get the <strong>deep copy</strong> of the message body.
+ *
+ * @return the <strong>deep copy</strong> of the message body.
+ */
+ byte[] getBody();
+
+ /**
+ * Get the <strong>deep copy</strong> of the message properties.
+ *
+ * @return the <strong>deep copy</strong> of the message properties.
+ */
+ Map<String, String> getProperties();
+
+ /**
+ * Get the tag of the message, which is the second classifier besides the topic.
+ *
+ * @return the tag of the message, which is optional.
+ */
+ Optional<String> getTag();
+
+ /**
+ * Get the key collection of the message.
+ *
+ * @return <strong>the key collection</strong> of the message.
+ */
+ Collection<String> getKeys();
+
+ /**
+ * Get the message group, which makes sense only when the topic type is fifo.
+ *
+ * @return message group, which is optional.
+ */
+ Optional<String> getMessageGroup();
+
+ /**
+ * Get the expected delivery timestamp, which makes sense only when the topic type is delay.
+ *
+ * @return expected delivery timestamp of the message, which is optional.
+ */
+ Optional<Long> getDeliveryTimestamp();
+}
diff --git a/java/client-apis/src/main/java/client/apis/message/MessageBuilder.java b/java/client-apis/src/main/java/client/apis/message/MessageBuilder.java
new file mode 100644
index 0000000..4db9805
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/message/MessageBuilder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.message;
+
+import java.util.Collection;
+
+/**
+ * Builder to config {@link Message}.
+ *
+ * <p>Every setter returns the builder itself so that calls can be chained; {@link #build()} produces the message.
+ *
+ * <p>NOTE(review): {@link Message#getDeliveryTimestamp()} exists, but this builder declares no matching setter in
+ * this revision; confirm whether that is intentional.
+ */
+public interface MessageBuilder {
+ /**
+ * Set the topic for the message.
+ *
+ * @param topic topic for the message.
+ * @return the message builder instance.
+ */
+ MessageBuilder setTopic(String topic);
+
+ /**
+ * Set the body for the message.
+ *
+ * @param body body for the message.
+ * @return the message builder instance.
+ */
+ MessageBuilder setBody(byte[] body);
+
+ /**
+ * Set the tag for the message.
+ *
+ * @param tag tag for the message.
+ * @return the message builder instance.
+ */
+ MessageBuilder setTag(String tag);
+
+ /**
+ * Set a single key for the message.
+ *
+ * @param key key for the message.
+ * @return the message builder instance.
+ */
+ MessageBuilder setKey(String key);
+
+ /**
+ * Set the key collection for the message.
+ *
+ * @param keys key collection for the message.
+ * @return the message builder instance.
+ */
+ MessageBuilder setKeys(Collection<String> keys);
+
+ /**
+ * Set the group for the message.
+ *
+ * @param messageGroup group for the message.
+ * @return the message builder instance.
+ */
+ MessageBuilder setMessageGroup(String messageGroup);
+
+ /**
+ * Add a user property to the message.
+ *
+ * @param key single property key.
+ * @param value single property value.
+ * @return the message builder instance.
+ */
+ MessageBuilder addProperty(String key, String value);
+
+ /**
+ * Finalize the build of the {@link Message} instance.
+ *
+ * <p>Unique {@link MessageId} is generated after message building.</p>
+ *
+ * @return the message instance.
+ */
+ Message build();
+}
diff --git a/java/client-apis/src/main/java/client/apis/message/MessageId.java b/java/client-apis/src/main/java/client/apis/message/MessageId.java
new file mode 100644
index 0000000..7b396ba
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/message/MessageId.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.message;
+
+/**
+ * Abstract message id. Implementations must override {@link Object#toString()} so that it yields the message id
+ * in string form.
+ */
+public interface MessageId {
+ /**
+ * Get the version of the message id.
+ *
+ * @return the version of the message id.
+ */
+ MessageIdVersion getVersion();
+
+ /**
+ * The implementation <strong>must</strong> override this method so that it yields the message id in string form.
+ *
+ * @return the message id in string form.
+ */
+ String toString();
+}
diff --git a/java/client-apis/src/main/java/client/apis/message/MessageIdVersion.java b/java/client-apis/src/main/java/client/apis/message/MessageIdVersion.java
new file mode 100644
index 0000000..ab3fc57
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/message/MessageIdVersion.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.message;
+
+/**
+ * Version of a {@link MessageId}, as reported by {@link MessageId#getVersion()}.
+ */
+public enum MessageIdVersion {
+ /**
+ * V0 version, whose length is 32.
+ */
+ V0,
+ /**
+ * V1 version, whose length is 34 and begins with "01".
+ */
+ V1
+}
diff --git a/java/client-apis/src/main/java/client/apis/message/MessageView.java b/java/client-apis/src/main/java/client/apis/message/MessageView.java
new file mode 100644
index 0000000..3f576a4
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/message/MessageView.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.message;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.rocketmq.apis.MessageQueue;
+
+/**
+ * {@link MessageView} provides a read-only view of a message, which is why no setters exist here. In addition,
+ * it only makes sense once the {@link Message} has been sent successfully, so it can be considered a return receipt
+ * for producer/consumer.
+ */
+public interface MessageView {
+ /**
+ * Get the unique id of the message.
+ *
+ * @return unique id.
+ */
+ MessageId getMessageId();
+
+ /**
+ * Get the topic of the message.
+ *
+ * @return topic of the message.
+ */
+ String getTopic();
+
+ /**
+ * Get the <strong>deep copy</strong> of the message body, so that modifying the returned value does not
+ * affect the message itself.
+ *
+ * @return the <strong>deep copy</strong> of the message body.
+ */
+ byte[] getBody();
+
+ /**
+ * Get the <strong>deep copy</strong> of the message properties, so that modifying the returned value does
+ * not affect the message itself.
+ *
+ * @return the <strong>deep copy</strong> of the message properties.
+ */
+ Map<String, String> getProperties();
+
+ /**
+ * Get the tag of the message, which is optional.
+ *
+ * @return the tag of the message, which is optional.
+ */
+ Optional<String> getTag();
+
+ /**
+ * Get the key collection of the message.
+ *
+ * @return <strong>the key collection</strong> of the message.
+ */
+ Collection<String> getKeys();
+
+ /**
+ * Get the message group, which is optional and makes sense only when the topic type is fifo.
+ *
+ * @return message group, which is optional.
+ */
+ Optional<String> getMessageGroup();
+
+ /**
+ * Get the expected delivery timestamp, which makes sense only when the topic type is delay.
+ *
+ * @return expected delivery timestamp of the message, which is optional.
+ */
+ Optional<Long> getDeliveryTimestamp();
+
+ /**
+ * Get the born host of the message.
+ *
+ * @return born host of the message.
+ */
+ String getBornHost();
+
+ /**
+ * Get the born timestamp of the message.
+ *
+ * @return born timestamp of the message.
+ */
+ long getBornTimestamp();
+
+ /**
+ * Get the delivery attempt for the message.
+ *
+ * @return delivery attempt.
+ */
+ int getDeliveryAttempt();
+
+ /**
+ * Get the {@link MessageQueue} of the message.
+ *
+ * @return message queue.
+ */
+ MessageQueue getMessageQueue();
+
+ /**
+ * Get the position of the message in its {@link MessageQueue}.
+ *
+ * @return position of the message in the message queue.
+ */
+ long getOffset();
+}
diff --git a/java/client-apis/src/main/java/client/apis/producer/Producer.java b/java/client-apis/src/main/java/client/apis/producer/Producer.java
new file mode 100644
index 0000000..a973635
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/producer/Producer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.producer;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.apis.exception.ClientException;
+import org.apache.rocketmq.apis.message.Message;
+
+/**
+ * Producer is a thread-safe rocketmq client which is used to publish messages.
+ *
+ * <p>On account of network timeout or other reasons, rocketmq producer only promises at-least-once semantics.
+ * For a producer, at-least-once semantics means that several attempts may be made at sending a message, so messages
+ * may be duplicated but not lost.
+ */
+public interface Producer extends Closeable {
+ /**
+ * Sends a message synchronously.
+ *
+ * <p>This method does not return until it gets the definitive result.
+ *
+ * @param message message to send.
+ * @return the receipt of the sent message.
+ * @throws ClientException when the operation fails.
+ */
+ SendReceipt send(Message message) throws ClientException;
+
+ /**
+ * Sends a transactional message synchronously.
+ *
+ * @param message message to send.
+ * @param transaction transaction to bind.
+ * @return the receipt of the sent message.
+ * @throws ClientException when the operation fails.
+ */
+ SendReceipt send(Message message, Transaction transaction) throws ClientException;
+
+ /**
+ * Sends a message asynchronously.
+ *
+ * <p>This method returns immediately, the result is included in the {@link CompletableFuture}.
+ *
+ * @param message message to send.
+ * @return a future that indicates the result.
+ */
+ CompletableFuture<SendReceipt> sendAsync(Message message);
+
+ /**
+ * Sends batch messages synchronously.
+ *
+ * <p>This method does not return until it gets the definitive result.
+ *
+ * <p>All messages to send should have the same topic.
+ *
+ * @param messages batch messages to send.
+ * @return list of receipts for the sent messages, which keeps the same order as the given messages list.
+ * @throws ClientException when the operation fails.
+ */
+ List<SendReceipt> send(List<Message> messages) throws ClientException;
+
+ /**
+ * Begins a transaction.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Transaction transaction = producer.beginTransaction();
+ * SendReceipt receipt1 = producer.send(message1, transaction);
+ * SendReceipt receipt2 = producer.send(message2, transaction);
+ * transaction.commit();
+ * }</pre>
+ *
+ * @return a transaction entity to execute commit/rollback operation.
+ * @throws ClientException when the operation fails.
+ */
+ Transaction beginTransaction() throws ClientException;
+
+ /**
+ * Close the producer and release all related resources.
+ *
+ * <p>This method does not return until all related resources are released. Once the producer is closed,
+ * <strong>it cannot be started again.</strong> An FSM (finite-state machine) is maintained to record the
+ * different states of each producer.
+ */
+ @Override
+ void close();
+}
diff --git a/java/client-apis/src/main/java/client/apis/producer/ProducerBuilder.java b/java/client-apis/src/main/java/client/apis/producer/ProducerBuilder.java
new file mode 100644
index 0000000..31a79db
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/producer/ProducerBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.producer;
+
+import org.apache.rocketmq.apis.ClientConfiguration;
+import org.apache.rocketmq.apis.exception.ClientException;
+import org.apache.rocketmq.apis.message.Message;
+import org.apache.rocketmq.apis.retry.BackoffRetryPolicy;
+
+/**
+ * Builder to config and start {@link Producer}.
+ */
+public interface ProducerBuilder {
+ /**
+ * Set the client configuration for producer.
+ *
+ * @param clientConfiguration client's configuration.
+ * @return the producer builder instance.
+ */
+ ProducerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
+
+ /**
+ * Declare topics ahead of message sending/preparation.
+ *
+ * <p>Even though the declaration is not essential, we <strong>highly recommend</strong> declaring the topics in
+ * advance, which could help to discover potential mistakes.
+ *
+ * @param topics topics to send/prepare.
+ * @return the producer builder instance.
+ */
+ ProducerBuilder setTopics(String... topics);
+
+ /**
+ * Set the threads count for {@link Producer#sendAsync(Message)}.
+ *
+ * @param count threads count for asynchronous sending.
+ * @return the producer builder instance.
+ */
+ ProducerBuilder setAsyncThreadCount(int count);
+
+ /**
+ * Set the retry policy to send message.
+ *
+ * @param retryPolicy policy to re-send message when failure encountered.
+ * @return the producer builder instance.
+ */
+ ProducerBuilder setRetryPolicy(BackoffRetryPolicy retryPolicy);
+
+ /**
+ * Set the transaction checker for producer.
+ *
+ * @param checker transaction checker.
+ * @return the producer builder instance.
+ */
+ ProducerBuilder setTransactionChecker(TransactionChecker checker);
+
+ /**
+ * Finalize the build of {@link Producer} instance and start.
+ *
+ * <p>The producer does a series of preparatory work during startup, which could help to identify unexpected
+ * errors earlier.
+ *
+ * <p>Especially, if this method is invoked more than once, a different producer will be created and started
+ * each time.
+ *
+ * @return the producer instance.
+ * @throws ClientException when the operation fails.
+ */
+ Producer build() throws ClientException;
+}
diff --git a/java/client-apis/src/main/java/client/apis/producer/SendReceipt.java b/java/client-apis/src/main/java/client/apis/producer/SendReceipt.java
new file mode 100644
index 0000000..8340ab2
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/producer/SendReceipt.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.producer;
+
+import org.apache.rocketmq.apis.MessageQueue;
+import org.apache.rocketmq.apis.message.MessageId;
+
+/**
+ * Receipt returned by the send methods of {@link Producer}.
+ */
+public interface SendReceipt {
+ /**
+ * Get the {@link MessageId} recorded in this receipt.
+ *
+ * @return message id.
+ */
+ MessageId getMessageId();
+
+ /**
+ * Get the {@link MessageQueue} recorded in this receipt.
+ *
+ * @return message queue.
+ */
+ MessageQueue getMessageQueue();
+
+ /**
+ * Get the offset recorded in this receipt.
+ *
+ * <p>NOTE(review): presumably the position of the sent message inside {@link #getMessageQueue()}; confirm with the
+ * implementation.
+ *
+ * @return offset.
+ */
+ long getOffset();
+}
diff --git a/java/client-apis/src/main/java/client/apis/producer/Transaction.java b/java/client-apis/src/main/java/client/apis/producer/Transaction.java
new file mode 100644
index 0000000..e2df718
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/producer/Transaction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.producer;
+
+import org.apache.rocketmq.apis.exception.ClientException;
+
+/**
+ * An entity to describe an independent transaction.
+ *
+ * <p>Once a commit or roll-back request has reached the server, any commit or roll-back request of this
+ * {@link Transaction} that arrives afterwards would be ignored by the server.
+ *
+ * <p>If the transaction is not committed or rolled back in time, it is suspended until it is solved by
+ * {@link TransactionChecker} or reaches the end of its life.
+ */
+public interface Transaction {
+ /**
+ * Try to commit the transaction, which would expose the message before the transaction is closed if no exception
+ * thrown.
+ *
+ * <p>Pay attention: the commit may have been successful even if an exception is thrown.
+ *
+ * @throws ClientException when the operation fails.
+ */
+ void commit() throws ClientException;
+
+ /**
+ * Try to roll back the transaction, which would discard the message instead of exposing it if no exception
+ * thrown.
+ *
+ * <p>Pay attention: the roll-back may have been successful even if an exception is thrown.
+ *
+ * @throws ClientException when the operation fails.
+ */
+ void rollback() throws ClientException;
+}
\ No newline at end of file
diff --git a/java/client-apis/src/main/java/client/apis/producer/TransactionChecker.java b/java/client-apis/src/main/java/client/apis/producer/TransactionChecker.java
new file mode 100644
index 0000000..5050469
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/producer/TransactionChecker.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.producer;
+
+import org.apache.rocketmq.apis.message.MessageView;
+
+/**
+ * Used to determine {@link TransactionResolution} when a {@link Transaction} is not committed or rolled back in time.
+ * {@link Transaction#commit()} and {@link Transaction#rollback()} do not promise that they would be applied
+ * successfully, which is why the checker here is necessary.
+ *
+ * <p>If {@link TransactionChecker#check(MessageView)} returns {@link TransactionResolution#UNKNOWN} or an exception
+ * is raised during the invocation of {@link TransactionChecker#check(MessageView)}, the examination from server will
+ * be performed periodically.
+ */
+public interface TransactionChecker {
+ /**
+ * Server will solve the suspended transactional message by this method.
+ *
+ * <p>An exception thrown in this method is equivalent to returning {@link TransactionResolution#UNKNOWN}.
+ *
+ * @param messageView message to determine {@link TransactionResolution}.
+ * @return the transaction resolution.
+ */
+ TransactionResolution check(MessageView messageView);
+}
diff --git a/java/client-apis/src/main/java/client/apis/producer/TransactionResolution.java b/java/client-apis/src/main/java/client/apis/producer/TransactionResolution.java
new file mode 100644
index 0000000..afaa4ff
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/producer/TransactionResolution.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.producer;
+
+/**
+ * Resolution of a suspended transaction, as returned by {@link TransactionChecker#check}.
+ */
+public enum TransactionResolution {
+ /**
+ * Notify server that current transaction should be committed.
+ */
+ COMMIT,
+ /**
+ * Notify server that current transaction should be rolled back.
+ */
+ ROLLBACK,
+ /**
+ * Notify server that the state of this transaction is not sure. You should be cautious before returning unknown
+ * because the examination from server will be performed periodically.
+ */
+ UNKNOWN;
+}
diff --git a/java/client-apis/src/main/java/client/apis/retry/BackOffRetryPolicyBuilder.java b/java/client-apis/src/main/java/client/apis/retry/BackOffRetryPolicyBuilder.java
new file mode 100644
index 0000000..8f2fd02
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/retry/BackOffRetryPolicyBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.retry;
+
+import java.time.Duration;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Builder for {@link BackoffRetryPolicy}, also reachable through {@link BackoffRetryPolicy#newBuilder()}.
+ *
+ * <p>Defaults: 3 attempts, 100 milliseconds initial backoff, 1 second max backoff, multiplier 2.
+ *
+ * <p>All configuration methods are public so that the builder is usable from outside this package; each returns the
+ * builder itself to allow chaining.
+ */
+public class BackOffRetryPolicyBuilder {
+    private int maxAttempts = 3;
+    private Duration initialBackoff = Duration.ofMillis(100);
+    private Duration maxBackoff = Duration.ofSeconds(1);
+    private int backoffMultiplier = 2;
+
+    public BackOffRetryPolicyBuilder() {
+    }
+
+    /**
+     * Set the max attempt times.
+     *
+     * @param maxAttempts max attempt times, must be positive.
+     * @return the builder instance.
+     * @throws IllegalArgumentException if {@code maxAttempts} is not positive.
+     */
+    public BackOffRetryPolicyBuilder setMaxAttempts(int maxAttempts) {
+        checkArgument(maxAttempts > 0, "maxAttempts must be positive");
+        this.maxAttempts = maxAttempts;
+        return this;
+    }
+
+    /**
+     * Set the initial backoff.
+     *
+     * @param initialBackoff initial backoff, must not be null.
+     * @return the builder instance.
+     * @throws NullPointerException if {@code initialBackoff} is null.
+     */
+    public BackOffRetryPolicyBuilder setInitialBackoff(Duration initialBackoff) {
+        this.initialBackoff = checkNotNull(initialBackoff, "initialBackoff should not be null");
+        return this;
+    }
+
+    /**
+     * Set the max backoff.
+     *
+     * @param maxBackoff max backoff, must not be null.
+     * @return the builder instance.
+     * @throws NullPointerException if {@code maxBackoff} is null.
+     */
+    public BackOffRetryPolicyBuilder setMaxBackoff(Duration maxBackoff) {
+        this.maxBackoff = checkNotNull(maxBackoff, "maxBackoff should not be null");
+        return this;
+    }
+
+    /**
+     * Set the backoff multiplier.
+     *
+     * @param backoffMultiplier backoff multiplier, must be positive.
+     * @return the builder instance.
+     * @throws IllegalArgumentException if {@code backoffMultiplier} is not positive.
+     */
+    public BackOffRetryPolicyBuilder setBackoffMultiplier(int backoffMultiplier) {
+        checkArgument(backoffMultiplier > 0, "backoffMultiplier must be positive");
+        this.backoffMultiplier = backoffMultiplier;
+        return this;
+    }
+
+    /**
+     * Create a {@link BackoffRetryPolicy} from the values configured so far.
+     *
+     * @return the retry policy instance.
+     */
+    public BackoffRetryPolicy build() {
+        return new BackoffRetryPolicy(maxAttempts, initialBackoff, maxBackoff, backoffMultiplier);
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/retry/BackoffRetryPolicy.java b/java/client-apis/src/main/java/client/apis/retry/BackoffRetryPolicy.java
new file mode 100644
index 0000000..2c5e0ff
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/retry/BackoffRetryPolicy.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.retry;
+
+import com.google.common.base.MoreObjects;
+import java.time.Duration;
+import java.util.Random;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * The {@link BackoffRetryPolicy} defines a policy to do more attempts when failure is encountered, mainly refer to
+ * <a href="https://github.com/grpc/proposal/blob/master/A6-client-retries.md">gRPC Retry Design</a>.
+ */
+public class BackoffRetryPolicy implements RetryPolicy {
+    /**
+     * Create a builder pre-filled with default values.
+     *
+     * @return a new builder instance.
+     */
+    public static BackOffRetryPolicyBuilder newBuilder() {
+        return new BackOffRetryPolicyBuilder();
+    }
+
+    private final Random random;
+    private final int maxAttempts;
+    private final Duration initialBackoff;
+    private final Duration maxBackoff;
+    private final int backoffMultiplier;
+
+    /**
+     * Create the retry policy.
+     *
+     * @param maxAttempts       max attempt times, must be positive.
+     * @param initialBackoff    upper bound of the delay after the first attempt, must be neither null nor negative
+     *                          and must not exceed {@code maxBackoff}.
+     * @param maxBackoff        upper bound of the delay after any attempt, must not be null.
+     * @param backoffMultiplier factor applied to the delay bound after each attempt, must be positive.
+     * @throws NullPointerException     if {@code initialBackoff} or {@code maxBackoff} is null.
+     * @throws IllegalArgumentException if any other constraint above is violated.
+     */
+    public BackoffRetryPolicy(int maxAttempts, Duration initialBackoff, Duration maxBackoff, int backoffMultiplier) {
+        // Null checks come first so that the comparison below can never raise an unlabelled NPE.
+        checkNotNull(initialBackoff, "initialBackoff should not be null");
+        checkNotNull(maxBackoff, "maxBackoff should not be null");
+        checkArgument(maxAttempts > 0, "maxAttempts must be positive");
+        checkArgument(backoffMultiplier > 0, "backoffMultiplier must be positive");
+        checkArgument(!initialBackoff.isNegative(), "initialBackoff must not be negative");
+        checkArgument(initialBackoff.compareTo(maxBackoff) <= 0,
+            "initialBackoff should not be greater than maxBackoff");
+        this.random = new Random();
+        this.maxAttempts = maxAttempts;
+        this.initialBackoff = initialBackoff;
+        this.maxBackoff = maxBackoff;
+        this.backoffMultiplier = backoffMultiplier;
+    }
+
+    @Override
+    public int getMaxAttempts() {
+        return maxAttempts;
+    }
+
+    /**
+     * Get a random delay in {@code [0, min(initialBackoff * backoffMultiplier^(attempt - 1), maxBackoff))}.
+     *
+     * @param attempt current attempt, starts at 1.
+     * @return the delay before the next attempt, {@link Duration#ZERO} if the bound is zero.
+     * @throws IllegalArgumentException if {@code attempt} is not positive.
+     */
+    @Override
+    public Duration getNextAttemptDelay(int attempt) {
+        checkArgument(attempt > 0, "attempt must be positive");
+        // toNanos() is the whole duration; getNano() would only be the nanosecond-of-second part.
+        final long maxBackoffNanos = maxBackoff.toNanos();
+        long delayBoundNanos = initialBackoff.toNanos();
+        // A multiplier of 1 or a zero initial backoff never grows, so the loop is skipped entirely.
+        if (backoffMultiplier > 1 && delayBoundNanos > 0) {
+            // Grow step by step and saturate at maxBackoff; the division guard avoids long overflow.
+            for (int i = 1; i < attempt && delayBoundNanos < maxBackoffNanos; i++) {
+                if (delayBoundNanos > maxBackoffNanos / backoffMultiplier) {
+                    delayBoundNanos = maxBackoffNanos;
+                } else {
+                    delayBoundNanos *= backoffMultiplier;
+                }
+            }
+        }
+        if (delayBoundNanos <= 0) {
+            return Duration.ZERO;
+        }
+        return Duration.ofNanos((long) (random.nextDouble() * delayBoundNanos));
+    }
+
+    public Duration getInitialBackoff() {
+        return initialBackoff;
+    }
+
+    public Duration getMaxBackoff() {
+        return maxBackoff;
+    }
+
+    public int getBackoffMultiplier() {
+        return backoffMultiplier;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("maxAttempts", maxAttempts)
+            .add("initialBackoff", initialBackoff)
+            .add("maxBackoff", maxBackoff)
+            .add("backoffMultiplier", backoffMultiplier)
+            .toString();
+    }
+}
diff --git a/java/client-apis/src/main/java/client/apis/retry/RetryPolicy.java b/java/client-apis/src/main/java/client/apis/retry/RetryPolicy.java
new file mode 100644
index 0000000..c890dcf
--- /dev/null
+++ b/java/client-apis/src/main/java/client/apis/retry/RetryPolicy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client.apis.retry;
+
+import java.time.Duration;
+
+/**
+ * Internal interface for retry policy.
+ *
+ * <p>It is declared without the {@code public} modifier, so it is only accessible from within its own package.
+ * An implementation reports how many attempts are allowed and how long to wait after a given attempt.
+ */
+interface RetryPolicy {
+ /**
+ * Get the maximum number of attempts allowed by this policy.
+ *
+ * @return max attempt times.
+ */
+ int getMaxAttempts();
+
+ /**
+ * Get the time to wait after the given attempt.
+ *
+ * @param attempt index of the current attempt; the index starts at 1.
+ * @return await time.
+ */
+ Duration getNextAttemptDelay(int attempt);
+}
diff --git a/java/pom.xml b/java/client-java/pom.xml
similarity index 62%
copy from java/pom.xml
copy to java/client-java/pom.xml
index 049a500..a5be176 100644
--- a/java/pom.xml
+++ b/java/client-java/pom.xml
@@ -2,15 +2,14 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-client-java-parent</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>5.0.0-SNAPSHOT</version>
+ </parent>
<modelVersion>4.0.0</modelVersion>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client-java-parent</artifactId>
- <packaging>pom</packaging>
- <version>5.0.0-SNAPSHOT</version>
- <modules>
- <module>apis</module>
- </modules>
+ <artifactId>rocketmq-client-java</artifactId>
<properties>
<maven.compiler.release>8</maven.compiler.release>
diff --git a/java/pom.xml b/java/pom.xml
index 049a500..39886f4 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -9,7 +9,8 @@
<packaging>pom</packaging>
<version>5.0.0-SNAPSHOT</version>
<modules>
- <module>apis</module>
+ <module>client-apis</module>
+ <module>client-java</module>
</modules>
<properties>