You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/06/18 10:34:10 UTC
[rocketmq-clients] 01/01: Java: fix checkstyle
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit a40055ead61e0ec56ae1d2e0f2e9c56849d06f25
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Sat Jun 18 18:21:17 2022 +0800
Java: fix checkstyle
---
java/README.md | 48 ++-
.../rocketmq/client/apis/ClientConfiguration.java | 11 +-
.../client/apis/ClientConfigurationBuilder.java | 4 +-
.../rocketmq/client/apis/SessionCredentials.java | 4 +-
.../client/apis/consumer/FilterExpression.java | 7 +-
.../client/apis/consumer/MessageListener.java | 3 +-
.../client/apis/consumer/SimpleConsumer.java | 25 +-
.../apis/consumer/SimpleConsumerBuilder.java | 3 +-
java/client-shade/pom.xml | 18 +
java/{client-java => client}/pom.xml | 2 +-
.../client/java/ClientServiceProviderImpl.java | 0
.../org/apache/rocketmq/client/java/UserAgent.java | 5 +-
.../java/exception/ResourceNotFoundException.java | 0
.../client/java/hook/MessageHookPoints.java | 0
.../client/java/hook/MessageHookPointsStatus.java | 0
.../client/java/hook/MessageInterceptor.java | 0
.../apache/rocketmq/client/java/impl/Client.java | 0
.../rocketmq/client/java/impl/ClientImpl.java | 96 ++---
.../rocketmq/client/java/impl/ClientManager.java | 0
.../client/java/impl/ClientManagerImpl.java | 4 +-
.../client/java/impl/ClientManagerRegistry.java | 0
.../rocketmq/client/java/impl/ClientSettings.java | 0
.../rocketmq/client/java/impl/ClientType.java | 0
.../client/java/impl/TelemetrySession.java | 44 ++-
.../client/java/impl/consumer/Assignment.java | 0
.../client/java/impl/consumer/Assignments.java | 0
.../client/java/impl/consumer/ConsumeService.java | 4 +-
.../client/java/impl/consumer/ConsumeTask.java | 7 +-
.../client/java/impl/consumer/ConsumerImpl.java | 36 +-
.../java/impl/consumer/FifoConsumeService.java | 7 +-
.../client/java/impl/consumer/ProcessQueue.java | 0
.../java/impl/consumer/ProcessQueueImpl.java | 105 ++++--
.../impl/consumer/PushConsumerBuilderImpl.java | 9 +-
.../java/impl/consumer/PushConsumerImpl.java | 55 +--
.../java/impl/consumer/PushConsumerSettings.java | 19 +-
.../java/impl/consumer/ReceiveMessageResult.java | 0
.../impl/consumer/SimpleConsumerBuilderImpl.java | 13 +-
.../java/impl/consumer/SimpleConsumerImpl.java | 50 ++-
.../java/impl/consumer/SimpleConsumerSettings.java | 16 +-
.../java/impl/consumer/StandardConsumeService.java | 7 +-
.../consumer/SubscriptionTopicRouteDataResult.java | 3 +-
.../java/impl/producer/ProducerBuilderImpl.java | 9 +-
.../client/java/impl/producer/ProducerImpl.java | 99 ++++--
.../java/impl/producer/ProducerSettings.java | 14 +-
.../producer/PublishingTopicRouteDataResult.java | 0
.../client/java/impl/producer/SendReceiptImpl.java | 4 +-
.../client/java/impl/producer/TransactionImpl.java | 24 +-
.../client/java/logging/CustomConsoleAppender.java | 3 +-
.../client/java/logging/ProcessIdConverter.java | 0
.../client/java/message/MessageBuilderImpl.java | 6 +-
.../client/java/message/MessageCommon.java | 6 +-
.../client/java/message/MessageIdCodec.java | 5 +-
.../client/java/message/MessageIdImpl.java | 0
.../rocketmq/client/java/message/MessageImpl.java | 14 +-
.../rocketmq/client/java/message/MessageType.java | 0
.../client/java/message/MessageViewImpl.java | 31 +-
.../client/java/message/PublishingMessageImpl.java | 6 +-
.../client/java/message/protocol/Encoding.java | 0
.../client/java/message/protocol/Resource.java | 0
.../client/java/metrics/MessageCacheObserver.java | 0
.../rocketmq/client/java/metrics/MessageMeter.java | 55 ++-
.../rocketmq/client/java/metrics/Metric.java | 0
.../java/metrics/MetricMessageInterceptor.java | 10 +-
.../rocketmq/client/java/metrics/MetricName.java | 0
.../client/java/metrics/RocketmqAttributes.java | 9 +-
.../rocketmq/client/java/misc/Dispatcher.java | 4 +-
.../client/java/misc/ExecutorServices.java | 0
.../rocketmq/client/java/misc/LinkedElement.java | 0
.../rocketmq/client/java/misc/LinkedIterator.java | 0
.../rocketmq/client/java/misc/MetadataUtils.java | 0
.../client/java/misc/RequestIdGenerator.java | 0
.../client/java/misc/ThreadFactoryImpl.java | 0
.../rocketmq/client/java/misc/Utilities.java | 6 +-
.../java/retry/CustomizedBackoffRetryPolicy.java | 10 +-
.../java/retry/ExponentialBackoffRetryPolicy.java | 12 +-
.../rocketmq/client/java/retry/RetryPolicy.java | 0
.../apache/rocketmq/client/java/route/Address.java | 0
.../rocketmq/client/java/route/AddressScheme.java | 0
.../apache/rocketmq/client/java/route/Broker.java | 3 +-
.../rocketmq/client/java/route/Endpoints.java | 7 +-
.../client/java/route/MessageQueueImpl.java | 0
.../rocketmq/client/java/route/Permission.java | 0
.../rocketmq/client/java/route/TopicRouteData.java | 3 +-
.../client/java/route/TopicRouteDataResult.java | 4 +-
.../rocketmq/client/java/rpc/AuthInterceptor.java | 4 +-
.../client/java/rpc/IpNameResolverFactory.java | 0
.../client/java/rpc/LoggingInterceptor.java | 16 +-
.../apache/rocketmq/client/java/rpc/RpcClient.java | 0
.../rocketmq/client/java/rpc/RpcClientImpl.java | 2 +-
.../apache/rocketmq/client/java/rpc/Signature.java | 0
.../apache/rocketmq/client/java/rpc/TLSHelper.java | 0
.../rocketmq.metadata.properties | 0
...ache.rocketmq.client.apis.ClientServiceProvider | 0
.../src/main/resources/logback.xml | 0
.../client/java/ClientServiceProviderImplTest.java | 10 +-
.../client/java/impl/ClientManagerImplTest.java | 0
.../java/impl/consumer/ProcessQueueImplTest.java | 51 +--
.../impl/consumer/PushConsumerBuilderImplTest.java | 3 +-
.../java/impl/consumer/PushConsumerImplTest.java | 60 ++--
.../impl/consumer/SimpleConsumerBuilderTest.java | 3 +-
.../java/impl/consumer/SimpleConsumerImplTest.java | 76 ++--
.../impl/consumer/StandardConsumeServiceTest.java | 15 +-
.../impl/producer/ProducerBuilderImplTest.java | 5 +-
.../java/impl/producer/ProducerImplTest.java | 93 +++--
.../java/impl/producer/TransactionImplTest.java | 32 +-
.../client/java/message/MessageIdCodecTest.java | 5 +-
.../client/java/message/MessageImplTest.java | 22 +-
.../retry/CustomizedBackoffRetryPolicyTest.java | 6 +-
.../rocketmq/client/java/route/EndpointsTest.java | 6 +-
.../apache/rocketmq/client/java/tool/TestBase.java | 36 +-
.../org.mockito.plugins.MockMaker | 0
java/pom.xml | 35 +-
java/style/checkstyle.xml | 385 +++++++++++++++++++++
java/style/intellij-codestyle.xml | 67 ++++
java/style/spotbugs-suppressions.xml | 12 +
115 files changed, 1363 insertions(+), 530 deletions(-)
diff --git a/java/README.md b/java/README.md
index 2a91aa5..dc36bc9 100644
--- a/java/README.md
+++ b/java/README.md
@@ -1 +1,47 @@
-## RocketMQ Clients for Java
+# RocketMQ Clients for Java
+
+[![Java](https://github.com/apache/rocketmq-clients/actions/workflows/java_build.yml/badge.svg)](https://github.com/apache/rocketmq-clients/actions/workflows/java_build.yml)
+
+The Java implementation of the client for [Apache RocketMQ](https://rocketmq.apache.org/).
+
+## Prerequisites
+
+Java 11 or higher is required to build this project. The built artifacts can be used on Java 8 or
+higher.
+
+<table>
+ <tr>
+ <td><b>Build required:</b></td>
+ <td><b>Java 11 or later</b></td>
+ </tr>
+ <tr>
+ <td><b>Runtime required:</b></td>
+ <td><b>Java 8 or later</b></td>
+ </tr>
+</table>
+
+## Getting Started
+
+Add the dependency to your `pom.xml`, and replace `${rocketmq.version}` with the latest version.
+
+```xml
+<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client-java</artifactId>
+ <version>${rocketmq.version}</version>
+</dependency>
+```
+
+It is worth noting that `rocketmq-client-java` is a shaded jar, which means you cannot substitute its dependencies.
+From the perspective of avoiding dependency conflicts, you may need a shaded client in most cases, but we also provide
+the non-shaded client.
+
+```xml
+<dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client-java-noshade</artifactId>
+ <version>${rocketmq.version}</version>
+</dependency>
+```
+
+
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
index 866b0f4..c7006e4 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
@@ -28,20 +28,21 @@ public class ClientConfiguration {
private final SessionCredentialsProvider sessionCredentialsProvider;
private final Duration requestTimeout;
- public static ClientConfigurationBuilder newBuilder() {
- return new ClientConfigurationBuilder();
- }
-
/**
* The caller is supposed to have validated the arguments and handled throwing exception or
* logging warnings already, so we avoid repeating args check here.
*/
- ClientConfiguration(String accessPoint, SessionCredentialsProvider sessionCredentialsProvider, Duration requestTimeout) {
+ ClientConfiguration(String accessPoint, SessionCredentialsProvider sessionCredentialsProvider,
+ Duration requestTimeout) {
this.accessPoint = accessPoint;
this.sessionCredentialsProvider = sessionCredentialsProvider;
this.requestTimeout = requestTimeout;
}
+ public static ClientConfigurationBuilder newBuilder() {
+ return new ClientConfigurationBuilder();
+ }
+
public String getAccessPoint() {
return accessPoint;
}
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index ca67644..672826e 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -17,10 +17,10 @@
package org.apache.rocketmq.client.apis;
-import java.time.Duration;
-
import static com.google.common.base.Preconditions.checkNotNull;
+import java.time.Duration;
+
/**
* Builder to set {@link ClientConfiguration}.
*/
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/SessionCredentials.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/SessionCredentials.java
index e16decd..5328eab 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/SessionCredentials.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/SessionCredentials.java
@@ -17,10 +17,10 @@
package org.apache.rocketmq.client.apis;
-import java.util.Optional;
-
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Optional;
+
/**
* Session credentials used in service authentications.
*/
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java
index 2f21cd2..dcda10e 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java
@@ -15,13 +15,12 @@
* limitations under the License.
*/
-
package org.apache.rocketmq.client.apis.consumer;
-import java.util.Objects;
-
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.base.Objects;
+
/**
* Filter expression is an efficient way to filter message for {@link SimpleConsumer} and {@link PushConsumer}.
* Consumer who applied the filter expression only can receive the filtered messages.
@@ -68,6 +67,6 @@ public class FilterExpression {
@Override
public int hashCode() {
- return Objects.hash(expression, filterExpressionType);
+ return Objects.hashCode(expression, filterExpressionType);
}
}
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java
index a26b741..66e4d43 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java
@@ -23,7 +23,8 @@ import org.apache.rocketmq.client.apis.message.MessageView;
* MessageListener is used only for push consumer to process message consumption synchronously.
*
* <p> Refer to {@link PushConsumer}, push consumer will get message from server
- * and dispatch the message to backend thread pool which control by parameter threadCount to consumer message concurrently.
+ * and dispatch the message to a backend thread pool, which is controlled by the parameter threadCount, to consume
+ * messages concurrently.
*/
public interface MessageListener {
/**
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
index 3773008..90ab988 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
@@ -41,9 +41,10 @@ import org.apache.rocketmq.client.apis.message.MessageView;
* should be set in this case.
*
* <p> Simple consumer divide message consumption to 3 parts.
- * Firstly, call receive api get messages from server; Then process message by yourself; At last, your must call Ack api to commit this message.
- * If there is error when process message ,your can reconsume the message later which control by the invisibleDuration parameter.
- * Also, you can change the invisibleDuration by call changeInvisibleDuration api.
+ * Firstly, call the receive api to get messages from server; then process messages by yourself; at last, you must
+ * call the ack api to commit this message.
+ * If there is an error when processing a message, you can reconsume the message later, which is controlled by the
+ * invisibleDuration parameter. Also, you can change the invisibleDuration by calling changeInvisibleDuration api.
*/
public interface SimpleConsumer extends Closeable {
/**
@@ -92,7 +93,8 @@ public interface SimpleConsumer extends Closeable {
* Otherwise, it will await the passed timeout. If the timeout expires, an empty map will be returned.
*
* @param maxMessageNum max message num when server returns.
- * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be invisible to other consumer unless timout.
+ * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be
+ * invisible to other consumers unless timeout.
* @return list of messageView
*/
List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException;
@@ -103,7 +105,8 @@ public interface SimpleConsumer extends Closeable {
* Otherwise, it will await the passed timeout. If the timeout expires, an empty map will be returned.
*
* @param maxMessageNum max message num when server returns.
- * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be invisible to other consumer unless timout.
+ * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be
+ * invisible to other consumers unless timeout.
* @return list of messageView
*/
CompletableFuture<List<MessageView>> receiveAsync(int maxMessageNum, Duration invisibleDuration);
@@ -133,8 +136,10 @@ public interface SimpleConsumer extends Closeable {
* <p> The origin invisible duration for a message decide by ack request.
*
* <p>You must call change request before the origin invisible duration timeout.
- * If called change request later than the origin invisible duration, this request does not take effect and throw exception.
- * Duplicate change request will refresh the next visible time of this message to other consumers.
+ * If called change request later than the origin invisible duration, this request does not take effect and throw
+ * exception.
+ *
+ * <p>Duplicate change request will refresh the next visible time of this message to other consumers.
*
* @param messageView special messageView with handle want to change.
* @param invisibleDuration new timestamp the message could be visible and reconsume which start from current time.
@@ -147,8 +152,10 @@ public interface SimpleConsumer extends Closeable {
* <p> The origin invisible duration for a message decide by ack request.
*
* <p> You must call change request before the origin invisible duration timeout.
- * If called change request later than the origin invisible duration, this request does not take effect and throw exception.
- * Duplicate change request will refresh the next visible time of this message to other consumers.
+ * If called change request later than the origin invisible duration, this request does not take effect and throw
+ * exception.
+ *
+ * <p>Duplicate change request will refresh the next visible time of this message to other consumers.
*
* @param messageView special messageView with handle want to change.
* @param invisibleDuration new timestamp the message could be visible and reconsume which start from current time.
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
index d2c6134..49a2804 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
@@ -49,7 +49,8 @@ public interface SimpleConsumerBuilder {
/**
* Set the max await time when receive message from server.
- * The simple consumer will hold this long-polling receive requests until a message is returned or a timeout occurs.
+ * The simple consumer will hold this long-polling receive requests until a message is returned or a timeout
+ * occurs.
*
* @param awaitDuration The maximum time to block when no message available.
* @return the consumer builder instance.
diff --git a/java/client-shade/pom.xml b/java/client-shade/pom.xml
new file mode 100644
index 0000000..a5be176
--- /dev/null
+++ b/java/client-shade/pom.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-client-java-parent</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>5.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-client-java</artifactId>
+
+ <properties>
+ <maven.compiler.release>8</maven.compiler.release>
+ </properties>
+
+</project>
\ No newline at end of file
diff --git a/java/client-java/pom.xml b/java/client/pom.xml
similarity index 97%
rename from java/client-java/pom.xml
rename to java/client/pom.xml
index b7b5e4b..3337ced 100644
--- a/java/client-java/pom.xml
+++ b/java/client/pom.xml
@@ -9,7 +9,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>rocketmq-client-java</artifactId>
+ <artifactId>rocketmq-client-java-noshade</artifactId>
<properties>
<maven.compiler.release>8</maven.compiler.release>
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/ClientServiceProviderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/ClientServiceProviderImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/ClientServiceProviderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/ClientServiceProviderImpl.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/UserAgent.java b/java/client/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
similarity index 94%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
index bdd7754..138f49c 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
@@ -23,6 +23,9 @@ import org.apache.rocketmq.client.java.misc.MetadataUtils;
import org.apache.rocketmq.client.java.misc.Utilities;
public class UserAgent {
+ public static final UserAgent INSTANCE = new UserAgent(MetadataUtils.getVersion(), Utilities.getOsDescription(),
+ Utilities.hostName());
+
private final String version;
private final String platform;
private final String hostName;
@@ -33,8 +36,6 @@ public class UserAgent {
this.hostName = hostName;
}
- public static final UserAgent INSTANCE = new UserAgent(MetadataUtils.getVersion(), Utilities.getOsDescription(), Utilities.hostName());
-
public UA toProtoBuf() {
return UA.newBuilder()
.setLanguage(Language.JAVA)
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPoints.java b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPoints.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPoints.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPoints.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPointsStatus.java b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPointsStatus.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPointsStatus.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPointsStatus.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/Client.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
similarity index 94%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 8bd2ad0..b7b45fa 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.client.java.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
@@ -39,8 +41,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import io.grpc.Metadata;
import java.io.UnsupportedEncodingException;
import java.security.InvalidKeyException;
@@ -84,8 +84,8 @@ import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.TopicRouteData;
import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
import org.apache.rocketmq.client.java.rpc.Signature;
-
-import static com.google.common.base.Preconditions.checkNotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
public abstract class ClientImpl extends AbstractIdleService implements Client, MessageInterceptor {
@@ -95,10 +95,18 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
protected volatile ClientManager clientManager;
protected final ClientConfiguration clientConfiguration;
protected final Endpoints accessEndpoints;
+ protected final Set<String> topics;
+ // Thread-safe set.
+ protected final Set<Endpoints> isolated;
+ protected final ExecutorService clientCallbackExecutor;
+ protected final MessageMeter messageMeter;
+ /**
+ * Telemetry command executor, which aims to execute commands from remote.
+ */
+ protected final ThreadPoolExecutor telemetryCommandExecutor;
+ protected final String clientId;
private volatile ScheduledFuture<?> updateRouteCacheFuture;
-
- protected final Set<String> topics;
private final ConcurrentMap<String, TopicRouteDataResult> topicRouteResultCache;
@GuardedBy("inflightRouteFutureLock")
@@ -109,24 +117,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
private final ConcurrentMap<Endpoints, TelemetrySession> telemetrySessionTable;
private final ReadWriteLock telemetrySessionsLock;
- // Thread-safe set.
- protected final Set<Endpoints> isolated;
-
@GuardedBy("messageInterceptorsLock")
private final List<MessageInterceptor> messageInterceptors;
private final ReadWriteLock messageInterceptorsLock;
- protected final ExecutorService clientCallbackExecutor;
-
- protected final MessageMeter messageMeter;
-
- /**
- * Telemetry command executor, which is aims to execute commands from remote.
- */
- protected final ThreadPoolExecutor telemetryCommandExecutor;
-
- protected final String clientId;
-
public ClientImpl(ClientConfiguration clientConfiguration, Set<String> topics) {
this.clientConfiguration = checkNotNull(clientConfiguration, "clientConfiguration should not be null");
final String accessPoint = clientConfiguration.getAccessPoint();
@@ -181,16 +175,19 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
// Register client after client id generation.
this.clientManager = ClientManagerRegistry.getInstance().registerClient(this);
// Fetch topic route from remote.
- LOGGER.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}", clientId, topics);
+ LOGGER.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}",
+ clientId, topics);
// Aggregate all topic route data futures into a composited future.
final List<ListenableFuture<TopicRouteDataResult>> futures = topics.stream()
.map(this::getRouteDataResult)
.collect(Collectors.toList());
List<TopicRouteDataResult> results;
try {
- results = Futures.allAsList(futures).get(TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP.toNanos(), TimeUnit.NANOSECONDS);
+ results = Futures.allAsList(futures).get(TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP.toNanos(),
+ TimeUnit.NANOSECONDS);
} catch (Throwable t) {
- LOGGER.error("Failed to get topic route data result from remote during client startup, clientId={}, topics={}", clientId, topics, t);
+ LOGGER.error("Failed to get topic route data result from remote during client startup, clientId={}, "
+ + "topics={}", clientId, topics, t);
throw new ResourceNotFoundException(t);
}
// Find any topic whose topic route data is failed to fetch from remote.
@@ -203,7 +200,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
final Status status = result.getStatus();
throw new ClientException(status.getCode().getNumber(), status.getMessage());
}
- LOGGER.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}", clientId, topics);
+ LOGGER.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}",
+ clientId, topics);
// Update route cache periodically.
final ScheduledExecutorService scheduler = clientManager.getScheduler();
this.updateRouteCacheFuture = scheduler.scheduleWithFixedDelay(() -> {
@@ -260,7 +258,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
try {
interceptor.doBefore(hookPoint, messageCommons);
} catch (Throwable t) {
- LOGGER.warn("Exception raised while intercepting message, hookPoint={}, clientId={}", hookPoint, clientId);
+ LOGGER.warn("Exception raised while intercepting message, hookPoint={}, clientId={}", hookPoint,
+ clientId);
}
}
} finally {
@@ -277,7 +276,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
try {
interceptor.doAfter(hookPoints, messageCommons, duration, status);
} catch (Throwable t) {
- LOGGER.warn("Exception raised while intercepting message, hookPoint={}, clientId={}", hookPoints, clientId);
+ LOGGER.warn("Exception raised while intercepting message, hookPoint={}, clientId={}", hookPoints,
+ clientId);
}
}
} finally {
@@ -305,13 +305,15 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
.build();
telemeter(endpoints, telemetryCommand);
} catch (Throwable t) {
- LOGGER.error("Failed to send thread stack trace to remote, endpoints={}, nonce={}, clientId={}", endpoints, nonce, clientId, t);
+ LOGGER.error("Failed to send thread stack trace to remote, endpoints={}, nonce={}, clientId={}",
+ endpoints, nonce, clientId, t);
}
};
try {
telemetryCommandExecutor.submit(task);
} catch (Throwable t) {
- LOGGER.error("[Bug] Exception raised while submitting task to print thread stack trace, endpoints={}, nonce={}, clientId={}", endpoints, nonce, clientId, t);
+ LOGGER.error("[Bug] Exception raised while submitting task to print thread stack trace, endpoints={}, "
+ + "nonce={}, clientId={}", endpoints, nonce, clientId, t);
}
}
@@ -342,7 +344,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
try {
telemeter(endpoints, command);
} catch (Throwable t) {
- LOGGER.error("Failed to telemeter settings to remote, clientId={}, endpoints={}", clientId, endpoints, t);
+ LOGGER.error("Failed to telemeter settings, clientId={}, endpoints={}", clientId, endpoints, t);
}
}
}
@@ -432,14 +434,16 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
Futures.addCallback(future, new FutureCallback<List<TelemetrySession>>() {
@Override
public void onSuccess(List<TelemetrySession> sessions) {
- LOGGER.info("Register session successfully, current route will be cached, topic={}, topicRouteDataResult={}", topic, topicRouteDataResult);
+ LOGGER.info("Register session successfully, current route will be cached, topic={}, "
+ + "topicRouteDataResult={}", topic, topicRouteDataResult);
final TopicRouteDataResult old = topicRouteResultCache.put(topic, topicRouteDataResult);
if (topicRouteDataResult.equals(old)) {
// Log if topic route result remains the same.
LOGGER.info("Topic route result remains the same, topic={}, clientId={}", topic, clientId);
} else {
// Log if topic route result is updated.
- LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId, old, topicRouteDataResult);
+ LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId,
+ old, topicRouteDataResult);
}
onTopicRouteDataResultUpdate0(topic, topicRouteDataResult);
future0.set(null);
@@ -448,7 +452,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
@Override
public void onFailure(Throwable t) {
// Note: Topic route would not be updated if failed to register session.
- LOGGER.error("Failed to register session, current route will NOT be cached, topic={}, topicRouteDataResult={}", topic, topicRouteDataResult);
+ LOGGER.error("Failed to register session, current route will NOT be cached, topic={}, "
+ + "topicRouteDataResult={}", topic, topicRouteDataResult, t);
future0.setException(t);
}
}, MoreExecutors.directExecutor());
@@ -465,7 +470,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
* @param command request of message consume verification from remote.
*/
public void onVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command) {
- LOGGER.warn("Ignore verify message command from remote, which is not expected, clientId={}, command={}", clientId, command);
+ LOGGER.warn("Ignore verify message command from remote, which is not expected, clientId={}, command={}",
+ clientId, command);
final String nonce = command.getNonce();
final Status status = Status.newBuilder().setCode(Code.NOT_IMPLEMENTED).build();
VerifyMessageResult verifyMessageResult = VerifyMessageResult.newBuilder().setNonce(nonce).build();
@@ -487,14 +493,16 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
* @param command request of orphaned transaction recovery from remote.
*/
public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command) {
- LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, client id={}, command={}", clientId, command);
+ LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, client id={}, "
+ + "command={}", clientId, command);
}
private void updateRouteCache() {
LOGGER.info("Start to update route cache for a new round, clientId={}", clientId);
topicRouteResultCache.keySet().forEach(topic -> {
// Set timeout for future on purpose.
- final ListenableFuture<TopicRouteDataResult> future = Futures.withTimeout(fetchTopicRoute(topic), TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP, getScheduler());
+ final ListenableFuture<TopicRouteDataResult> future = Futures.withTimeout(fetchTopicRoute(topic),
+ TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP, getScheduler());
Futures.addCallback(future, new FutureCallback<TopicRouteDataResult>() {
@Override
public void onSuccess(TopicRouteDataResult topicRouteDataResult) {
@@ -503,7 +511,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Failed to fetch topic route for update cache, topic={}, clientId={}", topic, clientId, t);
+ LOGGER.error("Failed to fetch topic route for update cache, topic={}, clientId={}", topic,
+ clientId, t);
}
}, MoreExecutors.directExecutor());
});
@@ -583,7 +592,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
LOGGER.info("Send heartbeat successfully, endpoints={}, clientId={}", endpoints, clientId);
final boolean removed = isolated.remove(endpoints);
if (removed) {
- LOGGER.info("Rejoin endpoints which is isolated before, clientId={}, endpoints={}", clientId, endpoints);
+ LOGGER.info("Rejoin endpoints which is isolated before, clientId={}, endpoints={}", clientId,
+ endpoints);
}
}
@@ -593,7 +603,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
}
}, MoreExecutors.directExecutor());
} catch (Throwable e) {
- LOGGER.error("Exception raised while preparing heartbeat, endpoints={}, clientId={}", endpoints, clientId, e);
+ LOGGER.error("Exception raised while preparing heartbeat, endpoints={}, clientId={}", endpoints, clientId,
+ e);
}
}
@@ -679,10 +690,12 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
Futures.whenAllSucceed(updateFuture).run(() -> {
inflightRouteFutureLock.lock();
try {
- final Set<SettableFuture<TopicRouteDataResult>> newFutureSet = inflightRouteFutureTable.remove(topic);
+ final Set<SettableFuture<TopicRouteDataResult>> newFutureSet =
+ inflightRouteFutureTable.remove(topic);
if (null == newFutureSet) {
// Should never reach here.
- LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic, clientId);
+ LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic,
+ clientId);
return;
}
LOGGER.debug("Fetch topic route successfully, topic={}, in-flight route future "
@@ -692,7 +705,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
}
} catch (Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic, clientId, t);
+ LOGGER.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic,
+ clientId, t);
} finally {
inflightRouteFutureLock.unlock();
}
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index cfad8ad..ddbbbf9 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -42,8 +42,6 @@ import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
@@ -69,6 +67,8 @@ import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.rpc.RpcClient;
import org.apache.rocketmq.client.java.rpc.RpcClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @see ClientManager
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
similarity index 88%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
index 90b02f3..bb51870 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
@@ -27,8 +27,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
import java.io.UnsupportedEncodingException;
@@ -37,6 +35,8 @@ import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Telemetry session is constructed before first communication between client and remote route endpoints.
@@ -50,17 +50,17 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
private final Endpoints endpoints;
private volatile StreamObserver<TelemetryCommand> requestObserver;
- public static ListenableFuture<TelemetrySession> register(ClientImpl client, ClientManager clientManager,
- Endpoints endpoints) {
- return new TelemetrySession(client, clientManager, endpoints).register();
- }
-
private TelemetrySession(ClientImpl client, ClientManager clientManager, Endpoints endpoints) {
this.client = client;
this.clientManager = clientManager;
this.endpoints = endpoints;
}
+ public static ListenableFuture<TelemetrySession> register(ClientImpl client, ClientManager clientManager,
+ Endpoints endpoints) {
+ return new TelemetrySession(client, clientManager, endpoints).register();
+ }
+
private ListenableFuture<TelemetrySession> register() {
ListenableFuture<TelemetrySession> future;
try {
@@ -69,7 +69,8 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
final Settings settings = clientSettings.toProtobuf();
final TelemetryCommand settingsCommand = TelemetryCommand.newBuilder().setSettings(settings).build();
this.telemeter(settingsCommand);
- future = Futures.transform(clientSettings.getArrivedFuture(), input -> this, MoreExecutors.directExecutor());
+ future = Futures.transform(clientSettings.getArrivedFuture(), input -> this,
+ MoreExecutors.directExecutor());
} catch (Throwable t) {
SettableFuture<TelemetrySession> future0 = SettableFuture.create();
future0.setException(t);
@@ -78,12 +79,14 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
Futures.addCallback(future, new FutureCallback<TelemetrySession>() {
@Override
public void onSuccess(TelemetrySession session) {
- LOGGER.info("Register telemetry session successfully, endpoints={}, clientId={}", endpoints, client.getClientId());
+ LOGGER.info("Register telemetry session successfully, endpoints={}, clientId={}", endpoints,
+ client.getClientId());
}
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Failed to register telemetry session, endpoints={}, clientId={}", endpoints, client.getClientId(), t);
+ LOGGER.error("Failed to register telemetry session, endpoints={}, clientId={}", endpoints,
+ client.getClientId(), t);
release();
}
}, MoreExecutors.directExecutor());
@@ -106,7 +109,8 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
/**
* Initialize telemetry session.
*/
- private synchronized void init() throws UnsupportedEncodingException, NoSuchAlgorithmException, InvalidKeyException, ClientException {
+ private synchronized void init() throws UnsupportedEncodingException, NoSuchAlgorithmException,
+ InvalidKeyException, ClientException {
this.release();
final Metadata metadata = client.sign();
this.requestObserver = clientManager.telemetry(endpoints, metadata, Duration.ofNanos(Long.MAX_VALUE), this);
@@ -141,35 +145,41 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
switch (command.getCommandCase()) {
case SETTINGS: {
final Settings settings = command.getSettings();
- LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints, client.getClientId());
+ LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints,
+ client.getClientId());
client.onSettingsCommand(endpoints, settings);
break;
}
case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
final RecoverOrphanedTransactionCommand recoverOrphanedTransactionCommand =
command.getRecoverOrphanedTransactionCommand();
- LOGGER.info("Receive orphaned transaction recovery command from remote, endpoints={}, clientId={}", endpoints, client.getClientId());
+ LOGGER.info("Receive orphaned transaction recovery command from remote, endpoints={}, "
+ + "clientId={}", endpoints, client.getClientId());
client.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand);
break;
}
case VERIFY_MESSAGE_COMMAND: {
final VerifyMessageCommand verifyMessageCommand = command.getVerifyMessageCommand();
- LOGGER.info("Receive message verification command from remote, endpoints={}, clientId={}", client.getClientId());
+ LOGGER.info("Receive message verification command from remote, endpoints={}, clientId={}",
+ endpoints, client.getClientId());
client.onVerifyMessageCommand(endpoints, verifyMessageCommand);
break;
}
case PRINT_THREAD_STACK_TRACE_COMMAND: {
final PrintThreadStackTraceCommand printThreadStackTraceCommand =
command.getPrintThreadStackTraceCommand();
- LOGGER.info("Receive thread stack print command from remote, endpoints={}, clientId={}", endpoints, client.getClientId());
+ LOGGER.info("Receive thread stack print command from remote, endpoints={}, clientId={}",
+ endpoints, client.getClientId());
client.onPrintThreadStackCommand(endpoints, printThreadStackTraceCommand);
break;
}
default:
- LOGGER.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}", endpoints, command, client.getClientId());
+ LOGGER.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}",
+ endpoints, command, client.getClientId());
}
} catch (Throwable t) {
- LOGGER.error("[Bug] unexpected exception raised while receiving command from remote, command={}, clientId={}", command, client.getClientId());
+ LOGGER.error("[Bug] unexpected exception raised while receiving command from remote, command={}, "
+ + "clientId={}", command, client.getClientId(), t);
}
}
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignment.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignment.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignment.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignment.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignments.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignments.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignments.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignments.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
index b045548..793b425 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
@@ -23,8 +23,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
@@ -36,6 +34,8 @@ import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.Dispatcher;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings("NullableProblems")
public abstract class ConsumeService extends Dispatcher {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
index b68556b..27fbc36 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
@@ -18,8 +18,6 @@
package org.apache.rocketmq.client.java.impl.consumer;
import com.google.common.base.Stopwatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -31,6 +29,8 @@ import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageCommon;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ConsumeTask implements Callable<ConsumeResult> {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeTask.class);
@@ -67,7 +67,8 @@ public class ConsumeTask implements Callable<ConsumeResult> {
consumeResult = ConsumeResult.FAILURE;
}
final Duration duration = stopwatch.elapsed();
- MessageHookPointsStatus status = ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+ MessageHookPointsStatus status = ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK :
+ MessageHookPointsStatus.ERROR;
messageInterceptor.doAfter(MessageHookPoints.CONSUME, messageCommons, duration, status);
// Make sure that the return value is the subset of messageViews.
return consumeResult;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
similarity index 91%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index b44b112..f674d01 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -39,8 +39,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Durations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import io.grpc.Metadata;
import java.time.Duration;
import java.util.ArrayList;
@@ -59,6 +57,8 @@ import org.apache.rocketmq.client.java.message.MessageCommon;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
abstract class ConsumerImpl extends ClientImpl {
@@ -79,7 +79,8 @@ abstract class ConsumerImpl extends ClientImpl {
try {
Metadata metadata = sign();
final Endpoints endpoints = mq.getBroker().getEndpoints();
- final ListenableFuture<Iterator<ReceiveMessageResponse>> future = clientManager.receiveMessage(endpoints, metadata, request, timeout);
+ final ListenableFuture<Iterator<ReceiveMessageResponse>> future = clientManager.receiveMessage(endpoints,
+ metadata, request, timeout);
return Futures.transform(future, it -> {
// Null here means status not set yet.
Status status = null;
@@ -96,13 +97,15 @@ abstract class ConsumerImpl extends ClientImpl {
break;
case DELIVERY_TIMESTAMP:
deliveryTimestampFromRemote = response.getDeliveryTimestamp();
+ break;
default:
- LOGGER.warn("[Bug] Not recognized content for receive message response, mq={}, clientId={}, resp={}", mq, clientId, response);
+ LOGGER.warn("[Bug] Not recognized content for receive message response, mq={}," +
+ " clientId={}, resp={}", mq, clientId, response);
}
}
for (Message message : messageList) {
- final MessageViewImpl messageView = MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
- messages.add(messageView);
+ final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
+ messages.add(view);
}
return new ReceiveMessageResult(endpoints, status, messages);
}, MoreExecutors.directExecutor());
@@ -156,9 +159,11 @@ abstract class ConsumerImpl extends ClientImpl {
final Status status = response.getStatus();
final Code code = status.getCode();
final Duration duration = stopwatch.elapsed();
- MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+ MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ?
+ MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
if (!Code.OK.equals(code)) {
- LOGGER.error("Failed to ack message, code={}, status message=[{}], topic={}, messageId={}, clientId={}", code, status.getMessage(), topic, messageId, clientId);
+ LOGGER.error("Failed to ack message, code={}, status message=[{}], topic={}, messageId={}," +
+ " clientId={}", code, status.getMessage(), topic, messageId, clientId);
}
doAfter(MessageHookPoints.ACK, messageCommons, duration, messageHookPointsStatus);
}
@@ -167,7 +172,8 @@ abstract class ConsumerImpl extends ClientImpl {
public void onFailure(Throwable t) {
final Duration duration = stopwatch.elapsed();
doAfter(MessageHookPoints.ACK, messageCommons, duration, MessageHookPointsStatus.ERROR);
- LOGGER.error("Exception raised during message acknowledgement, topic={}, messageId={}, clientId={}", topic, messageId, clientId, t);
+ LOGGER.error("Exception raised during message acknowledgement, topic={}, messageId={}, clientId={}",
+ topic, messageId, clientId, t);
}
}, MoreExecutors.directExecutor());
return future;
@@ -184,7 +190,8 @@ abstract class ConsumerImpl extends ClientImpl {
try {
final ChangeInvisibleDurationRequest request = wrapChangeInvisibleDuration(messageView, invisibleDuration);
final Metadata metadata = sign();
- future = clientManager.changeInvisibleDuration(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
+ future = clientManager.changeInvisibleDuration(endpoints, metadata, request,
+ clientConfiguration.getRequestTimeout());
} catch (Throwable t) {
final SettableFuture<ChangeInvisibleDurationResponse> future0 = SettableFuture.create();
future0.setException(t);
@@ -197,9 +204,11 @@ abstract class ConsumerImpl extends ClientImpl {
final Status status = response.getStatus();
final Code code = status.getCode();
final Duration duration = stopwatch.elapsed();
- MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+ MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ?
+ MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
if (!Code.OK.equals(code)) {
- LOGGER.error("Failed to change message invisible duration, messageId={}, endpoints={}, code={}, status message=[{}], clientId={}", messageId, endpoints, code, status.getMessage(), clientId);
+ LOGGER.error("Failed to change message invisible duration, messageId={}, endpoints={}, code={}," +
+ " status message=[{}], clientId={}", messageId, endpoints, code, status.getMessage(), clientId);
}
doAfter(MessageHookPoints.CHANGE_INVISIBLE_DURATION, messageCommons, duration, messageHookPointsStatus);
}
@@ -208,7 +217,9 @@ abstract class ConsumerImpl extends ClientImpl {
public void onFailure(Throwable t) {
final Duration duration = stopwatch.elapsed();
- doAfter(MessageHookPoints.ACK, messageCommons, duration, MessageHookPointsStatus.ERROR);
- LOGGER.error("Exception raised during message acknowledgement, messageId={}, endpoints={}, clientId={}", messageId, endpoints, clientId, t);
+ doAfter(MessageHookPoints.CHANGE_INVISIBLE_DURATION, messageCommons, duration,
+ MessageHookPointsStatus.ERROR);
+ LOGGER.error("Exception raised while changing message invisible duration, messageId={}," +
+ " endpoints={}, clientId={}", messageId, endpoints, clientId, t);
}
}, MoreExecutors.directExecutor());
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
index 0169915..c8beeb2 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
@@ -21,8 +21,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -35,6 +33,8 @@ import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings("NullableProblems")
class FifoConsumeService extends ConsumeService {
@@ -53,7 +53,8 @@ class FifoConsumeService extends ConsumeService {
}
final MessageViewImpl next = iterator.next();
final ListenableFuture<ConsumeResult> future0 = consume(next);
- final ListenableFuture<Void> future = Futures.transformAsync(future0, result -> pq.eraseFifoMessage(next, result), MoreExecutors.directExecutor());
+ final ListenableFuture<Void> future = Futures.transformAsync(future0, result -> pq.eraseFifoMessage(next,
+ result), MoreExecutors.directExecutor());
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(Void ignore) {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
similarity index 87%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index bd93e26..3183627 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -29,8 +29,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -53,6 +51,8 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Default implementation of {@link ProcessQueue}.
@@ -63,12 +63,12 @@ import org.apache.rocketmq.client.java.route.MessageQueueImpl;
*/
@SuppressWarnings({"NullableProblems", "UnstableApiUsage"})
class ProcessQueueImpl implements ProcessQueue {
- private static final Logger LOGGER = LoggerFactory.getLogger(ProcessQueueImpl.class);
-
public static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_DELAY = Duration.ofMillis(100);
public static final Duration ACK_FIFO_MESSAGE_DELAY = Duration.ofMillis(100);
public static final Duration RECEIVE_LATER_DELAY = Duration.ofSeconds(3);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ProcessQueueImpl.class);
+
private final PushConsumerImpl consumer;
/**
@@ -120,12 +120,14 @@ class ProcessQueueImpl implements ProcessQueue {
@Override
public boolean expired() {
- Duration maxIdleDuration = Duration.ofNanos(2 * consumer.getPushConsumerSettings().getLongPollingTimeout().toNanos());
+ final PushConsumerSettings settings = consumer.getPushConsumerSettings();
+ Duration maxIdleDuration = Duration.ofNanos(2 * settings.getLongPollingTimeout().toNanos());
final Duration idleDuration = Duration.ofNanos(System.nanoTime() - activityNanoTime);
if (idleDuration.compareTo(maxIdleDuration) < 0) {
return false;
}
- LOGGER.warn("Process queue is idle, idle duration={}, max idle duration={}, mq={}, clientId={}", idleDuration, maxIdleDuration, mq, consumer.getClientId());
+ LOGGER.warn("Process queue is idle, idle duration={}, max idle duration={}, mq={}, clientId={}", idleDuration,
+ maxIdleDuration, mq, consumer.getClientId());
return true;
}
@@ -153,11 +155,13 @@ class ProcessQueueImpl implements ProcessQueue {
corrupted.forEach(messageView -> {
final MessageId messageId = messageView.getMessageId();
if (consumer.getPushConsumerSettings().isFifo()) {
- LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}, messageId={}, clientId={}", mq, messageId, consumer.getClientId());
+ LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}," +
+ " messageId={}, clientId={}", mq, messageId, consumer.getClientId());
forwardToDeadLetterQueue(messageView);
return;
}
- LOGGER.error("Message is corrupted, nack it in standard mode, mq={}, messageId={}, clientId={}", mq, messageId, consumer.getClientId());
+ LOGGER.error("Message is corrupted, nack it in standard mode, mq={}, messageId={}, clientId={}", mq,
+ messageId, consumer.getClientId());
nackMessage(messageView);
});
}
@@ -188,18 +192,21 @@ class ProcessQueueImpl implements ProcessQueue {
return;
}
// Should never reach here.
- LOGGER.error("[Bug] Failed to schedule receive message request, mq={}, clientId={}", mq, consumer.getClientId(), t);
+ LOGGER.error("[Bug] Failed to schedule receive message request, mq={}, clientId={}", mq,
+ consumer.getClientId(), t);
receiveMessageLater();
}
}
public void receiveMessage() {
if (dropped) {
- LOGGER.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq, consumer.getClientId());
+ LOGGER.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq,
+ consumer.getClientId());
return;
}
if (this.isCacheFull()) {
- LOGGER.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq, consumer.getClientId());
+ LOGGER.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq,
+ consumer.getClientId());
receiveMessageLater();
return;
}
@@ -208,7 +215,8 @@ class ProcessQueueImpl implements ProcessQueue {
private void receiveMessageImmediately() {
if (!consumer.isRunning()) {
- LOGGER.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, consumer.getClientId());
+ LOGGER.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq,
+ consumer.getClientId());
return;
}
try {
@@ -228,18 +236,20 @@ class ProcessQueueImpl implements ProcessQueue {
public void onSuccess(ReceiveMessageResult result) {
// Intercept after message reception.
final Duration duration = stopwatch.elapsed();
- final List<MessageCommon> messageCommons = result.getMessages().stream().map(MessageViewImpl::getMessageCommon).collect(Collectors.toList());
+ final List<MessageCommon> commons = result.getMessages().stream()
+ .map(MessageViewImpl::getMessageCommon).collect(Collectors.toList());
if (result.getStatus().isPresent() && Code.OK.equals(result.getStatus().get().getCode())) {
- consumer.doAfter(MessageHookPoints.RECEIVE, messageCommons, duration, MessageHookPointsStatus.OK);
+ consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.OK);
} else {
- consumer.doAfter(MessageHookPoints.RECEIVE, messageCommons, duration, MessageHookPointsStatus.ERROR);
+ consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.ERROR);
}
try {
onReceiveMessageResult(result);
} catch (Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Exception raised while handling receive result, would receive later, mq={}, endpoints={}, clientId={}",
+ LOGGER.error("[Bug] Exception raised while handling receive result, would receive later," +
+ " mq={}, endpoints={}, clientId={}",
mq, endpoints, consumer.getClientId(), t);
receiveMessageLater();
}
@@ -249,16 +259,18 @@ class ProcessQueueImpl implements ProcessQueue {
public void onFailure(Throwable t) {
// Intercept after message reception.
final Duration duration = stopwatch.elapsed();
- consumer.doAfter(MessageHookPoints.RECEIVE, Collections.emptyList(), duration, MessageHookPointsStatus.ERROR);
+ consumer.doAfter(MessageHookPoints.RECEIVE, Collections.emptyList(), duration,
+ MessageHookPointsStatus.ERROR);
- LOGGER.error("Exception raised while message reception, would receive later, mq={}, endpoints={}, clientId={}",
- mq, endpoints, consumer.getClientId(), t);
+ LOGGER.error("Exception raised while message reception, would receive later, mq={}, endpoints={}," +
+ " clientId={}", mq, endpoints, consumer.getClientId(), t);
receiveMessageLater();
}
}, MoreExecutors.directExecutor());
consumer.getReceptionTimes().getAndIncrement();
} catch (Throwable t) {
- LOGGER.error("Exception raised while message reception, would receive later, mq={}, clientId={}", mq, consumer.getClientId(), t);
+ LOGGER.error("Exception raised while message reception, would receive later, mq={}, clientId={}", mq,
+ consumer.getClientId(), t);
receiveMessageLater();
}
}
@@ -267,14 +279,16 @@ class ProcessQueueImpl implements ProcessQueue {
final int cacheMessageCountThresholdPerQueue = consumer.cacheMessageCountThresholdPerQueue();
final long actualMessagesQuantity = this.cachedMessagesCount();
if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) {
- LOGGER.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}, mq={}, clientId={}",
+ LOGGER.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}," +
+ " mq={}, clientId={}",
cacheMessageCountThresholdPerQueue, actualMessagesQuantity, mq, consumer.getClientId());
return true;
}
final int cacheMessageBytesThresholdPerQueue = consumer.cacheMessageBytesThresholdPerQueue();
final long actualCachedMessagesBytes = this.cachedMessageBytes();
if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) {
- LOGGER.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes, actual={} bytes, mq={}, clientId={}",
+ LOGGER.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes," +
+ " actual={} bytes, mq={}, clientId={}",
cacheMessageBytesThresholdPerQueue, actualCachedMessagesBytes, mq, consumer.getClientId());
return true;
}
@@ -316,7 +330,8 @@ class ProcessQueueImpl implements ProcessQueue {
receiveMessage();
}
optionalStatus = Optional.of(Status.newBuilder().setCode(Code.OK).build());
- LOGGER.error("[Bug] Status not set but message(s) found in the receive result, fix the status to OK, mq={}, endpoints={}", mq, endpoints);
+ LOGGER.error("[Bug] Status not set but message(s) found in the receive result, fix the status to OK," +
+ " mq={}, endpoints={}", mq, endpoints);
}
final Status status = optionalStatus.get();
final Code code = status.getCode();
@@ -327,17 +342,20 @@ class ProcessQueueImpl implements ProcessQueue {
consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
consumer.getConsumeService().signal();
}
- LOGGER.debug("Receive message with OK, mq={}, endpoints={}, messages found count={}, clientId={}", mq, endpoints, messages.size(), consumer.getClientId());
+ LOGGER.debug("Receive message with OK, mq={}, endpoints={}, messages found count={}, clientId={}",
+ mq, endpoints, messages.size(), consumer.getClientId());
receiveMessage();
break;
case MESSAGE_NOT_FOUND:
- LOGGER.info("Message not found while receiving message, mq={}, endpoints={}, clientId={}, code={}, status message=[{}]",
+ LOGGER.info("Message not found while receiving message, mq={}, endpoints={}, clientId={}, code={}," +
+ " status message=[{}]",
mq, endpoints, consumer.getClientId(), code.getNumber(), status.getMessage());
receiveMessage();
break;
// Fall through on purpose.
default:
- LOGGER.error("Failed to receive message from remote, try it later, mq={}, endpoints={}, clientId={}, code={}, status message=[{}]",
+ LOGGER.error("Failed to receive message from remote, try it later, mq={}, endpoints={}, clientId={}," +
+ " code={}, status message=[{}]",
mq, endpoints, consumer.getClientId(), code.getNumber(), status.getMessage());
receiveMessageLater();
}
@@ -431,14 +449,18 @@ class ProcessQueueImpl implements ProcessQueue {
if (ConsumeResult.FAILURE.equals(consumeResult) && attempt < maxAttempts) {
final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(attempt);
attempt = messageView.incrementAndGetDeliveryAttempt();
- LOGGER.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}, attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}",
+ LOGGER.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}," +
+ " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}",
maxAttempts, attempt, mq, messageId, nextAttemptDelay, consumer.getClientId());
final ListenableFuture<ConsumeResult> future = service.consume(messageView, nextAttemptDelay);
- return Futures.transformAsync(future, result -> eraseFifoMessage(messageView, result), MoreExecutors.directExecutor());
+ return Futures.transformAsync(future, result -> eraseFifoMessage(messageView, result),
+ MoreExecutors.directExecutor());
}
boolean ok = ConsumeResult.SUCCESS.equals(consumeResult);
if (!ok) {
- LOGGER.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, attempt={}, mq={}, messageId={}, clientId={}",
+ LOGGER.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, "
+ + "attempt={}," +
+ " mq={}, messageId={}, clientId={}",
maxAttempts, attempt, mq, messageId, consumer.getClientId());
}
// Ack message or forward it to DLQ depends on consumption result.
@@ -465,7 +487,8 @@ class ProcessQueueImpl implements ProcessQueue {
final Code code = status.getCode();
// Log failure and retry later.
if (!Code.OK.equals(code)) {
- LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later, clientId={}, messageId={}, attempt={}, mq={}, code={}, status message={}",
+ LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later," +
+ " clientId={}, messageId={}, attempt={}, mq={}, code={}, status message={}",
clientId, messageView.getMessageId(), attempt, mq, code, status.getMessage());
forwardToDeadLetterQueue(messageView, 1 + attempt, future0);
return;
@@ -496,7 +519,8 @@ class ProcessQueueImpl implements ProcessQueue {
final String clientId = consumer.getClientId();
// Process queue is dropped, no need to proceed.
if (dropped) {
- LOGGER.info("Process queue was dropped, give up to forward message to dead letter queue, mq={}, messageId={}, clientId={}", mq, messageId, clientId);
+ LOGGER.info("Process queue was dropped, give up to forward message to dead letter queue, mq={}," +
+ " messageId={}, clientId={}", mq, messageId, clientId);
return;
}
final ScheduledExecutorService scheduler = consumer.getScheduler();
@@ -508,7 +532,8 @@ class ProcessQueueImpl implements ProcessQueue {
return;
}
// Should never reach here.
- LOGGER.error("[Bug] Failed to schedule DLQ message request, mq={}, messageId={}, clientId={}", mq, messageView.getMessageId(), clientId);
+ LOGGER.error("[Bug] Failed to schedule DLQ message request, mq={}, messageId={}, clientId={}", mq,
+ messageView.getMessageId(), clientId);
forwardToDeadLetterQueueLater(messageView, 1 + attempt, future0);
}
}
@@ -531,15 +556,16 @@ class ProcessQueueImpl implements ProcessQueue {
final Code code = status.getCode();
// Log failure and retry later.
if (!Code.OK.equals(code)) {
- LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, attempt={}, messageId={}, mq={}, code={}, endpoints={}, status message=[{}]",
+ LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, attempt={}," +
+ " messageId={}, mq={}, code={}, endpoints={}, status message=[{}]",
clientId, attempt, messageView.getMessageId(), mq, code, endpoints, status.getMessage());
ackFifoMessageLater(messageView, 1 + attempt, future0);
return;
}
// Log retries.
if (1 < attempt) {
- LOGGER.info("Re-ack fifo message successfully, clientId={}, attempt={}, messageId={}, mq={}, endpoints={}",
- clientId, attempt, messageView.getMessageId(), mq, endpoints);
+ LOGGER.info("Re-ack fifo message successfully, clientId={}, attempt={}, messageId={}, mq={}," +
+ " endpoints={}", clientId, attempt, messageView.getMessageId(), mq, endpoints);
}
// Set result if FIFO message is acknowledged successfully.
future0.set(null);
@@ -548,7 +574,8 @@ class ProcessQueueImpl implements ProcessQueue {
@Override
public void onFailure(Throwable t) {
// Log failure and retry later.
- LOGGER.error("Exception raised while ack fifo message, clientId={}, would attempt to re-ack later, attempt={}, messageId={}, mq={}, endpoints={}",
+ LOGGER.error("Exception raised while ack fifo message, clientId={}, would attempt to re-ack later," +
+ " attempt={}, messageId={}, mq={}, endpoints={}",
clientId, attempt, messageView.getMessageId(), mq, endpoints, t);
ackFifoMessageLater(messageView, 1 + attempt, future0);
}
@@ -561,7 +588,8 @@ class ProcessQueueImpl implements ProcessQueue {
final String clientId = consumer.getClientId();
// Process queue is dropped, no need to proceed.
if (dropped) {
- LOGGER.info("Process queue was dropped, give up to ack message, mq={}, messageId={}, clientId={}", mq, messageId, clientId);
+ LOGGER.info("Process queue was dropped, give up to ack message, mq={}, messageId={}, clientId={}",
+ mq, messageId, clientId);
return;
}
final ScheduledExecutorService scheduler = consumer.getScheduler();
@@ -573,7 +601,8 @@ class ProcessQueueImpl implements ProcessQueue {
return;
}
// Should never reach here.
- LOGGER.error("[Bug] Failed to schedule ack fifo message request, mq={}, messageId={}, clientId={}", mq, messageId, clientId);
+ LOGGER.error("[Bug] Failed to schedule ack fifo message request, mq={}, messageId={}, clientId={}",
+ mq, messageId, clientId);
ackFifoMessageLater(messageView, 1 + attempt, future0);
}
}
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
index 13cae19..de3c6ef 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.client.java.impl.consumer;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
@@ -27,9 +30,6 @@ import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* Implementation of {@link PushConsumerBuilder}
*/
@@ -59,7 +59,8 @@ public class PushConsumerBuilderImpl implements PushConsumerBuilder {
@Override
public PushConsumerBuilder setConsumerGroup(String consumerGroup) {
checkNotNull(consumerGroup, "consumerGroup should not be null");
- checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(), "consumerGroup does not match the regex [regex=%s]", CONSUMER_GROUP_PATTERN.pattern());
+ checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(), "consumerGroup does not match the "
+ + "regex [regex=%s]", CONSUMER_GROUP_PATTERN.pattern());
this.consumerGroup = consumerGroup;
return this;
}
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
similarity index 93%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 59784a2..b14c047 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -33,8 +33,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import io.grpc.Metadata;
import java.time.Duration;
import java.util.Collections;
@@ -73,6 +71,8 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Default implementation of {@link PushConsumer}
@@ -86,6 +86,9 @@ import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCacheObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerImpl.class);
+ final AtomicLong consumptionOkQuantity;
+ final AtomicLong consumptionErrorQuantity;
+
private final ClientConfiguration clientConfiguration;
private final PushConsumerSettings pushConsumerSettings;
private final String consumerGroup;
@@ -104,9 +107,6 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
*/
private final AtomicLong receivedMessagesQuantity;
- final AtomicLong consumptionOkQuantity;
- final AtomicLong consumptionErrorQuantity;
-
private final ThreadPoolExecutor consumptionExecutor;
private final ConcurrentMap<MessageQueueImpl, ProcessQueue> processQueueTable;
private ConsumeService consumeService;
@@ -215,7 +215,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
public PushConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException {
// Check consumer status.
if (!this.isRunning()) {
- LOGGER.error("Unable to add subscription because push consumer is not running, state={}, clientId={}", this.state(), clientId);
+ LOGGER.error("Unable to add subscription because push consumer is not running, state={}, clientId={}",
+ this.state(), clientId);
throw new IllegalStateException("Push consumer is not running now");
}
@@ -237,7 +238,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
public PushConsumer unsubscribe(String topic) {
// Check consumer status.
if (!this.isRunning()) {
- LOGGER.error("Unable to remove subscription because push consumer is not running, state={}, clientId={}", this.state(), clientId);
+ LOGGER.error("Unable to remove subscription because push consumer is not running, state={}, clientId={}",
+ this.state(), clientId);
throw new IllegalStateException("Push consumer is not running now");
}
subscriptionExpressions.remove(topic);
@@ -278,7 +280,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
final Status status = response.getStatus();
final Code code = status.getCode();
if (!Code.OK.equals(code)) {
- final String message = String.format("Failed to query assignment, code=%d, status message=[{%s}]", code.getNumber(), status.getMessage());
+ final String message = String.format("Failed to query assignment, code=%d, status message=[{%s}]",
+ code.getNumber(), status.getMessage());
throw new RuntimeException(message);
}
SettableFuture<Assignments> future0 = SettableFuture.create();
@@ -385,14 +388,17 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
public void onSuccess(Assignments latest) {
if (latest.getAssignmentList().isEmpty()) {
if (null == existed || existed.getAssignmentList().isEmpty()) {
- LOGGER.info("Acquired empty assignments from remote, would scan later, topic={}, clientId={}", topic, clientId);
+ LOGGER.info("Acquired empty assignments from remote, would scan later, topic={}, "
+ + "clientId={}", topic, clientId);
return;
}
- LOGGER.info("Attention!!! acquired empty assignments from remote, but existed assignments is not empty, topic={}, clientId={}", topic, clientId);
+ LOGGER.info("Attention!!! acquired empty assignments from remote, but existed assignments"
+ + " is not empty, topic={}, clientId={}", topic, clientId);
}
if (!latest.equals(existed)) {
- LOGGER.info("Assignments of topic={} has changed, {} => {}, clientId={}", topic, existed, latest, clientId);
+ LOGGER.info("Assignments of topic={} has changed, {} => {}, clientId={}", topic, existed,
+ latest, clientId);
syncProcessQueue(topic, latest, filterExpression);
cacheAssignments.put(topic, latest);
return;
@@ -404,7 +410,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
@SuppressWarnings("NullableProblems")
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Exception raised while scanning the assignments, topic={}, clientId={}", topic, clientId, t);
+ LOGGER.error("Exception raised while scanning the assignments, topic={}, clientId={}", topic,
+ clientId, t);
}
}, MoreExecutors.directExecutor());
}
@@ -472,7 +479,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
public void onSuccess(ConsumeResult consumeResult) {
Code code = ConsumeResult.SUCCESS.equals(consumeResult) ? Code.OK : Code.FAILED_TO_CONSUME_MESSAGE;
Status status = Status.newBuilder().setCode(code).build();
- final VerifyMessageResult verifyMessageResult = VerifyMessageResult.newBuilder().setNonce(nonce).build();
+ final VerifyMessageResult verifyMessageResult =
+ VerifyMessageResult.newBuilder().setNonce(nonce).build();
TelemetryCommand command = TelemetryCommand.newBuilder()
.setVerifyMessageResult(verifyMessageResult)
.setStatus(status)
@@ -480,14 +488,16 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
try {
telemeter(endpoints, command);
} catch (Throwable t) {
- LOGGER.error("Failed to send message verification result command, endpoints={}, command={}, messageId={}, clientId={}", endpoints, command, messageId, clientId, t);
+ LOGGER.error("Failed to send message verification result command, endpoints={}, command={}, "
+ + "messageId={}, clientId={}", endpoints, command, messageId, clientId, t);
}
}
@Override
public void onFailure(Throwable t) {
// Should never reach here.
- LOGGER.error("[Bug] Failed to get message verification result, endpoints={}, messageId={}, clientId={}", endpoints, messageId, clientId, t);
+ LOGGER.error("[Bug] Failed to get message verification result, endpoints={}, messageId={}, "
+ + "clientId={}", endpoints, messageId, clientId, t);
}
}, MoreExecutors.directExecutor());
}
@@ -527,7 +537,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
@Override
public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) {
final Duration duration = stopwatch.elapsed();
- MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(response.getStatus().getCode()) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+ MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(response.getStatus().getCode()) ?
+ MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
// Intercept after forwarding message to DLQ.
doAfter(MessageHookPoints.FORWARD_TO_DLQ, messageCommons, duration, messageHookPointsStatus);
}
@@ -550,11 +561,15 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
final long consumptionOkQuantity = this.consumptionOkQuantity.getAndSet(0);
final long consumptionErrorQuantity = this.consumptionErrorQuantity.getAndSet(0);
- LOGGER.info("clientId={}, consumerGroup={}, receptionTimes={}, receivedMessagesQuantity={}, consumptionOkQuantity={}, consumptionErrorQuantity={}",
- clientId, consumerGroup, receptionTimes, receivedMessagesQuantity, consumptionOkQuantity, consumptionErrorQuantity);
+ LOGGER.info("clientId={}, consumerGroup={}, receptionTimes={}, receivedMessagesQuantity={}, "
+ + "consumptionOkQuantity={}, consumptionErrorQuantity={}",
+ clientId, consumerGroup, receptionTimes, receivedMessagesQuantity, consumptionOkQuantity,
+ consumptionErrorQuantity);
for (ProcessQueue pq : processQueueTable.values()) {
- LOGGER.info("Process queue stats: clientId={}, mq={}, pendingMessageCount={}, inflightMessageCount={}, cachedMessageBytes={}",
- clientId, pq.getMessageQueue(), pq.getPendingMessageCount(), pq.getInflightMessageCount(), pq.getCachedMessageBytes());
+ LOGGER.info("Process queue stats: clientId={}, mq={}, pendingMessageCount={}, inflightMessageCount={}, "
+ + "cachedMessageBytes={}",
+ clientId, pq.getMessageQueue(), pq.getPendingMessageCount(), pq.getInflightMessageCount(),
+ pq.getCachedMessageBytes());
}
}
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
index 90e96a2..b9e97a5 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
@@ -24,8 +24,6 @@ import apache.rocketmq.v2.Subscription;
import apache.rocketmq.v2.SubscriptionEntry;
import com.google.common.base.MoreObjects;
import com.google.protobuf.util.Durations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -39,6 +37,8 @@ import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PushConsumerSettings extends ClientSettings {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerSettings.class);
@@ -73,8 +73,10 @@ public class PushConsumerSettings extends ClientSettings {
List<SubscriptionEntry> subscriptionEntries = new ArrayList<>();
for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
final FilterExpression filterExpression = entry.getValue();
- apache.rocketmq.v2.Resource topic = apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
- final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder = apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
+ apache.rocketmq.v2.Resource topic =
+ apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
+ final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
+ apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
final FilterExpressionType type = filterExpression.getFilterExpressionType();
switch (type) {
case TAG:
@@ -86,10 +88,12 @@ public class PushConsumerSettings extends ClientSettings {
default:
LOGGER.warn("[Bug] Unrecognized filter type, type={}", type);
}
- SubscriptionEntry subscriptionEntry = SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
+ SubscriptionEntry subscriptionEntry =
+ SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
subscriptionEntries.add(subscriptionEntry);
}
- Subscription subscription = Subscription.newBuilder().setGroup(group.toProtobuf()).addAllSubscriptions(subscriptionEntries).build();
+ Subscription subscription =
+ Subscription.newBuilder().setGroup(group.toProtobuf()).addAllSubscriptions(subscriptionEntries).build();
return Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setSubscription(subscription)
.setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
@@ -99,7 +103,8 @@ public class PushConsumerSettings extends ClientSettings {
public void applySettingsCommand(Settings settings) {
final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
- LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, client type={}", clientId, pubSubCase, clientType);
+ LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, "
+ + "client type={}", clientId, pubSubCase, clientType);
return;
}
final Subscription subscription = settings.getSubscription();
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
similarity index 94%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
index fe19f4b..297845d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.client.java.impl.consumer;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,9 +30,6 @@ import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
public class SimpleConsumerBuilderImpl implements SimpleConsumerBuilder {
private static final Pattern CONSUMER_GROUP_PATTERN = Pattern.compile("^[%|a-zA-Z0-9._-]{1,255}$");
@@ -53,8 +53,8 @@ public class SimpleConsumerBuilderImpl implements SimpleConsumerBuilder {
@Override
public SimpleConsumerBuilder setConsumerGroup(String consumerGroup) {
checkNotNull(consumerGroup, "consumerGroup should not be null");
- checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(), "consumerGroup does not match the regex [regex=%s]",
- CONSUMER_GROUP_PATTERN.pattern());
+ checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(),
+ "consumerGroup does not match the regex [regex=%s]", CONSUMER_GROUP_PATTERN.pattern());
this.consumerGroup = consumerGroup;
return this;
}
@@ -79,7 +79,8 @@ public class SimpleConsumerBuilderImpl implements SimpleConsumerBuilder {
checkNotNull(consumerGroup, "consumerGroup has not been set yet");
checkArgument(!subscriptionExpressions.isEmpty(), "subscriptionExpressions have not been set yet");
checkNotNull(awaitDuration, "awaitDuration has not been set yet");
- final SimpleConsumerImpl consumer = new SimpleConsumerImpl(clientConfiguration, consumerGroup, awaitDuration, subscriptionExpressions);
+ final SimpleConsumerImpl consumer = new SimpleConsumerImpl(clientConfiguration, consumerGroup, awaitDuration,
+ subscriptionExpressions);
consumer.startAsync().awaitRunning();
return consumer;
}
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index f0605b6..db48b87 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -27,8 +27,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
@@ -51,6 +49,8 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implementation of {@link SimpleConsumer}
@@ -66,20 +66,21 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
private final AtomicInteger topicIndex;
private final Map<String /* topic */, FilterExpression> subscriptionExpressions;
- private final ConcurrentMap<String /* topic */, SubscriptionTopicRouteDataResult> subscriptionTopicRouteDataResultCache;
+ private final ConcurrentMap<String /* topic */, SubscriptionTopicRouteDataResult> subTopicRouteDataResultCache;
public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
Map<String, FilterExpression> subscriptionExpressions) {
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
Resource groupResource = new Resource(consumerGroup);
- this.simpleConsumerSettings = new SimpleConsumerSettings(clientId, accessEndpoints, groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
+ this.simpleConsumerSettings = new SimpleConsumerSettings(clientId, accessEndpoints, groupResource,
+ clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.awaitDuration = awaitDuration;
this.topicIndex = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
this.subscriptionExpressions = subscriptionExpressions;
- this.subscriptionTopicRouteDataResultCache = new ConcurrentHashMap<>();
+ this.subTopicRouteDataResultCache = new ConcurrentHashMap<>();
}
@Override
@@ -111,7 +112,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
public SimpleConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException {
// Check consumer status.
if (!this.isRunning()) {
- LOGGER.error("Unable to add subscription because simple consumer is not running, state={}, clientId={}", this.state(), clientId);
+ LOGGER.error("Unable to add subscription because simple consumer is not running, state={}, clientId={}",
+ this.state(), clientId);
throw new IllegalStateException("Simple consumer is not running now");
}
final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
@@ -131,7 +133,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
@Override
public SimpleConsumer unsubscribe(String topic) {
if (!this.isRunning()) {
- LOGGER.error("Unable to remove subscription because simple consumer is not running, state={}, clientId={}", this.state(), clientId);
+ LOGGER.error("Unable to remove subscription because simple consumer is not running, state={}, "
+ + "clientId={}", this.state(), clientId);
throw new IllegalStateException("Simple consumer is not running now");
}
subscriptionExpressions.remove(topic);
@@ -167,7 +170,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration invisibleDuration) {
SettableFuture<List<MessageView>> future = SettableFuture.create();
if (!this.isRunning()) {
- LOGGER.error("Unable to receive message because simple consumer is not running, state={}, clientId={}", this.state(), clientId);
+ LOGGER.error("Unable to receive message because simple consumer is not running, state={}, clientId={}",
+ this.state(), clientId);
future.setException(new IllegalStateException("Simple consumer is not running now"));
return future;
}
@@ -175,7 +179,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
final ArrayList<String> topics = new ArrayList<>(copy.keySet());
// All topic is subscribed.
if (topics.isEmpty()) {
- final IllegalArgumentException exception = new IllegalArgumentException("There is no topic to receive message");
+ final IllegalArgumentException exception = new IllegalArgumentException("There is no topic to receive "
+ + "message");
future.setException(exception);
return future;
}
@@ -184,7 +189,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
final ListenableFuture<SubscriptionTopicRouteDataResult> routeFuture = getSubscriptionTopicRouteResult(topic);
final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
final MessageQueueImpl mq = result.takeMessageQueue();
- final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, invisibleDuration);
+ final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
+ invisibleDuration);
return receiveMessage(request, mq, awaitDuration);
}, MoreExecutors.directExecutor());
return Futures.transformAsync(future0, result -> {
@@ -225,12 +231,14 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
SettableFuture<Void> future0 = SettableFuture.create();
// Check consumer status.
if (!this.isRunning()) {
- LOGGER.error("Unable to ack message because simple consumer is not running, state={}, clientId={}", this.state(), clientId);
+ LOGGER.error("Unable to ack message because simple consumer is not running, state={}, clientId={}",
+ this.state(), clientId);
future0.setException(new IllegalStateException("Simple consumer is not running now"));
return future0;
}
if (!(messageView instanceof MessageViewImpl)) {
- final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for messageView");
+ final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for "
+ + "messageView");
future0.setException(exception);
return future0;
}
@@ -268,12 +276,14 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
public ListenableFuture<Void> changeInvisibleDuration0(MessageView messageView, Duration invisibleDuration) {
SettableFuture<Void> future0 = SettableFuture.create();
if (!(messageView instanceof MessageViewImpl)) {
- final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for messageView");
+ final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for "
+ + "messageView");
future0.setException(exception);
return future0;
}
MessageViewImpl impl = (MessageViewImpl) messageView;
- final ListenableFuture<ChangeInvisibleDurationResponse> future = changInvisibleDuration(impl, invisibleDuration);
+ final ListenableFuture<ChangeInvisibleDurationResponse> future = changInvisibleDuration(impl,
+ invisibleDuration);
return Futures.transformAsync(future, response -> {
// Refresh receipt handle manually.
impl.setReceiptHandle(response.getReceiptHandle());
@@ -302,21 +312,23 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
}
public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
- final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult = new SubscriptionTopicRouteDataResult(topicRouteDataResult);
- subscriptionTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
+ final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult =
+ new SubscriptionTopicRouteDataResult(topicRouteDataResult);
+ subTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
}
private ListenableFuture<SubscriptionTopicRouteDataResult> getSubscriptionTopicRouteResult(final String topic) {
SettableFuture<SubscriptionTopicRouteDataResult> future0 = SettableFuture.create();
- final SubscriptionTopicRouteDataResult result = subscriptionTopicRouteDataResultCache.get(topic);
+ final SubscriptionTopicRouteDataResult result = subTopicRouteDataResultCache.get(topic);
if (null != result) {
future0.set(result);
return future0;
}
final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
return Futures.transform(future, topicRouteDataResult -> {
- final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult = new SubscriptionTopicRouteDataResult(topicRouteDataResult);
- subscriptionTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
+ final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult =
+ new SubscriptionTopicRouteDataResult(topicRouteDataResult);
+ subTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
return subscriptionTopicRouteDataResult;
}, MoreExecutors.directExecutor());
}
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
index 70c981a..0117237 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
@@ -23,8 +23,6 @@ import apache.rocketmq.v2.Subscription;
import apache.rocketmq.v2.SubscriptionEntry;
import com.google.common.base.MoreObjects;
import com.google.protobuf.util.Durations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -36,6 +34,8 @@ import org.apache.rocketmq.client.java.impl.ClientSettings;
import org.apache.rocketmq.client.java.impl.ClientType;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SimpleConsumerSettings extends ClientSettings {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerSettings.class);
@@ -57,8 +57,10 @@ public class SimpleConsumerSettings extends ClientSettings {
List<SubscriptionEntry> subscriptionEntries = new ArrayList<>();
for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
final FilterExpression filterExpression = entry.getValue();
- apache.rocketmq.v2.Resource topic = apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
- final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder = apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
+ apache.rocketmq.v2.Resource topic =
+ apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
+ final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
+ apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
final FilterExpressionType type = filterExpression.getFilterExpressionType();
switch (type) {
case TAG:
@@ -70,7 +72,8 @@ public class SimpleConsumerSettings extends ClientSettings {
default:
LOGGER.warn("[Bug] Unrecognized filter type for simple consumer, type={}", type);
}
- SubscriptionEntry subscriptionEntry = SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
+ SubscriptionEntry subscriptionEntry =
+ SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
subscriptionEntries.add(subscriptionEntry);
}
Subscription subscription = Subscription.newBuilder().setGroup(group.toProtobuf())
@@ -85,7 +88,8 @@ public class SimpleConsumerSettings extends ClientSettings {
public void applySettingsCommand(Settings settings) {
final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
- LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, client type={}", clientId, pubSubCase, clientType);
+ LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, "
+ + "client type={}", clientId, pubSubCase, clientType);
return;
}
this.arrivedFuture.set(null);
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
similarity index 99%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
index bb0681d..52d0c21 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
@@ -21,8 +21,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -35,6 +33,8 @@ import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings("NullableProblems")
public class StandardConsumeService extends ConsumeService {
@@ -89,7 +89,6 @@ public class StandardConsumeService extends ConsumeService {
boolean dispatched;
do {
dispatched = dispatch0();
- }
- while (dispatched);
+ } while (dispatched);
}
}
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
index 522934d..804cf3f 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
@@ -63,7 +63,8 @@ public class SubscriptionTopicRouteDataResult {
}
if (messageQueues.isEmpty()) {
// Should never reach here.
- throw new ResourceNotFoundException("Failed to take message queue due to readable message queue doesn't exist");
+ throw new ResourceNotFoundException("Failed to take message queue due to readable message queue doesn't "
+ + "exist");
}
final int next = messageQueueIndex.getAndIncrement();
return messageQueues.get(IntMath.mod(next, messageQueues.size()));
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
similarity index 95%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
index 3ab768a..30d4f9d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.client.java.impl.producer;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@@ -27,9 +30,6 @@ import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
import org.apache.rocketmq.client.apis.producer.TransactionChecker;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* Implementation of {@link ProducerBuilder}
*/
@@ -57,7 +57,8 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public ProducerBuilder setTopics(String... topics) {
final Set<String> set = Arrays.stream(topics).peek(topic -> checkNotNull(topic, "topic should not be null"))
- .peek(topic -> checkArgument(MessageBuilderImpl.TOPIC_PATTERN.matcher(topic).matches(), "topic does not match the regex [regex=%s]", MessageBuilderImpl.TOPIC_PATTERN.pattern()))
+ .peek(topic -> checkArgument(MessageBuilderImpl.TOPIC_PATTERN.matcher(topic).matches(), "topic does not "
+ + "match the regex [regex=%s]", MessageBuilderImpl.TOPIC_PATTERN.pattern()))
.collect(Collectors.toSet());
this.topics.addAll(set);
return this;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
similarity index 87%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 0877499..7c3d658 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -34,8 +34,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import io.grpc.Metadata;
import java.time.Duration;
import java.util.ArrayList;
@@ -73,6 +71,8 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Default implementation of {@link Producer}
@@ -92,10 +92,12 @@ class ProducerImpl extends ClientImpl implements Producer {
* The caller is supposed to have validated the arguments and handled throwing exception or
* logging warnings already, so we avoid repeating args check here.
*/
- ProducerImpl(ClientConfiguration clientConfiguration, Set<String> topics, int maxAttempts, TransactionChecker checker) {
+ ProducerImpl(ClientConfiguration clientConfiguration, Set<String> topics, int maxAttempts,
+ TransactionChecker checker) {
super(clientConfiguration, topics);
ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
- this.producerSettings = new ProducerSettings(clientId, accessEndpoints, retryPolicy, clientConfiguration.getRequestTimeout(), topics.stream().map(Resource::new).collect(Collectors.toSet()));
+ this.producerSettings = new ProducerSettings(clientId, accessEndpoints, retryPolicy,
+ clientConfiguration.getRequestTimeout(), topics.stream().map(Resource::new).collect(Collectors.toSet()));
this.checker = checker;
this.publishingRouteDataResultCache = new ConcurrentHashMap<>();
@@ -121,14 +123,17 @@ class ProducerImpl extends ClientImpl implements Producer {
final String transactionId = command.getTransactionId();
final String messageId = command.getOrphanedTransactionalMessage().getSystemProperties().getMessageId();
if (null == checker) {
- LOGGER.error("No transaction checker registered, ignore it, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId);
+ LOGGER.error("No transaction checker registered, ignore it, messageId={}, transactionId={}, endpoints={},"
+ + " clientId={}", messageId, transactionId, endpoints, clientId);
return;
}
MessageViewImpl messageView;
try {
messageView = MessageViewImpl.fromProtobuf(command.getOrphanedTransactionalMessage(), mq);
} catch (Throwable t) {
- LOGGER.error("[Bug] Failed to decode message during orphaned transaction message recovery, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, endpoints, clientId, t);
+ LOGGER.error("[Bug] Failed to decode message during orphaned transaction message recovery, messageId={}, "
+ + "transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints,
+ clientId, t);
return;
}
ListenableFuture<TransactionResolution> future;
@@ -148,15 +153,18 @@ class ProducerImpl extends ClientImpl implements Producer {
if (null == resolution || TransactionResolution.UNKNOWN.equals(resolution)) {
return;
}
- endTransaction(endpoints, messageView.getMessageCommon(), messageView.getMessageId(), transactionId, resolution);
+ endTransaction(endpoints, messageView.getMessageCommon(), messageView.getMessageId(),
+ transactionId, resolution);
} catch (Throwable t) {
- LOGGER.error("Exception raised while ending the transaction, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
+ LOGGER.error("Exception raised while ending the transaction, messageId={}, transactionId={}, "
+ + "endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
}
}
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Exception raised while checking the transaction, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
+ LOGGER.error("Exception raised while checking the transaction, messageId={}, transactionId={}, "
+ + "endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
}
}, MoreExecutors.directExecutor());
@@ -202,7 +210,8 @@ class ProducerImpl extends ClientImpl implements Producer {
} catch (Throwable t) {
throw new ClientException(t);
}
- final ListenableFuture<List<SendReceiptImpl>> future = send0(Collections.singletonList(publishingMessage), true);
+ final ListenableFuture<List<SendReceiptImpl>> future = send0(Collections.singletonList(publishingMessage),
+ true);
final List<SendReceiptImpl> receipts = handleClientFuture(future);
final SendReceiptImpl sendReceipt = receipts.iterator().next();
((TransactionImpl) transaction).tryAddReceipt(publishingMessage, sendReceipt);
@@ -235,7 +244,8 @@ class ProducerImpl extends ClientImpl implements Producer {
@Override
public Transaction beginTransaction() {
if (!this.isRunning()) {
- LOGGER.error("Unable to begin a transaction because producer is not running, state={}, clientId={}", this.state(), clientId);
+ LOGGER.error("Unable to begin a transaction because producer is not running, state={}, clientId={}",
+ this.state(), clientId);
throw new IllegalStateException("Producer is not running now");
}
return new TransactionImpl(this);
@@ -270,7 +280,8 @@ class ProducerImpl extends ClientImpl implements Producer {
final Stopwatch stopwatch = Stopwatch.createStarted();
final List<MessageCommon> messageCommons = Collections.singletonList(messageCommon);
- MessageHookPoints messageHookPoints = TransactionResolution.COMMIT.equals(resolution) ? MessageHookPoints.COMMIT_TRANSACTION : MessageHookPoints.ROLLBACK_TRANSACTION;
+ MessageHookPoints messageHookPoints = TransactionResolution.COMMIT.equals(resolution) ?
+ MessageHookPoints.COMMIT_TRANSACTION : MessageHookPoints.ROLLBACK_TRANSACTION;
doBefore(messageHookPoints, messageCommons);
final ListenableFuture<EndTransactionResponse> future =
@@ -281,7 +292,8 @@ class ProducerImpl extends ClientImpl implements Producer {
final Duration duration = stopwatch.elapsed();
final Status status = result.getStatus();
final Code code = status.getCode();
- MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+ MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK :
+ MessageHookPointsStatus.ERROR;
doAfter(messageHookPoints, messageCommons, duration, messageHookPointsStatus);
}
@@ -324,14 +336,16 @@ class ProducerImpl extends ClientImpl implements Producer {
if (!this.isRunning()) {
final IllegalStateException e = new IllegalStateException("Producer is not running now");
future.setException(e);
- LOGGER.error("Unable to send message because producer is not running, state={}, clientId={}", this.state(), clientId);
+ LOGGER.error("Unable to send message because producer is not running, state={}, clientId={}",
+ this.state(), clientId);
return future;
}
List<PublishingMessageImpl> pubMessages = new ArrayList<>();
for (Message message : messages) {
try {
- final PublishingMessageImpl pubMessage = new PublishingMessageImpl(message, producerSettings, txEnabled);
+ final PublishingMessageImpl pubMessage = new PublishingMessageImpl(message, producerSettings,
+ txEnabled);
pubMessages.add(pubMessage);
} catch (Throwable t) {
// Failed to refine message, no need to proceed.
@@ -347,7 +361,8 @@ class ProducerImpl extends ClientImpl implements Producer {
// Messages have different topics, no need to proceed.
final IllegalArgumentException e = new IllegalArgumentException("Messages to send have different topics");
future.setException(e);
- LOGGER.error("Messages to be sent have different topics, no need to proceed, topic(s)={}, clientId={}", topics, clientId);
+ LOGGER.error("Messages to be sent have different topics, no need to proceed, topic(s)={}, clientId={}",
+ topics, clientId);
return future;
}
@@ -358,9 +373,11 @@ class ProducerImpl extends ClientImpl implements Producer {
.collect(Collectors.toSet());
if (1 < messageTypes.size()) {
// Messages have different message type, no need to proceed.
- final IllegalArgumentException e = new IllegalArgumentException("Messages to send have different types, please check");
+ final IllegalArgumentException e = new IllegalArgumentException("Messages to send have different types, "
+ + "please check");
future.setException(e);
- LOGGER.error("Messages to be sent have different message types, no need to proceed, topic={}, messageType(s)={}, clientId={}", topic, messageTypes, clientId, e);
+ LOGGER.error("Messages to be sent have different message types, no need to proceed, topic={}, messageType"
+ + "(s)={}, clientId={}", topic, messageTypes, clientId, e);
return future;
}
@@ -374,9 +391,11 @@ class ProducerImpl extends ClientImpl implements Producer {
.map(Optional::get).collect(Collectors.toSet());
if (1 < messageGroups.size()) {
- final IllegalArgumentException e = new IllegalArgumentException("FIFO messages to send have different message groups, messageGroups=" + messageGroups);
+ final IllegalArgumentException e = new IllegalArgumentException("FIFO messages to send have different"
+ + " message groups, messageGroups=" + messageGroups);
future.setException(e);
- LOGGER.error("FIFO messages to be sent have different message groups, no need to proceed, topic={}, messageGroups={}, clientId={}", topic, messageGroups, clientId, e);
+ LOGGER.error("FIFO messages to be sent have different message groups, no need to proceed, topic={}, "
+ + "messageGroups={}, clientId={}", topic, messageGroups, clientId, e);
return future;
}
messageGroup = messageGroups.iterator().next();
@@ -418,28 +437,31 @@ class ProducerImpl extends ClientImpl implements Producer {
}
// Calculate the current message queue.
final MessageQueueImpl messageQueue = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
-// if (!messageQueue.matchMessageType(messageType)) {
-// final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with topic accept message types");
-// future.setException(e);
-// return;
-// }
+ // if (!messageQueue.matchMessageType(messageType)) {
+ // final IllegalArgumentException e = new IllegalArgumentException("Current message type not match
+ // with topic accept message types");
+ // future.setException(e);
+ // return;
+ // }
final Endpoints endpoints = messageQueue.getBroker().getEndpoints();
final SendMessageRequest request = wrapSendMessageRequest(messages);
final ListenableFuture<SendMessageResponse> responseFuture = clientManager.sendMessage(endpoints, metadata,
request, clientConfiguration.getRequestTimeout());
- final ListenableFuture<List<SendReceiptImpl>> attemptFuture = Futures.transformAsync(responseFuture, response -> {
- final SettableFuture<List<SendReceiptImpl>> future0 = SettableFuture.create();
- future0.set(SendReceiptImpl.processSendResponse(messageQueue, response));
- return future0;
- }, MoreExecutors.directExecutor());
+ final ListenableFuture<List<SendReceiptImpl>> attemptFuture = Futures.transformAsync(responseFuture,
+ response -> {
+ final SettableFuture<List<SendReceiptImpl>> future0 = SettableFuture.create();
+ future0.set(SendReceiptImpl.processSendResponse(messageQueue, response));
+ return future0;
+ }, MoreExecutors.directExecutor());
final int maxAttempts = this.getRetryPolicy().getMaxAttempts();
// Intercept before message publishing.
final Stopwatch stopwatch = Stopwatch.createStarted();
- final List<MessageCommon> messageCommons = messages.stream().map(PublishingMessageImpl::getMessageCommon).collect(Collectors.toList());
+ final List<MessageCommon> messageCommons =
+ messages.stream().map(PublishingMessageImpl::getMessageCommon).collect(Collectors.toList());
doBefore(MessageHookPoints.SEND, messageCommons);
Futures.addCallback(attemptFuture, new FutureCallback<List<SendReceiptImpl>>() {
@@ -494,16 +516,19 @@ class ProducerImpl extends ClientImpl implements Producer {
if (MessageType.TRANSACTION.equals(messageType)) {
future.setException(t);
LOGGER.error("Failed to send transactional message finally, maxAttempts=1, attempt={}, " +
- "topic={}, messageId(s), endpoints={}, clientId={}", attempt, topic, messageIds, endpoints, clientId, t);
+ "topic={}, messageId(s), endpoints={}, clientId={}", attempt, topic, messageIds, endpoints,
+ clientId, t);
return;
}
// Try to do more attempts.
int nextAttempt = 1 + attempt;
final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
LOGGER.warn("Failed to send message, would attempt to resend after {}, maxAttempts={}," +
- " attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay, maxAttempts, attempt,
+ " attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay, maxAttempts,
+ attempt,
topic, messageIds, endpoints, clientId, t);
- clientManager.getScheduler().schedule(() -> send0(future, topic, messageType, candidates, messages, nextAttempt),
+ clientManager.getScheduler().schedule(() -> send0(future, topic, messageType, candidates, messages,
+ nextAttempt),
delay.toNanos(), TimeUnit.NANOSECONDS);
}
}, clientCallbackExecutor);
@@ -511,7 +536,8 @@ class ProducerImpl extends ClientImpl implements Producer {
@Override
public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
- final PublishingTopicRouteDataResult publishingTopicRouteDataResult = new PublishingTopicRouteDataResult(topicRouteDataResult);
+ final PublishingTopicRouteDataResult publishingTopicRouteDataResult =
+ new PublishingTopicRouteDataResult(topicRouteDataResult);
publishingRouteDataResultCache.put(topic, publishingTopicRouteDataResult);
}
@@ -524,7 +550,8 @@ class ProducerImpl extends ClientImpl implements Producer {
}
return Futures.transformAsync(getRouteDataResult(topic), topicRouteDataResult -> {
SettableFuture<PublishingTopicRouteDataResult> future = SettableFuture.create();
- final PublishingTopicRouteDataResult publishingTopicRouteDataResult = new PublishingTopicRouteDataResult(topicRouteDataResult);
+ final PublishingTopicRouteDataResult publishingTopicRouteDataResult =
+ new PublishingTopicRouteDataResult(topicRouteDataResult);
publishingRouteDataResultCache.put(topic, publishingTopicRouteDataResult);
future.set(publishingTopicRouteDataResult);
return future;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index 8e6e90e..9445b30 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -21,8 +21,6 @@ import apache.rocketmq.v2.Publishing;
import apache.rocketmq.v2.Settings;
import com.google.common.base.MoreObjects;
import com.google.protobuf.util.Durations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Set;
import java.util.stream.Collectors;
@@ -32,6 +30,8 @@ import org.apache.rocketmq.client.java.impl.ClientType;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ProducerSettings extends ClientSettings {
private static final Logger LOGGER = LoggerFactory.getLogger(ProducerSettings.class);
@@ -68,8 +68,11 @@ public class ProducerSettings extends ClientSettings {
@Override
public Settings toProtobuf() {
- final Publishing publishing = Publishing.newBuilder().addAllTopics(topics.stream().map(Resource::toProtobuf).collect(Collectors.toList())).build();
- final Settings.Builder builder = Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
+ final Publishing publishing =
+ Publishing.newBuilder().addAllTopics(topics.stream().map(Resource::toProtobuf)
+ .collect(Collectors.toList())).build();
+ final Settings.Builder builder =
+ Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
return builder.setBackoffPolicy(retryPolicy.toProtobuf()).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
}
@@ -78,7 +81,8 @@ public class ProducerSettings extends ClientSettings {
public void applySettingsCommand(Settings settings) {
final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
if (!Settings.PubSubCase.PUBLISHING.equals(pubSubCase)) {
- LOGGER.error("[Bug] Issued settings not match with the client type, clientId={}, pubSubCase={}, clientType={}", clientId, pubSubCase, clientType);
+ LOGGER.error("[Bug] Issued settings not match with the client type, clientId={}, pubSubCase={}, "
+ + "clientType={}", clientId, pubSubCase, clientType);
return;
}
final Publishing publishing = settings.getPublishing();
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
similarity index 94%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index 7f1ac10..e718228 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -84,7 +84,9 @@ public class SendReceiptImpl implements SendReceipt {
throwableList.add(e);
continue;
}
- final SendReceiptImpl impl = new SendReceiptImpl(MessageIdCodec.getInstance().decode(entry.getMessageId()), entry.getTransactionId(), mq, entry.getOffset());
+ final SendReceiptImpl impl =
+ new SendReceiptImpl(MessageIdCodec.getInstance().decode(entry.getMessageId()),
+ entry.getTransactionId(), mq, entry.getOffset());
sendReceipts.add(impl);
}
if (!throwableList.isEmpty()) {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
index e3f8dd0..aef1098 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
@@ -18,8 +18,6 @@
package org.apache.rocketmq.client.java.impl.producer;
import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
@@ -33,18 +31,17 @@ import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class TransactionImpl implements Transaction {
private static final Logger LOGGER = LoggerFactory.getLogger(ProducerImpl.class);
-
- private final ProducerImpl producerImpl;
-
private static final int MAX_MESSAGE_NUM = 1;
+ private final ProducerImpl producerImpl;
@GuardedBy("messagesLock")
private final Set<PublishingMessageImpl> messages;
private final ReadWriteLock messagesLock;
-
private final ConcurrentMap<PublishingMessageImpl, SendReceiptImpl> messageSendReceiptMap;
public TransactionImpl(ProducerImpl producerImpl) {
@@ -58,7 +55,8 @@ class TransactionImpl implements Transaction {
messagesLock.readLock().lock();
try {
if (messages.size() > MAX_MESSAGE_NUM) {
- throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " + MAX_MESSAGE_NUM);
+ throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " +
+ MAX_MESSAGE_NUM);
}
} finally {
messagesLock.readLock().unlock();
@@ -66,9 +64,11 @@ class TransactionImpl implements Transaction {
messagesLock.writeLock().lock();
try {
if (messages.size() >= MAX_MESSAGE_NUM) {
- throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " + MAX_MESSAGE_NUM);
+ throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " +
+ MAX_MESSAGE_NUM);
}
- final PublishingMessageImpl publishingMessage = new PublishingMessageImpl(message, producerImpl.producerSettings, true);
+ final PublishingMessageImpl publishingMessage = new PublishingMessageImpl(message,
+ producerImpl.producerSettings, true);
messages.add(publishingMessage);
return publishingMessage;
} finally {
@@ -97,7 +97,8 @@ class TransactionImpl implements Transaction {
for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) {
final PublishingMessageImpl publishingMessage = entry.getKey();
final SendReceiptImpl sendReceipt = entry.getValue();
- producerImpl.endTransaction(sendReceipt.getEndpoints(), publishingMessage.getMessageCommon(), sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.COMMIT);
+ producerImpl.endTransaction(sendReceipt.getEndpoints(), publishingMessage.getMessageCommon(),
+ sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.COMMIT);
}
}
@@ -109,7 +110,8 @@ class TransactionImpl implements Transaction {
for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) {
final PublishingMessageImpl publishingMessage = entry.getKey();
final SendReceiptImpl sendReceipt = entry.getValue();
- producerImpl.endTransaction(sendReceipt.getEndpoints(), publishingMessage.getMessageCommon(), sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.ROLLBACK);
+ producerImpl.endTransaction(sendReceipt.getEndpoints(), publishingMessage.getMessageCommon(),
+ sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.ROLLBACK);
}
}
}
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
similarity index 91%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
index 5904626..e131471 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
@@ -21,7 +21,8 @@ import ch.qos.logback.core.ConsoleAppender;
public class CustomConsoleAppender<E> extends ConsoleAppender<E> {
public static final String ENABLE_CONSOLE_APPENDER_KEY = "mq.consoleAppender.enabled";
- private final boolean enabled = Boolean.parseBoolean(System.getenv(ENABLE_CONSOLE_APPENDER_KEY)) || Boolean.parseBoolean(System.getProperty(ENABLE_CONSOLE_APPENDER_KEY));
+ private final boolean enabled = Boolean.parseBoolean(System.getenv(ENABLE_CONSOLE_APPENDER_KEY)) ||
+ Boolean.parseBoolean(System.getProperty(ENABLE_CONSOLE_APPENDER_KEY));
public CustomConsoleAppender() {
}
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/ProcessIdConverter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/ProcessIdConverter.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/ProcessIdConverter.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/logging/ProcessIdConverter.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
index 8343e18..112fbb7 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.client.java.message;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -28,9 +31,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
public class MessageBuilderImpl implements MessageBuilder {
public static final Pattern TOPIC_PATTERN = Pattern.compile("^[%|a-zA-Z0-9._-]{1,127}$");
private static final int MESSAGE_BODY_LENGTH_THRESHOLD = 1024 * 1024 * 4;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
similarity index 95%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
index 33d075f..737e878 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
@@ -58,14 +58,16 @@ public class MessageCommon {
public MessageCommon(String topic, byte[] body, String tag, String messageGroup, Long deliveryTimestamp,
String parentTraceContext, Collection<String> keys, Map<String, String> properties) {
- this(null, topic, body, properties, tag, keys, messageGroup, deliveryTimestamp, null, parentTraceContext, null, null, null, null, null);
+ this(null, topic, body, properties, tag, keys, messageGroup, deliveryTimestamp, null, parentTraceContext,
+ null, null, null, null, null);
}
public MessageCommon(MessageId messageId, String topic, byte[] body, String tag, String messageGroup,
Long deliveryTimestamp, Collection<String> keys, Map<String, String> properties, String bornHost,
String traceContext, long bornTimestamp, int deliveryAttempt, Stopwatch decodeStopwatch,
Timestamp deliveryTimestampFromRemote) {
- this(messageId, topic, body, properties, tag, keys, messageGroup, deliveryTimestamp, bornHost, null, traceContext, bornTimestamp, deliveryAttempt, decodeStopwatch, deliveryTimestampFromRemote);
+ this(messageId, topic, body, properties, tag, keys, messageGroup, deliveryTimestamp, bornHost, null,
+ traceContext, bornTimestamp, deliveryAttempt, decodeStopwatch, deliveryTimestampFromRemote);
}
private MessageCommon(@Nullable MessageId messageId, String topic, byte[] body,
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
similarity index 99%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
index 9369e74..59d1578 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
@@ -53,13 +53,12 @@ import org.apache.rocketmq.client.java.misc.Utilities;
* </pre>
*/
public class MessageIdCodec {
- private static final MessageIdCodec INSTANCE = new MessageIdCodec();
-
public static final int MESSAGE_ID_LENGTH_FOR_V1_OR_LATER = 34;
-
public static final String MESSAGE_ID_VERSION_V0 = "00";
public static final String MESSAGE_ID_VERSION_V1 = "01";
+ private static final MessageIdCodec INSTANCE = new MessageIdCodec();
+
private final String processFixedStringV1;
private final long secondsSinceCustomEpoch;
private final long secondsStartTimestamp;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdImpl.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
similarity index 98%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
index 1b03ca6..2293c23 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
@@ -33,8 +33,10 @@ import org.apache.rocketmq.client.apis.message.Message;
* @see Message
*/
public class MessageImpl implements Message {
- private final String topic;
+ protected final Collection<String> keys;
+
final byte[] body;
+ private final String topic;
@Nullable
private final String tag;
@@ -45,7 +47,6 @@ public class MessageImpl implements Message {
@Nullable
private final String parentTraceContext;
- protected final Collection<String> keys;
private final Map<String, String> properties;
/**
@@ -65,10 +66,6 @@ public class MessageImpl implements Message {
this.properties = properties;
}
- public MessageCommon getMessageCommon() {
- return new MessageCommon(topic, body, tag, messageGroup, deliveryTimestamp, parentTraceContext, keys, properties);
- }
-
MessageImpl(Message message) {
this.topic = message.getTopic();
if (message instanceof MessageImpl) {
@@ -89,6 +86,11 @@ public class MessageImpl implements Message {
this.properties = message.getProperties();
}
+ public MessageCommon getMessageCommon() {
+ return new MessageCommon(topic, body, tag, messageGroup, deliveryTimestamp, parentTraceContext, keys,
+ properties);
+ }
+
/**
* @see Message#getTopic()
*/
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
similarity index 93%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
index 9e0d2b7..5b3525d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.client.java.message;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import apache.rocketmq.v2.Digest;
import apache.rocketmq.v2.DigestType;
import apache.rocketmq.v2.Encoding;
@@ -27,8 +29,6 @@ import com.google.common.base.Stopwatch;
import com.google.protobuf.ProtocolStringList;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.NoSuchAlgorithmException;
@@ -44,8 +44,8 @@ import org.apache.rocketmq.client.java.misc.LinkedIterator;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageView {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageViewImpl.class);
@@ -73,8 +73,9 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
public MessageViewImpl(MessageId messageId, String topic, byte[] body, String tag, String messageGroup,
Long deliveryTimestamp, Collection<String> keys, Map<String, String> properties,
- String bornHost, long bornTimestamp, int deliveryAttempt, MessageQueueImpl messageQueue, String receiptHandle,
- String traceContext, long offset, boolean corrupted, Timestamp deliveryTimestampFromRemote) {
+ String bornHost, long bornTimestamp, int deliveryAttempt, MessageQueueImpl messageQueue,
+ String receiptHandle, String traceContext, long offset, boolean corrupted,
+ Timestamp deliveryTimestampFromRemote) {
this.messageId = checkNotNull(messageId, "messageId should not be null");
this.topic = checkNotNull(topic, "topic should not be null");
this.body = checkNotNull(body, "body should not be null");
@@ -98,7 +99,8 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
}
public MessageCommon getMessageCommon() {
- return new MessageCommon(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties, bornHost, traceContext, bornTimestamp, deliveryAttempt, decodeStopwatch, null);
+ return new MessageCommon(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties,
+ bornHost, traceContext, bornTimestamp, deliveryAttempt, decodeStopwatch, null);
}
/**
@@ -264,7 +266,8 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
}
} catch (NoSuchAlgorithmException e) {
corrupted = true;
- LOGGER.error("MD5 is not supported unexpectedly, skip it, topic={}, messageId={}", topic, messageId);
+ LOGGER.error("MD5 is not supported unexpectedly, skip it, topic={}, messageId={}", topic,
+ messageId);
}
break;
case SHA1:
@@ -275,11 +278,13 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
}
} catch (NoSuchAlgorithmException e) {
corrupted = true;
- LOGGER.error("SHA-1 is not supported unexpectedly, skip it, topic={}, messageId={}", topic, messageId);
+ LOGGER.error("SHA-1 is not supported unexpectedly, skip it, topic={}, messageId={}", topic,
+ messageId);
}
break;
default:
- LOGGER.error("Unsupported message body digest algorithm, digestType={}, topic={}, messageId={}", digestType, topic, messageId);
+ LOGGER.error("Unsupported message body digest algorithm, digestType={}, topic={}, messageId={}",
+ digestType, topic, messageId);
}
final Encoding bodyEncoding = systemProperties.getBodyEncoding();
switch (bodyEncoding) {
@@ -294,7 +299,8 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
case IDENTITY:
break;
default:
- LOGGER.error("Unsupported message encoding algorithm, topic={}, messageId={}, bodyEncoding={}", topic, messageId, bodyEncoding);
+ LOGGER.error("Unsupported message encoding algorithm, topic={}, messageId={}, bodyEncoding={}", topic,
+ messageId, bodyEncoding);
}
String tag = systemProperties.hasTag() ? systemProperties.getTag() : null;
@@ -309,7 +315,8 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
final Map<String, String> properties = message.getUserPropertiesMap();
final String receiptHandle = systemProperties.getReceiptHandle();
String traceContext = systemProperties.hasTraceContext() ? systemProperties.getTraceContext() : null;
- return new MessageViewImpl(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties, bornHost, bornTimestamp, deliveryAttempt, mq, receiptHandle, traceContext, offset, corrupted, timestamp);
+ return new MessageViewImpl(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties,
+ bornHost, bornTimestamp, deliveryAttempt, mq, receiptHandle, traceContext, offset, corrupted, timestamp);
}
@Override
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 97ea1ea..7c6ea9c 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -45,7 +45,8 @@ public class PublishingMessageImpl extends MessageImpl {
private final MessageType messageType;
private volatile String traceContext;
- public PublishingMessageImpl(Message message, ProducerSettings producerSettings, boolean txEnabled) throws IOException {
+ public PublishingMessageImpl(Message message, ProducerSettings producerSettings, boolean txEnabled)
+ throws IOException {
super(message);
this.traceContext = null;
final int length = message.getBody().remaining();
@@ -66,7 +67,8 @@ public class PublishingMessageImpl extends MessageImpl {
body = new byte[length];
message.getBody().get(body);
}
- final byte[] compressed = Utilities.compressBytesGzip(body, producerSettings.getMessageGzipCompressionLevel());
+ final byte[] compressed = Utilities.compressBytesGzip(body,
+ producerSettings.getMessageGzipCompressionLevel());
this.compressedBody = ByteBuffer.wrap(compressed).asReadOnlyBuffer();
this.encoding = Encoding.GZIP;
} else {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/protocol/Encoding.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Encoding.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/protocol/Encoding.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Encoding.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
similarity index 88%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index 3a9b4bd..edec539 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.client.java.metrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
@@ -51,6 +49,8 @@ import org.apache.rocketmq.client.java.impl.ClientImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.rpc.AuthInterceptor;
import org.apache.rocketmq.client.java.rpc.IpNameResolverFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MessageMeter {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageMeter.class);
@@ -83,51 +83,63 @@ public class MessageMeter {
}
LongCounter getSendSuccessTotalCounter() {
- return counterMap.computeIfAbsent(MetricName.SEND_SUCCESS_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+ return counterMap.computeIfAbsent(MetricName.SEND_SUCCESS_TOTAL,
+ name -> meter.counterBuilder(name.getName()).build());
}
LongCounter getSendFailureTotalCounter() {
- return counterMap.computeIfAbsent(MetricName.SEND_FAILURE_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+ return counterMap.computeIfAbsent(MetricName.SEND_FAILURE_TOTAL,
+ name -> meter.counterBuilder(name.getName()).build());
}
DoubleHistogram getSendSuccessCostTimeHistogram() {
- return histogramMap.computeIfAbsent(MetricName.SEND_SUCCESS_COST_TIME, name -> meter.histogramBuilder(name.getName()).build());
+ return histogramMap.computeIfAbsent(MetricName.SEND_SUCCESS_COST_TIME,
+ name -> meter.histogramBuilder(name.getName()).build());
}
LongCounter getProcessSuccessTotalCounter() {
- return counterMap.computeIfAbsent(MetricName.PROCESS_SUCCESS_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+ return counterMap.computeIfAbsent(MetricName.PROCESS_SUCCESS_TOTAL,
+ name -> meter.counterBuilder(name.getName()).build());
}
LongCounter getProcessFailureTotalCounter() {
- return counterMap.computeIfAbsent(MetricName.PROCESS_FAILURE_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+ return counterMap.computeIfAbsent(MetricName.PROCESS_FAILURE_TOTAL,
+ name -> meter.counterBuilder(name.getName()).build());
}
DoubleHistogram getProcessCostTimeHistogram() {
- return histogramMap.computeIfAbsent(MetricName.PROCESS_TIME, name -> meter.histogramBuilder(name.getName()).build());
+ return histogramMap.computeIfAbsent(MetricName.PROCESS_TIME,
+ name -> meter.histogramBuilder(name.getName()).build());
}
LongCounter getAckSuccessTotalCounter() {
- return counterMap.computeIfAbsent(MetricName.ACK_SUCCESS_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+ return counterMap.computeIfAbsent(MetricName.ACK_SUCCESS_TOTAL,
+ name -> meter.counterBuilder(name.getName()).build());
}
LongCounter getAckFailureTotalCounter() {
- return counterMap.computeIfAbsent(MetricName.ACK_FAILURE_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+ return counterMap.computeIfAbsent(MetricName.ACK_FAILURE_TOTAL,
+ name -> meter.counterBuilder(name.getName()).build());
}
LongCounter getChangeInvisibleDurationSuccessCounter() {
- return counterMap.computeIfAbsent(MetricName.CHANGE_INVISIBLE_DURATION_SUCCESS_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+ return counterMap.computeIfAbsent(MetricName.CHANGE_INVISIBLE_DURATION_SUCCESS_TOTAL,
+ name -> meter.counterBuilder(name.getName()).build());
}
LongCounter getChangeInvisibleDurationFailureCounter() {
- return counterMap.computeIfAbsent(MetricName.CHANGE_INVISIBLE_DURATION_FAILURE_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+ return counterMap.computeIfAbsent(MetricName.CHANGE_INVISIBLE_DURATION_FAILURE_TOTAL,
+ name -> meter.counterBuilder(name.getName()).build());
}
DoubleHistogram getMessageDeliveryLatencyHistogram() {
- return histogramMap.computeIfAbsent(MetricName.DELIVERY_LATENCY, name -> meter.histogramBuilder(name.getName()).build());
+ return histogramMap.computeIfAbsent(MetricName.DELIVERY_LATENCY,
+ name -> meter.histogramBuilder(name.getName()).build());
}
DoubleHistogram getMessageAwaitTimeHistogram() {
- return histogramMap.computeIfAbsent(MetricName.AWAIT_TIME, name -> meter.histogramBuilder(name.getName()).build());
+ return histogramMap.computeIfAbsent(MetricName.AWAIT_TIME,
+ name -> meter.histogramBuilder(name.getName()).build());
}
@SuppressWarnings("deprecation")
@@ -138,12 +150,14 @@ public class MessageMeter {
}
final Optional<Endpoints> optionalEndpoints = metric.tryGetMetricEndpoints();
if (!optionalEndpoints.isPresent()) {
- LOGGER.error("[Bug] Metric switch is on but endpoints is not filled, clientId={}", client.getClientId());
+ LOGGER.error("[Bug] Metric switch is on but endpoints is not filled, clientId={}",
+ client.getClientId());
return;
}
final Endpoints newMetricEndpoints = optionalEndpoints.get();
if (newMetricEndpoints.equals(metricEndpoints)) {
- LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}", client.getClientId(), newMetricEndpoints);
+ LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}",
+ client.getClientId(), newMetricEndpoints);
return;
}
final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
@@ -172,13 +186,15 @@ public class MessageMeter {
.setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(0.1d, 1d, 10d, 30d, 60d)))
.build();
- PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter).setInterval(Duration.ofSeconds(1)).build();
+ PeriodicMetricReader reader =
+ PeriodicMetricReader.builder(exporter).setInterval(Duration.ofSeconds(1)).build();
provider = SdkMeterProvider.builder().registerMetricReader(reader)
.registerView(instrumentSelector, view)
.build();
final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build();
meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
- LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", client.getClientId(), metricEndpoints, newMetricEndpoints);
+ LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", client.getClientId(),
+ metricEndpoints, newMetricEndpoints);
this.reset();
metricEndpoints = newMetricEndpoints;
} catch (Throwable t) {
@@ -205,7 +221,8 @@ public class MessageMeter {
try {
latch.await();
} catch (Throwable t) {
- LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}", client.getClientId());
+ LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}",
+ client.getClientId());
}
}
LOGGER.info("Shutdown the message meter, clientId={}", client.getClientId());
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
similarity index 98%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
index 97a6657..440f89d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
@@ -19,8 +19,6 @@ package org.apache.rocketmq.client.java.metrics;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
@@ -34,6 +32,8 @@ import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
import org.apache.rocketmq.client.java.impl.ClientImpl;
import org.apache.rocketmq.client.java.message.MessageCommon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MetricMessageInterceptor implements MessageInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(MetricMessageInterceptor.class);
@@ -159,8 +159,10 @@ public class MetricMessageInterceptor implements MessageInterceptor {
}
private void doAfterChangInvisibleDuration(List<MessageCommon> messageCommons, MessageHookPointsStatus status) {
- final LongCounter changeInvisibleDurationSuccessCounter = messageMeter.getChangeInvisibleDurationSuccessCounter();
- final LongCounter changeInvisibleDurationFailureCounter = messageMeter.getChangeInvisibleDurationFailureCounter();
+ final LongCounter changeInvisibleDurationSuccessCounter =
+ messageMeter.getChangeInvisibleDurationSuccessCounter();
+ final LongCounter changeInvisibleDurationFailureCounter =
+ messageMeter.getChangeInvisibleDurationFailureCounter();
final Optional<String> optionalConsumerGroup = messageMeter.tryGetConsumerGroup();
if (!optionalConsumerGroup.isPresent()) {
LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", messageMeter.getClient());
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
similarity index 96%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
index 5cc8465..d7a2d28 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
@@ -17,14 +17,15 @@
package org.apache.rocketmq.client.java.metrics;
-import io.opentelemetry.api.common.AttributeKey;
-
import static io.opentelemetry.api.common.AttributeKey.stringKey;
+import io.opentelemetry.api.common.AttributeKey;
+
public class RocketmqAttributes {
public static final AttributeKey<String> TOPIC = stringKey("topic");
-
public static final AttributeKey<String> CLIENT_ID = stringKey("client_id");
-
public static final AttributeKey<String> CONSUMER_GROUP = stringKey("consumer_group");
+
+ private RocketmqAttributes() {
+ }
}
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
index ff0e8af..c4e1c71 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
@@ -18,12 +18,12 @@
package org.apache.rocketmq.client.java.misc;
import com.google.common.util.concurrent.AbstractIdleService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A driver to notify downstream to dispatch task.
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/ExecutorServices.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/ExecutorServices.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/ExecutorServices.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/ExecutorServices.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/MetadataUtils.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/MetadataUtils.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/MetadataUtils.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/MetadataUtils.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/RequestIdGenerator.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/RequestIdGenerator.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/RequestIdGenerator.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/RequestIdGenerator.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/ThreadFactoryImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/ThreadFactoryImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/ThreadFactoryImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/ThreadFactoryImpl.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
similarity index 98%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
index 639365f..2705cd7 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
@@ -60,12 +60,14 @@ public class Utilities {
/**
* Used to build output as Hex
*/
- private static final char[] DIGITS_LOWER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+ private static final char[] DIGITS_LOWER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd',
+ 'e', 'f'};
/**
* Used to build output as Hex
*/
- private static final char[] DIGITS_UPPER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+ private static final char[] DIGITS_UPPER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D',
+ 'E', 'F'};
private static final String CLIENT_ID_SEPARATOR = "@";
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
similarity index 95%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
index 5c177bf..2157613 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.client.java.retry;
+import static apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
+import static com.google.common.base.Preconditions.checkArgument;
+
import apache.rocketmq.v2.CustomizedBackoff;
import com.google.common.base.MoreObjects;
import com.google.protobuf.util.Durations;
@@ -24,9 +27,6 @@ import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
-import static apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
-import static com.google.common.base.Preconditions.checkArgument;
-
public class CustomizedBackoffRetryPolicy implements RetryPolicy {
private final List<Duration> durations;
private final int maxAttempts;
@@ -70,7 +70,9 @@ public class CustomizedBackoffRetryPolicy implements RetryPolicy {
@Override
public apache.rocketmq.v2.RetryPolicy toProtobuf() {
CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder()
- .addAllNext(durations.stream().map(duration -> Durations.fromNanos(duration.toNanos())).collect(Collectors.toList()))
+ .addAllNext(durations.stream()
+ .map(duration -> Durations.fromNanos(duration.toNanos()))
+ .collect(Collectors.toList()))
.build();
return apache.rocketmq.v2.RetryPolicy.newBuilder()
.setMaxAttempts(maxAttempts)
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
similarity index 96%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
index b216f06..79f033a 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
@@ -17,17 +17,18 @@
package org.apache.rocketmq.client.java.retry;
+import static apache.rocketmq.v2.RetryPolicy.StrategyCase.EXPONENTIAL_BACKOFF;
+import static com.google.common.base.Preconditions.checkArgument;
+
import apache.rocketmq.v2.ExponentialBackoff;
import com.google.common.base.MoreObjects;
import com.google.protobuf.util.Durations;
import java.time.Duration;
import java.util.Random;
-import static apache.rocketmq.v2.RetryPolicy.StrategyCase.EXPONENTIAL_BACKOFF;
-import static com.google.common.base.Preconditions.checkArgument;
-
/**
- * The {@link ExponentialBackoffRetryPolicy} defines a policy to do more attempts when failure is encountered, mainly refer to
+ * The {@link ExponentialBackoffRetryPolicy} defines a policy to do more attempts when failure is encountered, mainly
+ * refer to
* <a href="https://github.com/grpc/proposal/blob/master/A6-client-retries.md">gRPC Retry Design</a>.
*/
public class ExponentialBackoffRetryPolicy implements RetryPolicy {
@@ -62,7 +63,8 @@ public class ExponentialBackoffRetryPolicy implements RetryPolicy {
@Override
public Duration getNextAttemptDelay(int attempt) {
checkArgument(attempt > 0, "attempt must be positive");
- int randomNumberBound = (int) Math.min(initialBackoff.toNanos() * Math.pow(backoffMultiplier, 1.0 * (attempt - 1)),
+ int randomNumberBound = (int) Math.min(initialBackoff.toNanos() * Math.pow(backoffMultiplier,
+ 1.0 * (attempt - 1)),
maxBackoff.toNanos());
if (randomNumberBound <= 0) {
return Duration.ZERO;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Address.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Address.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Address.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/Address.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/AddressScheme.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/AddressScheme.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/AddressScheme.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/AddressScheme.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Broker.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
similarity index 96%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
index 590149c..ab77a3d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
@@ -34,7 +34,8 @@ public class Broker {
}
public apache.rocketmq.v2.Broker toProtobuf() {
- return apache.rocketmq.v2.Broker.newBuilder().setName(name).setId(id).setEndpoints(endpoints.toProtobuf()).build();
+ return apache.rocketmq.v2.Broker.newBuilder().setName(name).setId(id).setEndpoints(endpoints.toProtobuf())
+ .build();
}
public String getName() {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
similarity index 98%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
index 8e50e76..4a4f562 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.client.java.route;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import com.google.common.base.Objects;
import com.google.common.net.InternetDomainName;
import java.net.InetSocketAddress;
@@ -24,12 +26,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
-import static com.google.common.base.Preconditions.checkNotNull;
-
public class Endpoints {
public static final int DEFAULT_PORT = 80;
- private static final Pattern IPV4_HOST_PATTERN = Pattern.compile("^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])*$");
+ private static final Pattern IPV4_HOST_PATTERN = Pattern.compile("^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0"
+ + "-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])*$");
private static final String ENDPOINT_SEPARATOR = ";";
private static final String ADDRESS_SEPARATOR = ",";
private static final String COLON = ":";
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Permission.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Permission.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Permission.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/Permission.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
index dbed25c..a0725a4 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
@@ -65,7 +65,8 @@ public class TopicRouteData {
public Endpoints pickEndpointsToQueryAssignments() throws ResourceNotFoundException {
int nextIndex = index.getAndIncrement();
for (int i = 0; i < messageQueueImpls.size(); i++) {
- final MessageQueueImpl messageQueueImpl = messageQueueImpls.get(IntMath.mod(nextIndex++, messageQueueImpls.size()));
+ final MessageQueueImpl messageQueueImpl = messageQueueImpls.get(IntMath.mod(nextIndex++,
+ messageQueueImpls.size()));
final Broker broker = messageQueueImpl.getBroker();
if (Utilities.MASTER_BROKER_ID != broker.getId()) {
continue;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
index dd0f553..1efb034 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
@@ -17,13 +17,13 @@
package org.apache.rocketmq.client.java.route;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import apache.rocketmq.v2.Status;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import javax.annotation.concurrent.Immutable;
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* Result topic route data fetched from remote.
*/
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
index 11ca662..66820e6 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.client.java.rpc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
@@ -27,6 +25,8 @@ import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AuthInterceptor implements ClientInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(AuthInterceptor.class);
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/IpNameResolverFactory.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/IpNameResolverFactory.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/IpNameResolverFactory.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/IpNameResolverFactory.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
similarity index 86%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
index b6f7ba9..bff53d7 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.client.java.rpc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
@@ -28,6 +26,8 @@ import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The client log interceptor based on grpc can track any remote procedure call that interacts with the client locally.
@@ -53,18 +53,21 @@ public class LoggingInterceptor implements ClientInterceptor {
return new ForwardingClientCall.SimpleForwardingClientCall<T, E>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<E> responseListener, final Metadata headers) {
- LOGGER.trace("gRPC request header, rpcId={}, serviceName={}, methodName={}, authority={}, headers={}", rpcId, serviceName, methodName, authority, headers);
+ LOGGER.trace("gRPC request header, rpcId={}, serviceName={}, methodName={}, authority={}, headers={}",
+ rpcId, serviceName, methodName, authority, headers);
Listener<E> observabilityListener =
new ForwardingClientCallListener.SimpleForwardingClientCallListener<E>(responseListener) {
@Override
public void onMessage(E response) {
- LOGGER.trace("gRPC response, rpcId={}, serviceName={}, methodName={}, content:\n{}", rpcId, serviceName, methodName, response);
+ LOGGER.trace("gRPC response, rpcId={}, serviceName={}, methodName={}, content:\n{}",
+ rpcId, serviceName, methodName, response);
super.onMessage(response);
}
@Override
public void onHeaders(Metadata headers) {
- LOGGER.trace("gRPC response header, rpcId={}, serviceName={}, methodName={}, authority={}, headers={}", rpcId, serviceName, methodName, authority, headers);
+ LOGGER.trace("gRPC response header, rpcId={}, serviceName={}, methodName={}, "
+ + "authority={}, headers={}", rpcId, serviceName, methodName, authority, headers);
super.onHeaders(headers);
}
};
@@ -73,7 +76,8 @@ public class LoggingInterceptor implements ClientInterceptor {
@Override
public void sendMessage(T request) {
- LOGGER.trace("gRPC request, rpcId={}, serviceName={}, methodName={}, content:\n{}", rpcId, serviceName, methodName, request);
+ LOGGER.trace("gRPC request, rpcId={}, serviceName={}, methodName={}, content:\n{}", rpcId,
+ serviceName, methodName, request);
super.sendMessage(request);
}
};
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
similarity index 99%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index d31a64e..15b94aa 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -81,7 +81,7 @@ public class RpcClientImpl implements RpcClient {
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forTarget(endpoints.getGrpcTarget())
-// .withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
+ // .withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.keepAliveTime(KEEP_ALIVE_DURATION.toNanos(), TimeUnit.NANOSECONDS)
.maxInboundMessageSize(GRPC_MAX_MESSAGE_SIZE)
.intercept(LoggingInterceptor.getInstance())
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
diff --git a/java/client-java/src/main/resources-filtered/rocketmq.metadata.properties b/java/client/src/main/resources-filtered/rocketmq.metadata.properties
similarity index 100%
rename from java/client-java/src/main/resources-filtered/rocketmq.metadata.properties
rename to java/client/src/main/resources-filtered/rocketmq.metadata.properties
diff --git a/java/client-java/src/main/resources/META-INF/services/org.apache.rocketmq.client.apis.ClientServiceProvider b/java/client/src/main/resources/META-INF/services/org.apache.rocketmq.client.apis.ClientServiceProvider
similarity index 100%
rename from java/client-java/src/main/resources/META-INF/services/org.apache.rocketmq.client.apis.ClientServiceProvider
rename to java/client/src/main/resources/META-INF/services/org.apache.rocketmq.client.apis.ClientServiceProvider
diff --git a/java/client-java/src/main/resources/logback.xml b/java/client/src/main/resources/logback.xml
similarity index 100%
rename from java/client-java/src/main/resources/logback.xml
rename to java/client/src/main/resources/logback.xml
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
similarity index 86%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
index 23992be..ca198d9 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.client.java;
+import static org.junit.Assert.assertEquals;
+
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl;
import org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerBuilderImpl;
@@ -24,8 +26,6 @@ import org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
public class ClientServiceProviderImplTest {
@Test
@@ -35,12 +35,14 @@ public class ClientServiceProviderImplTest {
@Test
public void testNewPushConsumerBuilder() {
- assertEquals(PushConsumerBuilderImpl.class, ClientServiceProvider.loadService().newPushConsumerBuilder().getClass());
+ assertEquals(PushConsumerBuilderImpl.class,
+ ClientServiceProvider.loadService().newPushConsumerBuilder().getClass());
}
@Test
public void testNewSimpleConsumerBuilder() {
- assertEquals(SimpleConsumerBuilderImpl.class, ClientServiceProvider.loadService().newSimpleConsumerBuilder().getClass());
+ assertEquals(SimpleConsumerBuilderImpl.class,
+ ClientServiceProvider.loadService().newSimpleConsumerBuilder().getClass());
}
@Test
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
similarity index 100%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
similarity index 90%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 7312c52..7c9baeb 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -17,6 +17,15 @@
package org.apache.rocketmq.client.java.impl.consumer;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
@@ -44,15 +53,6 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
@RunWith(MockitoJUnitRunner.class)
public class ProcessQueueImplTest extends TestBase {
@Mock
@@ -116,9 +116,11 @@ public class ProcessQueueImplTest extends TestBase {
when(retryPolicy.getNextAttemptDelay(anyInt())).thenReturn(Duration.ofSeconds(1));
when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
when(pushConsumerSettings.isFifo()).thenReturn(false);
- when(pushConsumer.changInvisibleDuration(any(MessageViewImpl.class), any(Duration.class))).thenReturn(okChangeInvisibleDurationFuture());
+ when(pushConsumer.changInvisibleDuration(any(MessageViewImpl.class), any(Duration.class)))
+ .thenReturn(okChangeInvisibleDurationFuture());
processQueue.cacheMessages(messageViewList);
- verify(pushConsumer, times(1)).changInvisibleDuration(any(MessageViewImpl.class), any(Duration.class));
+ verify(pushConsumer, times(1))
+ .changInvisibleDuration(any(MessageViewImpl.class), any(Duration.class));
}
@Test
@@ -127,9 +129,11 @@ public class ProcessQueueImplTest extends TestBase {
List<MessageViewImpl> messageViewList = new ArrayList<>();
messageViewList.add(corruptedMessageView0);
when(pushConsumerSettings.isFifo()).thenReturn(true);
- when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class))).thenReturn(okForwardMessageToDeadLetterQueueResponseFuture());
+ when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class)))
+ .thenReturn(okForwardMessageToDeadLetterQueueResponseFuture());
processQueue.cacheMessages(messageViewList);
- verify(pushConsumer, times(1)).forwardMessageToDeadLetterQueue(any(MessageViewImpl.class));
+ verify(pushConsumer, times(1))
+ .forwardMessageToDeadLetterQueue(any(MessageViewImpl.class));
final Iterator<MessageViewImpl> iterator = processQueue.tryTakeFifoMessages();
assertFalse(iterator.hasNext());
}
@@ -147,10 +151,12 @@ public class ProcessQueueImplTest extends TestBase {
ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(fakeEndpoints(), status, messageViewList);
SettableFuture<ReceiveMessageResult> future0 = SettableFuture.create();
future0.set(receiveMessageResult);
- when(pushConsumer.receiveMessage(any(ReceiveMessageRequest.class), any(MessageQueueImpl.class), any(Duration.class))).thenReturn(future0);
+ when(pushConsumer.receiveMessage(any(ReceiveMessageRequest.class), any(MessageQueueImpl.class),
+ any(Duration.class))).thenReturn(future0);
when(pushConsumerSettings.getReceiveBatchSize()).thenReturn(32);
ReceiveMessageRequest request = ReceiveMessageRequest.newBuilder().build();
- when(pushConsumer.wrapReceiveMessageRequest(anyInt(), any(MessageQueueImpl.class), any(FilterExpression.class))).thenReturn(request);
+ when(pushConsumer.wrapReceiveMessageRequest(anyInt(), any(MessageQueueImpl.class),
+ any(FilterExpression.class))).thenReturn(request);
processQueue.fetchMessageImmediately();
Thread.sleep(ProcessQueueImpl.RECEIVE_LATER_DELAY.toMillis() / 2);
verify(pushConsumer, times(cachedMessagesCountThresholdPerQueue))
@@ -175,7 +181,8 @@ public class ProcessQueueImplTest extends TestBase {
final ListenableFuture<AckMessageResponse> future = okAckMessageResponseFuture();
when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future);
processQueue.eraseMessage(optionalMessageView.get(), ConsumeResult.SUCCESS);
- future.addListener(() -> verify(pushConsumer, times(1)).ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
+ future.addListener(() -> verify(pushConsumer, times(1))
+ .ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
assertEquals(processQueue.cachedMessagesCount(), cachedMessageCount - 1);
assertEquals(processQueue.inflightMessagesCount(), 0);
assertEquals(consumptionOkQuantity.get(), 1);
@@ -220,7 +227,8 @@ public class ProcessQueueImplTest extends TestBase {
when(retryPolicy.getMaxAttempts()).thenReturn(1);
when(pushConsumer.getConsumptionExecutor()).thenReturn(SINGLE_THREAD_POOL_EXECUTOR);
final ListenableFuture<Void> future = processQueue.eraseFifoMessage(messageView, ConsumeResult.SUCCESS);
- future.addListener(() -> verify(pushConsumer, times(1)).ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
+ future.addListener(() -> verify(pushConsumer, times(1))
+ .ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
}
@Test
@@ -229,13 +237,15 @@ public class ProcessQueueImplTest extends TestBase {
final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
messageViewList.add(messageView);
processQueue.cacheMessages(messageViewList);
- ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future0 = okForwardMessageToDeadLetterQueueResponseFuture();
+ ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future0 =
+ okForwardMessageToDeadLetterQueueResponseFuture();
when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class))).thenReturn(future0);
when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
when(retryPolicy.getMaxAttempts()).thenReturn(1);
when(pushConsumer.getConsumptionExecutor()).thenReturn(SINGLE_THREAD_POOL_EXECUTOR);
final ListenableFuture<Void> future = processQueue.eraseFifoMessage(messageView, ConsumeResult.FAILURE);
- future.addListener(() -> verify(pushConsumer, times(1)).forwardMessageToDeadLetterQueue(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
+ future.addListener(() -> verify(pushConsumer, times(1))
+ .forwardMessageToDeadLetterQueue(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
}
@Test
@@ -254,6 +264,7 @@ public class ProcessQueueImplTest extends TestBase {
when(consumeService.consume(any(MessageViewImpl.class), any(Duration.class))).thenReturn(consumeFuture);
when(pushConsumer.getConsumeService()).thenReturn(consumeService);
final ListenableFuture<Void> future = processQueue.eraseFifoMessage(messageView, ConsumeResult.FAILURE);
- future.addListener(() -> verify(pushConsumer, times(1)).ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
+ future.addListener(() -> verify(pushConsumer, times(1))
+ .ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
}
}
\ No newline at end of file
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
similarity index 96%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
index e1c6673..cb37b6b 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
@@ -72,7 +72,8 @@ public class PushConsumerBuilderImplTest extends TestBase {
@Test(expected = IllegalArgumentException.class)
public void testBuildWithoutExpressions() throws ClientException {
final PushConsumerBuilderImpl builder = new PushConsumerBuilderImpl();
- ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+ ClientConfiguration clientConfiguration =
+ ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_GROUP_0)
.setMessageListener(messageView -> ConsumeResult.SUCCESS)
.build();
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
similarity index 80%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index deff901..54e507c 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -17,6 +17,13 @@
package org.apache.rocketmq.client.java.impl.consumer;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import apache.rocketmq.v2.CustomizedBackoff;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryRouteRequest;
@@ -33,16 +40,16 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.tool.TestBase;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -50,13 +57,6 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
@RunWith(MockitoJUnitRunner.class)
public class PushConsumerImplTest extends TestBase {
@Mock
@@ -77,21 +77,27 @@ public class PushConsumerImplTest extends TestBase {
private final int maxCacheMessageSizeInBytes = 1024;
private final int consumptionThreadCount = 4;
- private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+ private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+ .setEndpoints(FAKE_ACCESS_POINT).build();
private PushConsumerImpl pushConsumer;
private void start(PushConsumerImpl pushConsumer) throws ClientException {
- when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)))
+ when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
+ any(Duration.class)))
.thenReturn(okQueryRouteResponseFuture());
- when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class)))
+ when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+ any(TelemetrySession.class)))
.thenReturn(telemetryRequestObserver);
- final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("TestScheduler"));
+ final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
+ "TestScheduler"));
when(clientManager.getScheduler()).thenReturn(scheduler);
doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
- CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder().addNext(Durations.fromNanos(Duration.ofSeconds(3).toNanos())).build();
- RetryPolicy retryPolicy = RetryPolicy.newBuilder().setMaxAttempts(17).setCustomizedBackoff(customizedBackoff).build();
+ CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder()
+ .addNext(Durations.fromNanos(Duration.ofSeconds(3).toNanos())).build();
+ RetryPolicy retryPolicy = RetryPolicy.newBuilder().setMaxAttempts(17).setCustomizedBackoff(customizedBackoff)
+ .build();
Subscription subscription = Subscription.newBuilder().build();
Settings settings = Settings.newBuilder().setSubscription(subscription).setBackoffPolicy(retryPolicy).build();
final Service service = pushConsumer.startAsync();
@@ -108,14 +114,19 @@ public class PushConsumerImplTest extends TestBase {
@Test
public void testScanAssignment() throws ExecutionException, InterruptedException, ClientException {
- pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+ pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+ maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
start(pushConsumer);
- when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class))).thenReturn(okQueryAssignmentResponseFuture());
+ when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class),
+ any(Duration.class))).thenReturn(okQueryAssignmentResponseFuture());
pushConsumer.scanAssignments();
- verify(clientManager, atLeast(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
- verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class));
+ verify(clientManager, atLeast(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+ any(QueryRouteRequest.class), any(Duration.class));
+ verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class),
+ any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class));
Assert.assertEquals(okQueryAssignmentResponseFuture().get().getAssignmentsCount(), pushConsumer.getQueueSize());
- when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture());
+ when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class),
+ any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture());
pushConsumer.scanAssignments();
Assert.assertEquals(0, pushConsumer.getQueueSize());
shutdown(pushConsumer);
@@ -123,19 +134,22 @@ public class PushConsumerImplTest extends TestBase {
@Test(expected = IllegalStateException.class)
public void testSubscribeWithoutStart() throws ClientException {
- pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+ pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+ maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
pushConsumer.subscribe(FAKE_TOPIC_0, new FilterExpression(FAKE_TOPIC_0));
}
@Test(expected = IllegalStateException.class)
public void testUnsubscribeWithoutStart() {
- pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+ pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+ maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
pushConsumer.unsubscribe(FAKE_TOPIC_0);
}
@Test
public void testSubscribeWithSubscriptionOverwriting() throws ClientException {
- pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+ pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+ maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
start(pushConsumer);
final FilterExpression filterExpression = new FilterExpression(FAKE_TAG_0);
pushConsumer.subscribe(FAKE_TOPIC_0, filterExpression);
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
similarity index 94%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
index 315af9b..31609c4 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
@@ -47,7 +47,8 @@ public class SimpleConsumerBuilderTest extends TestBase {
@Test(expected = IllegalArgumentException.class)
public void testBuildWithoutExpressions() throws ClientException {
final SimpleConsumerBuilderImpl builder = new SimpleConsumerBuilderImpl();
- ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+ ClientConfiguration clientConfiguration =
+ ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_GROUP_0)
.build();
}
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
similarity index 84%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index 59a2530..9568a81 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -17,6 +17,14 @@
package org.apache.rocketmq.client.java.impl.consumer;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.Broker;
@@ -53,23 +61,15 @@ import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
import org.apache.rocketmq.client.java.impl.TelemetrySession;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.tool.TestBase;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
@RunWith(MockitoJUnitRunner.class)
public class SimpleConsumerImplTest extends TestBase {
@Mock
@@ -78,10 +78,11 @@ public class SimpleConsumerImplTest extends TestBase {
private StreamObserver<TelemetryCommand> telemetryRequestObserver;
@InjectMocks
private ClientManagerRegistry clientManagerRegistry = ClientManagerRegistry.getInstance();
- private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+ private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+ .setEndpoints(FAKE_ACCESS_POINT).build();
private final Duration awaitDuration = Duration.ofSeconds(3);
- private final Map<String, FilterExpression> subscriptionExpressions = createSubscriptionExpressions(FAKE_TOPIC_0);
+ private final Map<String, FilterExpression> subExpressions = createSubscriptionExpressions(FAKE_TOPIC_0);
private SimpleConsumerImpl simpleConsumer;
@@ -91,15 +92,20 @@ public class SimpleConsumerImplTest extends TestBase {
List<MessageQueue> messageQueueList = new ArrayList<>();
MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
.setPermission(Permission.READ_WRITE)
- .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())).setId(0).build();
+ .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())).setId(0)
+ .build();
messageQueueList.add(mq);
- QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status).addAllMessageQueues(messageQueueList).build();
+ QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status)
+ .addAllMessageQueues(messageQueueList).build();
future0.set(response);
- when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)))
+ when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
+ any(Duration.class)))
.thenReturn(future0);
- when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class)))
+ when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+ any(TelemetrySession.class)))
.thenReturn(telemetryRequestObserver);
- final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("TestScheduler"));
+ final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
+ new ThreadFactoryImpl("TestScheduler"));
when(clientManager.getScheduler()).thenReturn(scheduler);
doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
@@ -119,38 +125,38 @@ public class SimpleConsumerImplTest extends TestBase {
@Test(expected = IllegalStateException.class)
public void testReceiveWithoutStart() throws ClientException {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
simpleConsumer.receive(1, Duration.ofSeconds(1));
}
@Test(expected = IllegalStateException.class)
public void testAckWithoutStart() throws ClientException {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
simpleConsumer.ack(fakeMessageViewImpl());
}
@Test(expected = IllegalStateException.class)
public void testSubscribeWithoutStart() throws ClientException {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
simpleConsumer.subscribe(FAKE_TOPIC_1, FilterExpression.SUB_ALL);
}
@Test(expected = IllegalStateException.class)
public void testUnsubscribeWithoutStart() {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
simpleConsumer.unsubscribe(FAKE_TOPIC_0);
}
@Test
public void testStartAndShutdown() throws ClientException {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
start(simpleConsumer);
shutdown(simpleConsumer);
}
@Test
public void testSubscribeWithSubscriptionOverwriting() throws ClientException {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
start(simpleConsumer);
final FilterExpression filterExpression = new FilterExpression(FAKE_TAG_0);
simpleConsumer.subscribe(FAKE_TOPIC_0, filterExpression);
@@ -159,7 +165,7 @@ public class SimpleConsumerImplTest extends TestBase {
@Test(expected = IllegalArgumentException.class)
public void testReceiveWithAllTopicsAreUnsubscribed() throws ClientException {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
start(simpleConsumer);
simpleConsumer.unsubscribe(FAKE_TOPIC_0);
try {
@@ -171,25 +177,29 @@ public class SimpleConsumerImplTest extends TestBase {
@Test
public void testReceiveMessageSuccess() throws ClientException {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
start(simpleConsumer);
int receivedMessageCount = 16;
- final ListenableFuture<Iterator<ReceiveMessageResponse>> future = okReceiveMessageResponsesFuture(FAKE_TOPIC_0, receivedMessageCount);
- when(clientManager.receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class), any(Duration.class))).thenReturn(future);
+ final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
+ okReceiveMessageResponsesFuture(FAKE_TOPIC_0, receivedMessageCount);
+ when(clientManager.receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class),
+ any(Duration.class))).thenReturn(future);
final List<MessageView> messageViews = simpleConsumer.receive(1, Duration.ofSeconds(1));
- verify(clientManager, times(1)).receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class), any(Duration.class));
+ verify(clientManager, times(1)).receiveMessage(any(Endpoints.class),
+ any(Metadata.class), any(ReceiveMessageRequest.class), any(Duration.class));
assertEquals(receivedMessageCount, messageViews.size());
shutdown(simpleConsumer);
}
@Test
public void testAckMessageSuccess() throws ClientException {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
start(simpleConsumer);
try {
final MessageViewImpl messageView = fakeMessageViewImpl();
final ListenableFuture<AckMessageResponse> future = okAckMessageResponseFuture();
- when(clientManager.ackMessage(any(Endpoints.class), any(Metadata.class), any(AckMessageRequest.class), any(Duration.class))).thenReturn(future);
+ when(clientManager.ackMessage(any(Endpoints.class), any(Metadata.class), any(AckMessageRequest.class),
+ any(Duration.class))).thenReturn(future);
simpleConsumer.ack(messageView);
} finally {
shutdown(simpleConsumer);
@@ -198,12 +208,14 @@ public class SimpleConsumerImplTest extends TestBase {
@Test
public void testChangeInvisibleDurationSuccess() throws ClientException {
- simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+ simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
start(simpleConsumer);
try {
final MessageViewImpl messageView = fakeMessageViewImpl();
- final ListenableFuture<ChangeInvisibleDurationResponse> future = okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1);
- when(clientManager.changeInvisibleDuration(any(Endpoints.class), any(Metadata.class), any(ChangeInvisibleDurationRequest.class), any(Duration.class))).thenReturn(future);
+ final ListenableFuture<ChangeInvisibleDurationResponse> future =
+ okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1);
+ when(clientManager.changeInvisibleDuration(any(Endpoints.class), any(Metadata.class),
+ any(ChangeInvisibleDurationRequest.class), any(Duration.class))).thenReturn(future);
simpleConsumer.changeInvisibleDuration0(messageView, Duration.ofSeconds(3));
assertEquals(FAKE_RECEIPT_HANDLE_1, messageView.getReceiptHandle());
} finally {
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
similarity index 96%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
index a64babf..e1298bd 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
@@ -17,6 +17,12 @@
package org.apache.rocketmq.client.java.impl.consumer;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import java.time.Duration;
import java.util.List;
import java.util.Optional;
@@ -34,12 +40,6 @@ import org.apache.rocketmq.client.java.tool.TestBase;
import org.junit.Ignore;
import org.junit.Test;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
public class StandardConsumeServiceTest extends TestBase {
@Test
@@ -73,7 +73,8 @@ public class StandardConsumeServiceTest extends TestBase {
Duration duration, MessageHookPointsStatus status) {
}
};
- final StandardConsumeService service = new StandardConsumeService(FAKE_CLIENT_ID, processQueueTable, listener, SINGLE_THREAD_POOL_EXECUTOR, interceptor, SCHEDULER);
+ final StandardConsumeService service = new StandardConsumeService(FAKE_CLIENT_ID, processQueueTable, listener,
+ SINGLE_THREAD_POOL_EXECUTOR, interceptor, SCHEDULER);
service.dispatch0();
verify(processQueue0, times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
verify(processQueue0, times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
similarity index 97%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
index 2bf5520..5fab2b2 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
@@ -20,8 +20,8 @@ package org.apache.rocketmq.client.java.impl.producer;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.apis.ClientConfiguration;
-import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
import org.apache.rocketmq.client.java.impl.ClientManager;
@@ -79,7 +79,8 @@ public class ProducerBuilderImplTest {
@Test
public void testBuildWithoutTopic() throws ClientException, IOException {
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints("127.0.0.1:80").build();
- Producer producer = ClientServiceProvider.loadService().newProducerBuilder().setClientConfiguration(clientConfiguration).build();
+ Producer producer = ClientServiceProvider.loadService().newProducerBuilder()
+ .setClientConfiguration(clientConfiguration).build();
producer.close();
}
}
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
similarity index 82%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index 1f2d1ec..29f4278 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -17,6 +17,15 @@
package org.apache.rocketmq.client.java.impl.producer;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import apache.rocketmq.v2.Broker;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.MessageQueue;
@@ -50,24 +59,15 @@ import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.tool.TestBase;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
@RunWith(MockitoJUnitRunner.class)
public class ProducerImplTest extends TestBase {
@Mock
@@ -78,7 +78,9 @@ public class ProducerImplTest extends TestBase {
@InjectMocks
private ClientManagerRegistry clientManagerRegistry = ClientManagerRegistry.getInstance();
- private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+ private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+ .setEndpoints(FAKE_ACCESS_POINT).build();
+
private final String[] str = {FAKE_TOPIC_0};
private final Set<String> set = new HashSet<>(Arrays.asList(str));
@@ -88,7 +90,8 @@ public class ProducerImplTest extends TestBase {
private final ProducerImpl producer = new ProducerImpl(clientConfiguration, set, 1, null);
@InjectMocks
- private final ProducerImpl producerWithoutTopicBinding = new ProducerImpl(clientConfiguration, new HashSet<>(), 1, null);
+ private final ProducerImpl producerWithoutTopicBinding = new ProducerImpl(clientConfiguration, new HashSet<>(), 1,
+ null);
private void start(ProducerImpl producer) throws ClientException {
SettableFuture<QueryRouteResponse> future0 = SettableFuture.create();
@@ -96,15 +99,20 @@ public class ProducerImplTest extends TestBase {
List<MessageQueue> messageQueueList = new ArrayList<>();
MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
.setPermission(Permission.READ_WRITE)
- .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())).setId(0).build();
+ .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0()))
+ .setId(0).build();
messageQueueList.add(mq);
- QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status).addAllMessageQueues(messageQueueList).build();
+ QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status)
+ .addAllMessageQueues(messageQueueList).build();
future0.set(response);
- when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)))
+ when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
+ any(Duration.class)))
.thenReturn(future0);
- when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class)))
+ when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+ any(TelemetrySession.class)))
.thenReturn(telemetryRequestObserver);
- final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("TestScheduler"));
+ final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
+ "TestScheduler"));
when(clientManager.getScheduler()).thenReturn(scheduler);
doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
@@ -131,11 +139,14 @@ public class ProducerImplTest extends TestBase {
@Test
public void testSendWithTopicBinding() throws ClientException, ExecutionException, InterruptedException {
start(producer);
- verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
- verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+ verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+ any(QueryRouteRequest.class), any(Duration.class));
+ verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
+ any(Duration.class), any(TelemetrySession.class));
final Message message = fakeMessage(FAKE_TOPIC_0);
final ListenableFuture<SendMessageResponse> future = okSendMessageResponseFutureWithSingleEntry();
- when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
+ when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
+ any(Duration.class))).thenReturn(future);
final SendMessageResponse response = future.get();
assertEquals(1, response.getEntriesCount());
final apache.rocketmq.v2.SendResultEntry receipt = response.getEntriesList().iterator().next();
@@ -147,16 +158,21 @@ public class ProducerImplTest extends TestBase {
@Test
public void testSendWithoutTopicBinding() throws ClientException, ExecutionException, InterruptedException {
start(producerWithoutTopicBinding);
- verify(clientManager, never()).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
- verify(clientManager, never()).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+ verify(clientManager, never()).queryRoute(any(Endpoints.class), any(Metadata.class),
+ any(QueryRouteRequest.class), any(Duration.class));
+ verify(clientManager, never()).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+ any(TelemetrySession.class));
final Message message = fakeMessage(FAKE_TOPIC_0);
final ListenableFuture<SendMessageResponse> future = okSendMessageResponseFutureWithSingleEntry();
- when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
+ when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
+ any(Duration.class))).thenReturn(future);
final SendMessageResponse response = future.get();
assertEquals(1, response.getEntriesCount());
final SendReceipt sendReceipt = producerWithoutTopicBinding.send(message);
- verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
- verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+ verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+ any(QueryRouteRequest.class), any(Duration.class));
+ verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
+ any(Duration.class), any(TelemetrySession.class));
final apache.rocketmq.v2.SendResultEntry receipt = response.getEntriesList().iterator().next();
assertEquals(receipt.getMessageId(), sendReceipt.getMessageId().toString());
shutdown(producerWithoutTopicBinding);
@@ -165,8 +181,10 @@ public class ProducerImplTest extends TestBase {
@Test
public void testSendBatchMessage() throws ClientException, ExecutionException, InterruptedException {
start(producer);
- verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
- verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+ verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+ any(QueryRouteRequest.class), any(Duration.class));
+ verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
+ any(Duration.class), any(TelemetrySession.class));
int batchMessageNum = 2;
List<Message> messages = new ArrayList<>();
for (int i = 0; i < batchMessageNum; i++) {
@@ -175,7 +193,8 @@ public class ProducerImplTest extends TestBase {
}
final ListenableFuture<SendMessageResponse> future = okBatchSendMessageResponseFuture();
- when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
+ when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
+ any(Duration.class))).thenReturn(future);
final SendMessageResponse response = future.get();
assertEquals(batchMessageNum, response.getEntriesCount());
final List<apache.rocketmq.v2.SendResultEntry> receipts = response.getEntriesList();
@@ -188,10 +207,13 @@ public class ProducerImplTest extends TestBase {
}
@Test(expected = IllegalArgumentException.class)
- public void testSendBatchMessageWithDifferentTopic() throws ClientException, ExecutionException, InterruptedException {
+ public void testSendBatchMessageWithDifferentTopic() throws ClientException, ExecutionException,
+ InterruptedException {
start(producer);
- verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
- verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+ verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+ any(QueryRouteRequest.class), any(Duration.class));
+ verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+ any(TelemetrySession.class));
int batchMessageNum = 2;
List<Message> messages = new ArrayList<>();
@@ -214,10 +236,13 @@ public class ProducerImplTest extends TestBase {
@Test(expected = ClientException.class)
public void testSendMessageWithFailure() throws ClientException {
start(producer);
- verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
- verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+ verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+ any(QueryRouteRequest.class), any(Duration.class));
+ verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+ any(TelemetrySession.class));
final ListenableFuture<SendMessageResponse> future = failureSendMessageResponseFuture();
- when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
+ when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
+ any(Duration.class))).thenReturn(future);
Message message0 = fakeMessage(FAKE_TOPIC_0);
try {
producer.send(message0);
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
similarity index 90%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
index a66e84d..3519385 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
@@ -17,6 +17,11 @@
package org.apache.rocketmq.client.java.impl.producer;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
@@ -29,21 +34,12 @@ import org.apache.rocketmq.client.java.message.MessageCommon;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.tool.TestBase;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.when;
-
@RunWith(MockitoJUnitRunner.class)
public class TransactionImplTest extends TestBase {
@Mock
@@ -98,7 +94,8 @@ public class TransactionImplTest extends TestBase {
}
@Test
- public void testCommit() throws IOException, ClientException, ExecutionException, InterruptedException, NoSuchFieldException, IllegalAccessException {
+ public void testCommit() throws IOException, ClientException, ExecutionException, InterruptedException,
+ NoSuchFieldException, IllegalAccessException {
final TransactionImpl transaction = new TransactionImpl(producer);
final Message message0 = fakeMessage(FAKE_TOPIC_0);
@@ -113,14 +110,17 @@ public class TransactionImplTest extends TestBase {
final PublishingMessageImpl publishingMessage = transaction.tryAddMessage(message0);
final SendReceiptImpl receipt = fakeSendReceiptImpl(fakeMessageQueueImpl0());
transaction.tryAddReceipt(publishingMessage, receipt);
- ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor = ArgumentCaptor.forClass(TransactionResolution.class);
- doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class), any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
+ ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor =
+ ArgumentCaptor.forClass(TransactionResolution.class);
+ doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class),
+ any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
transaction.commit();
assertEquals(TransactionResolution.COMMIT, resolutionArgumentCaptor.getValue());
}
@Test
- public void testRollback() throws IOException, ClientException, ExecutionException, InterruptedException, NoSuchFieldException, IllegalAccessException {
+ public void testRollback() throws IOException, ClientException, ExecutionException, InterruptedException,
+ NoSuchFieldException, IllegalAccessException {
final TransactionImpl transaction = new TransactionImpl(producer);
final Message message0 = fakeMessage(FAKE_TOPIC_0);
@@ -135,8 +135,10 @@ public class TransactionImplTest extends TestBase {
final PublishingMessageImpl publishingMessage = transaction.tryAddMessage(message0);
final SendReceiptImpl receipt = fakeSendReceiptImpl(fakeMessageQueueImpl0());
transaction.tryAddReceipt(publishingMessage, receipt);
- ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor = ArgumentCaptor.forClass(TransactionResolution.class);
- doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class), any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
+ ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor =
+ ArgumentCaptor.forClass(TransactionResolution.class);
+ doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class),
+ any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
transaction.rollback();
assertEquals(TransactionResolution.ROLLBACK, resolutionArgumentCaptor.getValue());
}
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
similarity index 99%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
index daa70b9..4e3b81d 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
@@ -17,13 +17,12 @@
package org.apache.rocketmq.client.java.message;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.junit.Assert;
import org.junit.Test;
-import java.util.HashSet;
-import java.util.Set;
-
public class MessageIdCodecTest {
private final MessageIdCodec codec = MessageIdCodec.getInstance();
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
similarity index 96%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
index c43f107..be03d1d 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
@@ -17,21 +17,19 @@
package org.apache.rocketmq.client.java.message;
-import java.util.Arrays;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.java.tool.TestBase;
import org.junit.Assert;
import org.junit.Test;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
public class MessageImplTest extends TestBase {
private final ClientServiceProvider provider = ClientServiceProvider.loadService();
private final String sampleTopic = "foobar";
@@ -70,7 +68,8 @@ public class MessageImplTest extends TestBase {
@Test
public void testTagSetter() {
- final Message message = provider.newMessageBuilder().setTag("tagA").setTopic(FAKE_TOPIC_0).setBody(FAKE_MESSAGE_BODY).build();
+ final Message message = provider.newMessageBuilder().setTag("tagA").setTopic(FAKE_TOPIC_0)
+ .setBody(FAKE_MESSAGE_BODY).build();
assertTrue(message.getTag().isPresent());
assertEquals("tagA", message.getTag().get());
}
@@ -92,7 +91,8 @@ public class MessageImplTest extends TestBase {
@Test
public void testKeySetter() {
- final Message message = provider.newMessageBuilder().setKeys("keyA").setTopic(FAKE_TOPIC_0).setBody(FAKE_MESSAGE_BODY).build();
+ final Message message = provider.newMessageBuilder().setKeys("keyA").setTopic(FAKE_TOPIC_0)
+ .setBody(FAKE_MESSAGE_BODY).build();
assertFalse(message.getKeys().isEmpty());
}
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
similarity index 99%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
index 2ff7238..0faaf23 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.client.java.retry;
+import static apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
+import static org.junit.Assert.assertEquals;
+
import apache.rocketmq.v2.CustomizedBackoff;
import apache.rocketmq.v2.RetryPolicy;
import com.google.protobuf.util.Durations;
@@ -25,9 +28,6 @@ import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
-import static apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
-import static org.junit.Assert.assertEquals;
-
public class CustomizedBackoffRetryPolicyTest {
@Test
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
similarity index 96%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
index d2163ce..c45673d 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
@@ -82,7 +82,8 @@ public class EndpointsTest {
@Test
public void testEndpointsWithMultipleIpv6() {
- final Endpoints endpoints = new Endpoints("1050:0000:0000:0000:0005:0600:300c:326b:8080;1050:0000:0000:0000:0005:0600:300c:326c:8081");
+ final Endpoints endpoints = new Endpoints("1050:0000:0000:0000:0005:0600:300c:326b:8080;" +
+ "1050:0000:0000:0000:0005:0600:300c:326c:8081");
Assert.assertEquals(AddressScheme.IPv6, endpoints.getScheme());
Assert.assertEquals(2, endpoints.getAddresses().size());
final Iterator<Address> iterator = endpoints.getAddresses().iterator();
@@ -96,7 +97,8 @@ public class EndpointsTest {
Assert.assertEquals(8081, address1.getPort());
Assert.assertEquals(AddressScheme.IPv6, endpoints.getScheme());
- Assert.assertEquals("ipv6:1050:0000:0000:0000:0005:0600:300c:326b:8080,1050:0000:0000:0000:0005:0600:300c:326c:8081", endpoints.getFacade());
+ Assert.assertEquals("ipv6:1050:0000:0000:0000:0005:0600:300c:326b:8080," +
+ "1050:0000:0000:0000:0005:0600:300c:326c:8081", endpoints.getFacade());
}
@Test
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
similarity index 94%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index cb5d68f..b30bf93 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -42,7 +42,6 @@ import apache.rocketmq.v2.SystemProperties;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
@@ -64,15 +63,15 @@ import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
-import org.apache.rocketmq.client.java.misc.Utilities;
-import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.apache.rocketmq.client.java.message.MessageIdCodec;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
+import org.apache.rocketmq.client.java.misc.Utilities;
+import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
public class TestBase {
protected static final String FAKE_CLIENT_ID = "mbp@29848@cno0nhxy";
@@ -163,7 +162,8 @@ public class TestBase {
final byte[] body = RandomUtils.nextBytes(bodySize);
Map<String, String> properties = new HashMap<>();
List<String> keys = new ArrayList<>();
- return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null, null, keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0, null, 1, corrupted, null);
+ return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null, null,
+ keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0, null, 1, corrupted, null);
}
protected MessageQueueImpl fakeMessageQueueImpl0() {
@@ -179,11 +179,13 @@ public class TestBase {
}
protected Broker fakePbBroker0() {
- return Broker.newBuilder().setEndpoints(fakePbEndpoints0()).setName(FAKE_BROKER_NAME_0).setId(Utilities.MASTER_BROKER_ID).build();
+ return Broker.newBuilder().setEndpoints(fakePbEndpoints0()).setName(FAKE_BROKER_NAME_0)
+ .setId(Utilities.MASTER_BROKER_ID).build();
}
protected Broker fakePbBroker1() {
- return Broker.newBuilder().setEndpoints(fakePbEndpoints1()).setName(FAKE_BROKER_NAME_1).setId(Utilities.MASTER_BROKER_ID).build();
+ return Broker.newBuilder().setEndpoints(fakePbEndpoints1()).setName(FAKE_BROKER_NAME_1)
+ .setId(Utilities.MASTER_BROKER_ID).build();
}
@@ -196,11 +198,13 @@ public class TestBase {
}
protected MessageQueue fakePbMessageQueue0(Resource topicResource) {
- return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker0()).setPermission(Permission.READ_WRITE).build();
+ return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker0())
+ .setPermission(Permission.READ_WRITE).build();
}
protected MessageQueue fakePbMessageQueue1(Resource topicResource) {
- return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker1()).setPermission(Permission.READ_WRITE).build();
+ return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker1())
+ .setPermission(Permission.READ_WRITE).build();
}
protected ListenableFuture<QueryRouteResponse> okQueryRouteResponseFuture() {
@@ -264,7 +268,8 @@ public class TestBase {
return future;
}
- protected ListenableFuture<ForwardMessageToDeadLetterQueueResponse> okForwardMessageToDeadLetterQueueResponseFuture() {
+ protected ListenableFuture<ForwardMessageToDeadLetterQueueResponse>
+ okForwardMessageToDeadLetterQueueResponseFuture() {
final Status status = Status.newBuilder().setCode(Code.OK).build();
SettableFuture<ForwardMessageToDeadLetterQueueResponse> future0 = SettableFuture.create();
final ForwardMessageToDeadLetterQueueResponse response =
@@ -277,8 +282,9 @@ public class TestBase {
final Status status = Status.newBuilder().setCode(Code.OK).build();
SettableFuture<SendMessageResponse> future0 = SettableFuture.create();
final String messageId = MessageIdCodec.getInstance().nextMessageId().toString();
- SendResultEntry entry = SendResultEntry.newBuilder().setMessageId(messageId).setTransactionId(FAKE_TRANSACTION_ID)
- .setOffset(1).build();
+ SendResultEntry entry =
+ SendResultEntry.newBuilder().setMessageId(messageId).setTransactionId(FAKE_TRANSACTION_ID)
+ .setOffset(1).build();
SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
future0.set(response);
return future0;
@@ -300,7 +306,8 @@ public class TestBase {
.setOffset(1).build();
SendResultEntry entry1 = SendResultEntry.newBuilder().setMessageId(messageId)
.setOffset(2).build();
- SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry0).addEntries(entry1).build();
+ SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry0)
+ .addEntries(entry1).build();
future0.set(response);
return future0;
}
@@ -355,7 +362,8 @@ public class TestBase {
}
protected ProducerSettings fakeProducerSettings() {
- return new ProducerSettings(FAKE_CLIENT_ID, fakeEndpoints(), fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new HashSet<>());
+ return new ProducerSettings(FAKE_CLIENT_ID, fakeEndpoints(), fakeExponentialBackoffRetryPolicy(),
+ Duration.ofSeconds(1), new HashSet<>());
}
protected SendReceiptImpl fakeSendReceiptImpl(
diff --git a/java/client-java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/java/client/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
similarity index 100%
rename from java/client-java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
rename to java/client/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
diff --git a/java/pom.xml b/java/pom.xml
index ef38d67..abd7dc9 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -10,7 +10,8 @@
<version>5.0.0-SNAPSHOT</version>
<modules>
<module>client-apis</module>
- <module>client-java</module>
+ <module>client</module>
+ <module>client-shade</module>
</modules>
<properties>
@@ -32,6 +33,7 @@
<!-- plugin -->
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
+ <maven-checkstyle-plugin.version>3.1.1</maven-checkstyle-plugin.version>
</properties>
<repositories>
@@ -83,6 +85,16 @@
<artifactId>rocketmq-client-apis</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-client-java-noshade</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-client-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-proto</artifactId>
@@ -148,7 +160,26 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>${maven-checkstyle-plugin.version}</version>
+ <configuration>
+ <consoleOutput>true</consoleOutput>
+ <encoding>UTF-8</encoding>
+ <configLocation>style/checkstyle.xml</configLocation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <id>validate</id>
+ <phase>validate</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
-
</project>
\ No newline at end of file
diff --git a/java/style/checkstyle.xml b/java/style/checkstyle.xml
new file mode 100644
index 0000000..d9bfef4
--- /dev/null
+++ b/java/style/checkstyle.xml
@@ -0,0 +1,385 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<!DOCTYPE module PUBLIC
+ "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+ "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--
+ Checkstyle configuration originally derived from the Google coding conventions from Google Java Style that can be
+ found at https://google.github.io/styleguide/javaguide.html. Deviations have been made where desired.
+ -->
+<module name="Checker">
+ <property name="charset" value="UTF-8"/>
+ <property name="severity" value="error"/>
+ <property name="fileExtensions" value="java"/>
+
+ <!-- Files must not contain tabs. -->
+ <module name="FileTabCharacter">
+ <property name="eachLine" value="true"/>
+ </module>
+
+ <!-- header -->
+ <module name="RegexpHeader">
+ <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
+ <property name="fileExtensions" value="java"/>
+ </module>
+
+ <!-- <module name="SuppressionFilter">-->
+ <!-- <property name="file"-->
+ <!-- value="${checkstyle.suppressions.file}"-->
+ <!-- default="checkstyle-suppressions.xml"/>-->
+ <!-- </module>-->
+
+ <module name="TreeWalker">
+
+ <!-- Allow suppressing rules via comments. -->
+ <module name="SuppressionCommentFilter"/>
+
+ <!-- Class names must match the file name in which they are defined. -->
+ <module name="OuterTypeFilename"/>
+
+ <!-- Only one class may be defined per file. -->
+ <module name="OneTopLevelClass"/>
+
+ <!-- Special escape sequences like \n and \t must be used over the octal or unicode equivalent. -->
+ <module name="IllegalTokenText">
+ <property name="tokens" value="STRING_LITERAL, CHAR_LITERAL"/>
+ <property name="format"
+ value="\\u00(08|09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/>
+ <property name="message" value="Avoid using corresponding octal or Unicode escape."/>
+ </module>
+
+ <!-- Unicode escapes must not be used for printable characters. -->
+ <module name="AvoidEscapedUnicodeCharacters">
+ <property name="allowEscapesForControlCharacters" value="true"/>
+ <property name="allowByTailComment" value="true"/>
+ <property name="allowNonPrintableEscapes" value="true"/>
+ </module>
+
+ <!-- Stars must not be used in import statements. -->
+ <module name="AvoidStarImport"/>
+
+ <!-- Checks for unused imports. -->
+ <module name="UnusedImports"/>
+
+ <!-- Package name and imports must not be wrapped. -->
+ <module name="NoLineWrap"/>
+
+ <!-- Braces must be used for all blocks. -->
+ <module name="NeedBraces"/>
+
+ <!-- Braces must not be empty for most language constructs. -->
+ <module name="EmptyBlock">
+ <property name="option" value="TEXT"/>
+ <property name="tokens" value="LITERAL_TRY, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_SWITCH"/>
+ </module>
+
+ <!-- For language constructs related to the previous statement (eg. "else" or "catch"), the keywords must
+ be defined on the same line as the right curly brace. -->
+ <module name="RightCurly">
+ <property name="id" value="RightCurlySame"/>
+ <property name="tokens"
+ value="LITERAL_TRY, LITERAL_CATCH, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_DO"/>
+ </module>
+
+ <!-- For other language constructs, they must be defined on a separate line. -->
+ <module name="RightCurly">
+ <property name="id" value="RightCurlyAlone"/>
+ <property name="option" value="alone"/>
+ <property name="tokens"
+ value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT, INSTANCE_INIT"/>
+ </module>
+
+ <!-- Language constructs like "if" and "while" must be followed by whitespace. -->
+ <module name="WhitespaceAfter"/>
+
+ <!-- Language constructs must be surrounded by whitespace. -->
+ <module name="WhitespaceAround">
+ <property name="allowEmptyConstructors" value="true"/>
+ <property name="allowEmptyMethods" value="true"/>
+ <property name="allowEmptyTypes" value="true"/>
+ <property name="allowEmptyLoops" value="true"/>
+ <message key="ws.notFollowed"
+ value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement."/>
+ <message key="ws.notPreceded"
+ value="WhitespaceAround: ''{0}'' is not preceded with whitespace."/>
+ </module>
+
+ <!-- Only one statement per line is permitted. -->
+ <module name="OneStatementPerLine"/>
+
+ <!-- Variables must be defined on different lines. -->
+ <module name="MultipleVariableDeclarations"/>
+
+ <!-- No C-style array declarations are permitted (eg. String args[]). -->
+ <module name="ArrayTypeStyle"/>
+
+ <!-- Defaults must always be included for switch statements, even if they are empty. -->
+ <module name="MissingSwitchDefault"/>
+
+ <!-- Case blocks with statements on them must include a break, return, etc. or the comment "fall through". -->
+ <module name="FallThrough"/>
+
+ <!-- When defining long literals, an upper L must be used. -->
+ <module name="UpperEll"/>
+
+ <!-- Modifiers like public, abstract, static, etc. must follow a consistent order. -->
+ <module name="ModifierOrder"/>
+
+ <!-- Empty lines must separate methods and constructors. -->
+ <module name="EmptyLineSeparator">
+ <property name="allowNoEmptyLineBetweenFields" value="true"/>
+ </module>
+
+ <!-- New lines must happen before dots. -->
+ <module name="SeparatorWrap">
+ <property name="id" value="SeparatorWrapDot"/>
+ <property name="tokens" value="DOT"/>
+ <property name="option" value="nl"/>
+ </module>
+
+ <!-- New lines must happen after commas. -->
+ <module name="SeparatorWrap">
+ <property name="id" value="SeparatorWrapComma"/>
+ <property name="tokens" value="COMMA"/>
+ <property name="option" value="EOL"/>
+ </module>
+
+ <!-- Package names must follow a defined format. -->
+ <module name="PackageName">
+ <property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$"/>
+ <message key="name.invalidPattern"
+ value="Package name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+
+ <!-- Type names must follow a defined format. -->
+ <module name="TypeName">
+ <message key="name.invalidPattern"
+ value="Type name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+
+ <!-- Non-constant fields must follow a defined format. -->
+ <module name="MemberName">
+ <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+ <message key="name.invalidPattern"
+ value="Member name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+
+ <!-- Constant fields must follow a defined format. -->
+ <module name="ConstantName">
+ <property name="format" value="^log?|[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$"/>
+ </module>
+
+ <!-- Method and lambda parameters must follow a defined format. -->
+ <module name="ParameterName">
+ <property name="id" value="ParameterNameNonPublic"/>
+ <property name="format" value="^[a-z]([a-zA-Z0-9]*)?$"/>
+ <property name="accessModifiers" value="protected, package, private"/>
+ <message key="name.invalidPattern"
+ value="Parameter name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+ <module name="ParameterName">
+ <property name="id" value="ParameterNamePublic"/>
+ <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+ <property name="accessModifiers" value="public"/>
+ <message key="name.invalidPattern"
+ value="Parameter name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+
+ <!-- Catch parameters must follow a defined format. -->
+ <module name="CatchParameterName">
+ <property name="format" value="^(e|t|[a-z][a-zA-Z0-9]*)$"/>
+ <message key="name.invalidPattern"
+ value="Catch parameter name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+
+ <!-- Local variables must follow a defined format. -->
+ <module name="LocalVariableName">
+ <property name="tokens" value="VARIABLE_DEF"/>
+ <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+ <property name="allowOneCharVarInForLoop" value="true"/>
+ <message key="name.invalidPattern"
+ value="Local variable name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+
+ <!-- Type parameters must follow a defined format. -->
+ <module name="ClassTypeParameterName">
+ <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
+ <message key="name.invalidPattern"
+ value="Class type name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+ <module name="MethodTypeParameterName">
+ <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
+ <message key="name.invalidPattern"
+ value="Method type name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+ <module name="InterfaceTypeParameterName">
+ <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
+ <message key="name.invalidPattern"
+ value="Interface type name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+
+ <!-- Method names must follow a defined format. -->
+ <module name="MethodName">
+ <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+ <message key="name.invalidPattern"
+ value="Method name ''{0}'' must match pattern ''{1}''."/>
+ </module>
+
+ <!-- Finalizers must not be overridden. -->
+ <module name="NoFinalizer"/>
+
+ <!-- Whitespace around generics must follow a defined format. -->
+ <module name="GenericWhitespace">
+ <message key="ws.followed"
+ value="GenericWhitespace ''{0}'' is followed by whitespace."/>
+ <message key="ws.preceded"
+ value="GenericWhitespace ''{0}'' is preceded with whitespace."/>
+ <message key="ws.illegalFollow"
+ value="GenericWhitespace ''{0}'' should be followed by whitespace."/>
+ <message key="ws.notPreceded"
+ value="GenericWhitespace ''{0}'' is not preceded with whitespace."/>
+ </module>
+
+ <!-- File indentation should follow a convention of 4 spaces (8 for throws statements); this check is currently disabled. -->
+<!-- <module name="Indentation">-->
+<!-- <property name="throwsIndent" value="8"/>-->
+<!-- <property name="arrayInitIndent" value="8"/>-->
+<!-- </module>-->
+
+ <!-- Abbreviations must follow the same conventions as any other word (eg. use Rpc, not RPC). -->
+ <module name="AbbreviationAsWordInName">
+ <property name="ignoreFinal" value="false"/>
+ <!-- <property name="allowedAbbreviationLength" value="3"/>-->
+ <property name="severity" value="warning"/>
+ </module>
+
+ <!-- Class contents must be defined in the order suggested by Sun/Oracle:
+ http://www.oracle.com/technetwork/java/javase/documentation/codeconventions-141855.html#1852 -->
+ <module name="DeclarationOrder"/>
+
+ <!--<!– Overloaded methods and constructors must be defined together. –>-->
+ <!--<module name="OverloadMethodsDeclarationOrder">-->
+ <!--<property name="severity" value="warning"/> <!– TODO: Make error. –>-->
+ <!--</module>-->
+
+ <!-- Variables must be declared near where they are used. -->
+ <module name="VariableDeclarationUsageDistance">
+ <property name="allowedDistance" value="10"/>
+ </module>
+
+ <!-- Static imports must occur before external package imports. -->
+ <module name="CustomImportOrder">
+ <property name="sortImportsInGroupAlphabetically" value="true"/>
+ <property name="separateLineBetweenGroups" value="true"/>
+ <property name="customImportOrderRules" value="STATIC###THIRD_PARTY_PACKAGE"/>
+ </module>
+
+ <!-- A method name and the opening parenthesis of its parameter list must be on the same line, with no whitespace between them. -->
+ <module name="MethodParamPad"/>
+
+ <!-- There must be no space between a method name and its parameter list. -->
+ <module name="ParenPad"/>
+
+ <!-- Non-field annotations must be on separate lines, or in the case of single parameterless annotation can be
+ placed on the same line as the signature. -->
+ <module name="AnnotationLocation">
+ <property name="id" value="AnnotationLocationMostCases"/>
+ <property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF"/>
+ </module>
+
+ <!-- Fields can have multiple annotations applied on the same line. -->
+ <module name="AnnotationLocation">
+ <property name="id" value="AnnotationLocationVariables"/>
+ <property name="tokens" value="VARIABLE_DEF"/>
+ <property name="allowSamelineMultipleAnnotations" value="true"/>
+ </module>
+
+ <!-- Catch blocks must not be empty without a comment. -->
+ <module name="EmptyCatchBlock"/>
+
+ <!-- Comments must be placed at the same indentation level as the surrounding code. -->
+ <module name="CommentsIndentation"/>
+
+ <!-- Checks for imports of certain packages -->
+ <!-- See http://checkstyle.sf.net/config_imports.html -->
+ <module name="IllegalImport">
+ <property name="illegalPkgs" value="org.apache.http.annotation,javax.annotation.Generated"/>
+ </module>
+
+ <!-- Checks that the override annotation is specified when using @inheritDoc javadoc. -->
+ <module name="MissingOverride"/>
+
+ <!-- Do not allow assignment in subexpressions (except in some cases in loop conditions). -->
+ <module name="InnerAssignment"/>
+
+ <!-- Checks that we don't use System.out.print -->
+ <module name="Regexp">
+ <property name="format" value="System\s*\.\s*(out|err)\s*(\.|::)\s*print"/>
+ <property name="illegalPattern" value="true"/>
+ <property name="message" value="Don't use System console for logging, use a logger instead"/>
+ <property name="ignoreComments" value="true"/>
+ </module>
+
+ <!-- Checks that we don't use Objects.hash. Objects.hashCode is preferred-->
+ <module name="Regexp">
+ <property name="format" value="\bObjects.hash\b"/>
+ <property name="illegalPattern" value="true"/>
+ <property name="message" value="Don't use Objects.hash, use Objects.hashCode instead"/>
+ <property name="ignoreComments" value="true"/>
+ </module>
+
+ <!-- Checks that we don't use sslContext.newHandler directly; "\b" anchors the match at a word boundary ("\s" would demand whitespace followed by "slContext"). -->
+ <module name="Regexp">
+ <property name="format" value="\bsslContext\.newHandler\b"/>
+ <property name="illegalPattern" value="true"/>
+ <property name="message"
+ value="Don't use sslContext.newHandler directly, use NettyUtils.newSslHandler instead"/>
+ <property name="ignoreComments" value="true"/>
+ </module>
+
+
+ <!-- Checks that we don't use AttributeKey.newInstance directly -->
+ <module name="Regexp">
+ <property name="format" value="AttributeKey\.newInstance"/>
+ <property name="illegalPattern" value="true"/>
+ <property name="message" value="Use NettyUtils.getOrCreateAttributeKey to safely declare AttributeKeys"/>
+ <property name="ignoreComments" value="true"/>
+ </module>
+
+ <!-- Checks that we don't use Thread.currentThread().getContextClassLoader() directly -->
+ <module name="Regexp">
+ <property name="format" value="Thread\s*\.\s*currentThread\s*\(\s*\)\s*(\.|::)\s*getContextClassLoader"/>
+ <property name="illegalPattern" value="true"/>
+ <property name="ignoreComments" value="true"/>
+ </module>
+
+ <!-- Checks for redundant public modifier on interfaces and other redundant modifiers -->
+ <!-- <module name="RedundantModifier"/>-->
+
+ <!-- Checks for utility and constants classes to have private constructor-->
+ <module name="HideUtilityClassConstructor"/>
+ </module>
+
+ <!-- Enforce maximum line lengths. -->
+ <module name="LineLength">
+ <property name="max" value="120"/>
+ <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
+ </module>
+
+</module>
\ No newline at end of file
diff --git a/java/style/intellij-codestyle.xml b/java/style/intellij-codestyle.xml
new file mode 100644
index 0000000..7e479cf
--- /dev/null
+++ b/java/style/intellij-codestyle.xml
@@ -0,0 +1,67 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<code_scheme name="Apache RocketMQ Clients 5.0">
+ <option name="GENERATE_FINAL_LOCALS" value="false"/>
+ <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="9999"/>
+ <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="9999999"/>
+ <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+ <value/>
+ </option>
+ <option name="IMPORT_LAYOUT_TABLE">
+ <value>
+ <package name="" withSubpackages="true" static="true"/>
+ <emptyLine/>
+ <package name="" withSubpackages="true" static="false"/>
+ <emptyLine/>
+ </value>
+ </option>
+ <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/>
+ <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
+ <option name="JD_P_AT_EMPTY_LINES" value="false"/>
+ <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
+ <JavaCodeStyleSettings>
+ <option name="DO_NOT_WRAP_AFTER_SINGLE_ANNOTATION" value="true"/>
+ <option name="CLASS_NAMES_IN_JAVADOC" value="3"/>
+ </JavaCodeStyleSettings>
+ <XML>
+ <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
+ </XML>
+ <codeStyleSettings language="JAVA">
+ <option name="RIGHT_MARGIN" value="120"/>
+ <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+ <option name="ALIGN_MULTILINE_FOR" value="false"/>
+ <option name="KEEP_FIRST_COLUMN_COMMENT" value="false"/>
+ <option name="DO_NOT_WRAP_AFTER_SINGLE_ANNOTATION" value="true"/>
+ <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+<!-- <option name="SPACE_BEFORE_ANNOTATION_ARRAY_INITIALIZER_LBRACE" value="true"/>-->
+ <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true"/>
+ <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true"/>
+ <option name="PLACE_ASSIGNMENT_SIGN_ON_NEXT_LINE" value="true"/>
+ <option name="IF_BRACE_FORCE" value="3"/>
+ <option name="DOWHILE_BRACE_FORCE" value="3"/>
+ <option name="WHILE_BRACE_FORCE" value="3"/>
+ <option name="FOR_BRACE_FORCE" value="3"/>
+ <option name="WRAP_LONG_LINES" value="true"/>
+ <option name="WRAP_ON_TYPING" value="1"/>
+ <indentOptions>
+ <option name="INDENT_SIZE" value="4"/>
+ <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+ <option name="TAB_SIZE" value="4"/>
+ </indentOptions>
+ </codeStyleSettings>
+</code_scheme>
\ No newline at end of file
diff --git a/java/style/spotbugs-suppressions.xml b/java/style/spotbugs-suppressions.xml
new file mode 100644
index 0000000..ca8620f
--- /dev/null
+++ b/java/style/spotbugs-suppressions.xml
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<FindBugsFilter>
+ <Match>
+ <Bug pattern="SF_SWITCH_FALLTHROUGH,SIC_INNER_SHOULD_BE_STATIC_ANON,URF_UNREAD_FIELD,UUF_UNUSED_FIELD,SS_SHOULD_BE_STATIC,NM_CONFUSING"/>
+ </Match>
+
+ <Match>
+ <Package name="~org\.apache\.rocketmq\.client.*"/>
+ <Bug pattern="NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"/>
+ </Match>
+</FindBugsFilter>