You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/06/18 10:34:10 UTC

[rocketmq-clients] 01/01: Java: fix checkstyle

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit a40055ead61e0ec56ae1d2e0f2e9c56849d06f25
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Sat Jun 18 18:21:17 2022 +0800

    Java: fix checkstyle
---
 java/README.md                                     |  48 ++-
 .../rocketmq/client/apis/ClientConfiguration.java  |  11 +-
 .../client/apis/ClientConfigurationBuilder.java    |   4 +-
 .../rocketmq/client/apis/SessionCredentials.java   |   4 +-
 .../client/apis/consumer/FilterExpression.java     |   7 +-
 .../client/apis/consumer/MessageListener.java      |   3 +-
 .../client/apis/consumer/SimpleConsumer.java       |  25 +-
 .../apis/consumer/SimpleConsumerBuilder.java       |   3 +-
 java/client-shade/pom.xml                          |  18 +
 java/{client-java => client}/pom.xml               |   2 +-
 .../client/java/ClientServiceProviderImpl.java     |   0
 .../org/apache/rocketmq/client/java/UserAgent.java |   5 +-
 .../java/exception/ResourceNotFoundException.java  |   0
 .../client/java/hook/MessageHookPoints.java        |   0
 .../client/java/hook/MessageHookPointsStatus.java  |   0
 .../client/java/hook/MessageInterceptor.java       |   0
 .../apache/rocketmq/client/java/impl/Client.java   |   0
 .../rocketmq/client/java/impl/ClientImpl.java      |  96 ++---
 .../rocketmq/client/java/impl/ClientManager.java   |   0
 .../client/java/impl/ClientManagerImpl.java        |   4 +-
 .../client/java/impl/ClientManagerRegistry.java    |   0
 .../rocketmq/client/java/impl/ClientSettings.java  |   0
 .../rocketmq/client/java/impl/ClientType.java      |   0
 .../client/java/impl/TelemetrySession.java         |  44 ++-
 .../client/java/impl/consumer/Assignment.java      |   0
 .../client/java/impl/consumer/Assignments.java     |   0
 .../client/java/impl/consumer/ConsumeService.java  |   4 +-
 .../client/java/impl/consumer/ConsumeTask.java     |   7 +-
 .../client/java/impl/consumer/ConsumerImpl.java    |  36 +-
 .../java/impl/consumer/FifoConsumeService.java     |   7 +-
 .../client/java/impl/consumer/ProcessQueue.java    |   0
 .../java/impl/consumer/ProcessQueueImpl.java       | 105 ++++--
 .../impl/consumer/PushConsumerBuilderImpl.java     |   9 +-
 .../java/impl/consumer/PushConsumerImpl.java       |  55 +--
 .../java/impl/consumer/PushConsumerSettings.java   |  19 +-
 .../java/impl/consumer/ReceiveMessageResult.java   |   0
 .../impl/consumer/SimpleConsumerBuilderImpl.java   |  13 +-
 .../java/impl/consumer/SimpleConsumerImpl.java     |  50 ++-
 .../java/impl/consumer/SimpleConsumerSettings.java |  16 +-
 .../java/impl/consumer/StandardConsumeService.java |   7 +-
 .../consumer/SubscriptionTopicRouteDataResult.java |   3 +-
 .../java/impl/producer/ProducerBuilderImpl.java    |   9 +-
 .../client/java/impl/producer/ProducerImpl.java    |  99 ++++--
 .../java/impl/producer/ProducerSettings.java       |  14 +-
 .../producer/PublishingTopicRouteDataResult.java   |   0
 .../client/java/impl/producer/SendReceiptImpl.java |   4 +-
 .../client/java/impl/producer/TransactionImpl.java |  24 +-
 .../client/java/logging/CustomConsoleAppender.java |   3 +-
 .../client/java/logging/ProcessIdConverter.java    |   0
 .../client/java/message/MessageBuilderImpl.java    |   6 +-
 .../client/java/message/MessageCommon.java         |   6 +-
 .../client/java/message/MessageIdCodec.java        |   5 +-
 .../client/java/message/MessageIdImpl.java         |   0
 .../rocketmq/client/java/message/MessageImpl.java  |  14 +-
 .../rocketmq/client/java/message/MessageType.java  |   0
 .../client/java/message/MessageViewImpl.java       |  31 +-
 .../client/java/message/PublishingMessageImpl.java |   6 +-
 .../client/java/message/protocol/Encoding.java     |   0
 .../client/java/message/protocol/Resource.java     |   0
 .../client/java/metrics/MessageCacheObserver.java  |   0
 .../rocketmq/client/java/metrics/MessageMeter.java |  55 ++-
 .../rocketmq/client/java/metrics/Metric.java       |   0
 .../java/metrics/MetricMessageInterceptor.java     |  10 +-
 .../rocketmq/client/java/metrics/MetricName.java   |   0
 .../client/java/metrics/RocketmqAttributes.java    |   9 +-
 .../rocketmq/client/java/misc/Dispatcher.java      |   4 +-
 .../client/java/misc/ExecutorServices.java         |   0
 .../rocketmq/client/java/misc/LinkedElement.java   |   0
 .../rocketmq/client/java/misc/LinkedIterator.java  |   0
 .../rocketmq/client/java/misc/MetadataUtils.java   |   0
 .../client/java/misc/RequestIdGenerator.java       |   0
 .../client/java/misc/ThreadFactoryImpl.java        |   0
 .../rocketmq/client/java/misc/Utilities.java       |   6 +-
 .../java/retry/CustomizedBackoffRetryPolicy.java   |  10 +-
 .../java/retry/ExponentialBackoffRetryPolicy.java  |  12 +-
 .../rocketmq/client/java/retry/RetryPolicy.java    |   0
 .../apache/rocketmq/client/java/route/Address.java |   0
 .../rocketmq/client/java/route/AddressScheme.java  |   0
 .../apache/rocketmq/client/java/route/Broker.java  |   3 +-
 .../rocketmq/client/java/route/Endpoints.java      |   7 +-
 .../client/java/route/MessageQueueImpl.java        |   0
 .../rocketmq/client/java/route/Permission.java     |   0
 .../rocketmq/client/java/route/TopicRouteData.java |   3 +-
 .../client/java/route/TopicRouteDataResult.java    |   4 +-
 .../rocketmq/client/java/rpc/AuthInterceptor.java  |   4 +-
 .../client/java/rpc/IpNameResolverFactory.java     |   0
 .../client/java/rpc/LoggingInterceptor.java        |  16 +-
 .../apache/rocketmq/client/java/rpc/RpcClient.java |   0
 .../rocketmq/client/java/rpc/RpcClientImpl.java    |   2 +-
 .../apache/rocketmq/client/java/rpc/Signature.java |   0
 .../apache/rocketmq/client/java/rpc/TLSHelper.java |   0
 .../rocketmq.metadata.properties                   |   0
 ...ache.rocketmq.client.apis.ClientServiceProvider |   0
 .../src/main/resources/logback.xml                 |   0
 .../client/java/ClientServiceProviderImplTest.java |  10 +-
 .../client/java/impl/ClientManagerImplTest.java    |   0
 .../java/impl/consumer/ProcessQueueImplTest.java   |  51 +--
 .../impl/consumer/PushConsumerBuilderImplTest.java |   3 +-
 .../java/impl/consumer/PushConsumerImplTest.java   |  60 ++--
 .../impl/consumer/SimpleConsumerBuilderTest.java   |   3 +-
 .../java/impl/consumer/SimpleConsumerImplTest.java |  76 ++--
 .../impl/consumer/StandardConsumeServiceTest.java  |  15 +-
 .../impl/producer/ProducerBuilderImplTest.java     |   5 +-
 .../java/impl/producer/ProducerImplTest.java       |  93 +++--
 .../java/impl/producer/TransactionImplTest.java    |  32 +-
 .../client/java/message/MessageIdCodecTest.java    |   5 +-
 .../client/java/message/MessageImplTest.java       |  22 +-
 .../retry/CustomizedBackoffRetryPolicyTest.java    |   6 +-
 .../rocketmq/client/java/route/EndpointsTest.java  |   6 +-
 .../apache/rocketmq/client/java/tool/TestBase.java |  36 +-
 .../org.mockito.plugins.MockMaker                  |   0
 java/pom.xml                                       |  35 +-
 java/style/checkstyle.xml                          | 385 +++++++++++++++++++++
 java/style/intellij-codestyle.xml                  |  67 ++++
 java/style/spotbugs-suppressions.xml               |  12 +
 115 files changed, 1363 insertions(+), 530 deletions(-)

diff --git a/java/README.md b/java/README.md
index 2a91aa5..dc36bc9 100644
--- a/java/README.md
+++ b/java/README.md
@@ -1 +1,47 @@
-## RocketMQ Clients for Java
+# RocketMQ Clients for Java
+
+[![Java](https://github.com/apache/rocketmq-clients/actions/workflows/java_build.yml/badge.svg)](https://github.com/apache/rocketmq-clients/actions/workflows/java_build.yml)
+
+The java implementation of client for [Apache RocketMQ](https://rocketmq.apache.org/).
+
+## Prerequisites
+
+Java 11 or higher is required to build this project. The built artifacts can be used on Java 8 or
+higher.
+
+<table>
+  <tr>
+    <td><b>Build required:</b></td>
+    <td><b>Java 11 or later</b></td>
+  </tr>
+  <tr>
+    <td><b>Runtime required:</b></td>
+    <td><b>Java 8 or later</b></td>
+  </tr>
+</table>
+
+## Getting Started
+
+Add dependency to your `pom.xml`, and replace the `${rocketmq.version}` by the latest version.
+
+```xml
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-client-java</artifactId>
+    <version>${rocketmq.version}</version>
+</dependency>
+```
+
+It is worth noting that `rocketmq-client-java` is a shaded jar, which means you could not substitute its dependencies.
+From the perspective of avoiding dependency conflicts, you may need a shaded client in most case, but we also provided
+the no-shaded client.
+
+```xml
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-client-java-noshade</artifactId>
+    <version>${rocketmq.version}</version>
+</dependency>
+```
+
+
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
index 866b0f4..c7006e4 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
@@ -28,20 +28,21 @@ public class ClientConfiguration {
     private final SessionCredentialsProvider sessionCredentialsProvider;
     private final Duration requestTimeout;
 
-    public static ClientConfigurationBuilder newBuilder() {
-        return new ClientConfigurationBuilder();
-    }
-
     /**
      * The caller is supposed to have validated the arguments and handled throwing exception or
      * logging warnings already, so we avoid repeating args check here.
      */
-    ClientConfiguration(String accessPoint, SessionCredentialsProvider sessionCredentialsProvider, Duration requestTimeout) {
+    ClientConfiguration(String accessPoint, SessionCredentialsProvider sessionCredentialsProvider,
+        Duration requestTimeout) {
         this.accessPoint = accessPoint;
         this.sessionCredentialsProvider = sessionCredentialsProvider;
         this.requestTimeout = requestTimeout;
     }
 
+    public static ClientConfigurationBuilder newBuilder() {
+        return new ClientConfigurationBuilder();
+    }
+
     public String getAccessPoint() {
         return accessPoint;
     }
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index ca67644..672826e 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -17,10 +17,10 @@
 
 package org.apache.rocketmq.client.apis;
 
-import java.time.Duration;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.time.Duration;
+
 /**
  * Builder to set {@link ClientConfiguration}.
  */
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/SessionCredentials.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/SessionCredentials.java
index e16decd..5328eab 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/SessionCredentials.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/SessionCredentials.java
@@ -17,10 +17,10 @@
 
 package org.apache.rocketmq.client.apis;
 
-import java.util.Optional;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Optional;
+
 /**
  * Session credentials used in service authentications.
  */
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java
index 2f21cd2..dcda10e 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.rocketmq.client.apis.consumer;
 
-import java.util.Objects;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.base.Objects;
+
 /**
  * Filter expression is an efficient way to filter message for {@link SimpleConsumer} and {@link PushConsumer}.
  * Consumer who applied the filter expression only can receive the filtered messages.
@@ -68,6 +67,6 @@ public class FilterExpression {
 
     @Override
     public int hashCode() {
-        return Objects.hash(expression, filterExpressionType);
+        return Objects.hashCode(expression, filterExpressionType);
     }
 }
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java
index a26b741..66e4d43 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java
@@ -23,7 +23,8 @@ import org.apache.rocketmq.client.apis.message.MessageView;
  * MessageListener is used only for push consumer to process message consumption synchronously.
  *
  * <p> Refer to {@link PushConsumer}, push consumer will get message from server
- * and dispatch the message to backend thread pool which control by parameter threadCount to consumer message concurrently.
+ * and dispatch the message to backend thread pool which control by parameter threadCount to consumer message
+ * concurrently.
  */
 public interface MessageListener {
     /**
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
index 3773008..90ab988 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
@@ -41,9 +41,10 @@ import org.apache.rocketmq.client.apis.message.MessageView;
  * should be set in this case.
  *
  * <p> Simple consumer divide message consumption to 3 parts.
- * Firstly, call receive api get messages from server; Then process message by yourself; At last, your must call Ack api to commit this message.
- * If there is error when process message ,your can reconsume the message later which control by the invisibleDuration parameter.
- * Also, you can change the invisibleDuration by call changeInvisibleDuration api.
+ * Firstly, call receive api get messages from server; Then process message by yourself; At last, your must call Ack api
+ * to commit this message.
+ * If there is error when process message ,your can reconsume the message later which control by the invisibleDuration
+ * parameter. Also, you can change the invisibleDuration by call changeInvisibleDuration api.
  */
 public interface SimpleConsumer extends Closeable {
     /**
@@ -92,7 +93,8 @@ public interface SimpleConsumer extends Closeable {
      * Otherwise, it will await the passed timeout. If the timeout expires, an empty map will be returned.
      *
      * @param maxMessageNum     max message num when server returns.
-     * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be invisible to other consumer unless timout.
+     * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be
+     *                          invisible to other consumer unless timout.
      * @return list of messageView
      */
     List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException;
@@ -103,7 +105,8 @@ public interface SimpleConsumer extends Closeable {
      * Otherwise, it will await the passed timeout. If the timeout expires, an empty map will be returned.
      *
      * @param maxMessageNum     max message num when server returns.
-     * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be invisible to other consumer unless timout.
+     * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be
+     *                          invisible to other consumer unless timout.
      * @return list of messageView
      */
     CompletableFuture<List<MessageView>> receiveAsync(int maxMessageNum, Duration invisibleDuration);
@@ -133,8 +136,10 @@ public interface SimpleConsumer extends Closeable {
      * <p> The origin invisible duration for a message decide by ack request.
      *
      * <p>You must call change request before the origin invisible duration timeout.
-     * If called change request later than the origin invisible duration, this request does not take effect and throw exception.
-     * Duplicate change request will refresh the next visible time of this message to other consumers.
+     * If called change request later than the origin invisible duration, this request does not take effect and throw
+     * exception.
+     *
+     * <p>Duplicate change request will refresh the next visible time of this message to other consumers.
      *
      * @param messageView       special messageView with handle want to change.
      * @param invisibleDuration new timestamp the message could be visible and reconsume which start from current time.
@@ -147,8 +152,10 @@ public interface SimpleConsumer extends Closeable {
      * <p> The origin invisible duration for a message decide by ack request.
      *
      * <p> You must call change request before the origin invisible duration timeout.
-     * If called change request later than the origin invisible duration, this request does not take effect and throw exception.
-     * Duplicate change request will refresh the next visible time of this message to other consumers.
+     * If called change request later than the origin invisible duration, this request does not take effect and throw
+     * exception.
+     *
+     * <p>Duplicate change request will refresh the next visible time of this message to other consumers.
      *
      * @param messageView       special messageView with handle want to change.
      * @param invisibleDuration new timestamp the message could be visible and reconsume which start from current time.
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
index d2c6134..49a2804 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
@@ -49,7 +49,8 @@ public interface SimpleConsumerBuilder {
 
     /**
      * Set the max await time when receive message from server.
-     * The simple consumer will hold this long-polling receive requests until  a message is returned or a timeout occurs.
+     * The simple consumer will hold this long-polling receive requests until  a message is returned or a timeout
+     * occurs.
      *
      * @param awaitDuration The maximum time to block when no message available.
      * @return the consumer builder instance.
diff --git a/java/client-shade/pom.xml b/java/client-shade/pom.xml
new file mode 100644
index 0000000..a5be176
--- /dev/null
+++ b/java/client-shade/pom.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-client-java-parent</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>5.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-client-java</artifactId>
+
+    <properties>
+        <maven.compiler.release>8</maven.compiler.release>
+    </properties>
+
+</project>
\ No newline at end of file
diff --git a/java/client-java/pom.xml b/java/client/pom.xml
similarity index 97%
rename from java/client-java/pom.xml
rename to java/client/pom.xml
index b7b5e4b..3337ced 100644
--- a/java/client-java/pom.xml
+++ b/java/client/pom.xml
@@ -9,7 +9,7 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>rocketmq-client-java</artifactId>
+    <artifactId>rocketmq-client-java-noshade</artifactId>
 
     <properties>
         <maven.compiler.release>8</maven.compiler.release>
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/ClientServiceProviderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/ClientServiceProviderImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/ClientServiceProviderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/ClientServiceProviderImpl.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/UserAgent.java b/java/client/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
similarity index 94%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
index bdd7754..138f49c 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
@@ -23,6 +23,9 @@ import org.apache.rocketmq.client.java.misc.MetadataUtils;
 import org.apache.rocketmq.client.java.misc.Utilities;
 
 public class UserAgent {
+    public static final UserAgent INSTANCE = new UserAgent(MetadataUtils.getVersion(), Utilities.getOsDescription(),
+        Utilities.hostName());
+    
     private final String version;
     private final String platform;
     private final String hostName;
@@ -33,8 +36,6 @@ public class UserAgent {
         this.hostName = hostName;
     }
 
-    public static final UserAgent INSTANCE = new UserAgent(MetadataUtils.getVersion(), Utilities.getOsDescription(), Utilities.hostName());
-
     public UA toProtoBuf() {
         return UA.newBuilder()
             .setLanguage(Language.JAVA)
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPoints.java b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPoints.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPoints.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPoints.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPointsStatus.java b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPointsStatus.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPointsStatus.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPointsStatus.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/Client.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
similarity index 94%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 8bd2ad0..b7b45fa 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.client.java.impl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.HeartbeatResponse;
@@ -39,8 +41,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.Metadata;
 import java.io.UnsupportedEncodingException;
 import java.security.InvalidKeyException;
@@ -84,8 +84,8 @@ import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.TopicRouteData;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
 import org.apache.rocketmq.client.java.rpc.Signature;
-
-import static com.google.common.base.Preconditions.checkNotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
 public abstract class ClientImpl extends AbstractIdleService implements Client, MessageInterceptor {
@@ -95,10 +95,18 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     protected volatile ClientManager clientManager;
     protected final ClientConfiguration clientConfiguration;
     protected final Endpoints accessEndpoints;
+    protected final Set<String> topics;
+    // Thread-safe set.
+    protected final Set<Endpoints> isolated;
+    protected final ExecutorService clientCallbackExecutor;
+    protected final MessageMeter messageMeter;
+    /**
+     * Telemetry command executor, which is aims to execute commands from remote.
+     */
+    protected final ThreadPoolExecutor telemetryCommandExecutor;
+    protected final String clientId;
 
     private volatile ScheduledFuture<?> updateRouteCacheFuture;
-
-    protected final Set<String> topics;
     private final ConcurrentMap<String, TopicRouteDataResult> topicRouteResultCache;
 
     @GuardedBy("inflightRouteFutureLock")
@@ -109,24 +117,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     private final ConcurrentMap<Endpoints, TelemetrySession> telemetrySessionTable;
     private final ReadWriteLock telemetrySessionsLock;
 
-    // Thread-safe set.
-    protected final Set<Endpoints> isolated;
-
     @GuardedBy("messageInterceptorsLock")
     private final List<MessageInterceptor> messageInterceptors;
     private final ReadWriteLock messageInterceptorsLock;
 
-    protected final ExecutorService clientCallbackExecutor;
-
-    protected final MessageMeter messageMeter;
-
-    /**
-     * Telemetry command executor, which is aims to execute commands from remote.
-     */
-    protected final ThreadPoolExecutor telemetryCommandExecutor;
-
-    protected final String clientId;
-
     public ClientImpl(ClientConfiguration clientConfiguration, Set<String> topics) {
         this.clientConfiguration = checkNotNull(clientConfiguration, "clientConfiguration should not be null");
         final String accessPoint = clientConfiguration.getAccessPoint();
@@ -181,16 +175,19 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         // Register client after client id generation.
         this.clientManager = ClientManagerRegistry.getInstance().registerClient(this);
         // Fetch topic route from remote.
-        LOGGER.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}", clientId, topics);
+        LOGGER.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}",
+            clientId, topics);
         // Aggregate all topic route data futures into a composited future.
         final List<ListenableFuture<TopicRouteDataResult>> futures = topics.stream()
             .map(this::getRouteDataResult)
             .collect(Collectors.toList());
         List<TopicRouteDataResult> results;
         try {
-            results = Futures.allAsList(futures).get(TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP.toNanos(), TimeUnit.NANOSECONDS);
+            results = Futures.allAsList(futures).get(TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP.toNanos(),
+                TimeUnit.NANOSECONDS);
         } catch (Throwable t) {
-            LOGGER.error("Failed to get topic route data result from remote during client startup, clientId={}, topics={}", clientId, topics, t);
+            LOGGER.error("Failed to get topic route data result from remote during client startup, clientId={}, "
+                + "topics={}", clientId, topics, t);
             throw new ResourceNotFoundException(t);
         }
         // Find any topic whose topic route data is failed to fetch from remote.
@@ -203,7 +200,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
             final Status status = result.getStatus();
             throw new ClientException(status.getCode().getNumber(), status.getMessage());
         }
-        LOGGER.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}", clientId, topics);
+        LOGGER.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}",
+            clientId, topics);
         // Update route cache periodically.
         final ScheduledExecutorService scheduler = clientManager.getScheduler();
         this.updateRouteCacheFuture = scheduler.scheduleWithFixedDelay(() -> {
@@ -260,7 +258,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                 try {
                     interceptor.doBefore(hookPoint, messageCommons);
                 } catch (Throwable t) {
-                    LOGGER.warn("Exception raised while intercepting message, hookPoint={}, clientId={}", hookPoint, clientId);
+                    LOGGER.warn("Exception raised while intercepting message, hookPoint={}, clientId={}", hookPoint,
+                        clientId);
                 }
             }
         } finally {
@@ -277,7 +276,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                 try {
                     interceptor.doAfter(hookPoints, messageCommons, duration, status);
                 } catch (Throwable t) {
-                    LOGGER.warn("Exception raised while intercepting message, hookPoint={}, clientId={}", hookPoints, clientId);
+                    LOGGER.warn("Exception raised while intercepting message, hookPoint={}, clientId={}", hookPoints,
+                        clientId);
                 }
             }
         } finally {
@@ -305,13 +305,15 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                     .build();
                 telemeter(endpoints, telemetryCommand);
             } catch (Throwable t) {
-                LOGGER.error("Failed to send thread stack trace to remote, endpoints={}, nonce={}, clientId={}", endpoints, nonce, clientId, t);
+                LOGGER.error("Failed to send thread stack trace to remote, endpoints={}, nonce={}, clientId={}",
+                    endpoints, nonce, clientId, t);
             }
         };
         try {
             telemetryCommandExecutor.submit(task);
         } catch (Throwable t) {
-            LOGGER.error("[Bug] Exception raised while submitting task to print thread stack trace, endpoints={}, nonce={}, clientId={}", endpoints, nonce, clientId, t);
+            LOGGER.error("[Bug] Exception raised while submitting task to print thread stack trace, endpoints={}, "
+                + "nonce={}, clientId={}", endpoints, nonce, clientId, t);
         }
     }
 
@@ -342,7 +344,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
             try {
                 telemeter(endpoints, command);
             } catch (Throwable t) {
-                LOGGER.error("Failed to telemeter settings to remote, clientId={}, endpoints={}", clientId, endpoints, t);
+                LOGGER.error("Failed to telemeter settings, clientId={}, endpoints={}", clientId, endpoints, t);
             }
         }
     }
@@ -432,14 +434,16 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         Futures.addCallback(future, new FutureCallback<List<TelemetrySession>>() {
             @Override
             public void onSuccess(List<TelemetrySession> sessions) {
-                LOGGER.info("Register session successfully, current route will be cached, topic={}, topicRouteDataResult={}", topic, topicRouteDataResult);
+                LOGGER.info("Register session successfully, current route will be cached, topic={}, "
+                    + "topicRouteDataResult={}", topic, topicRouteDataResult);
                 final TopicRouteDataResult old = topicRouteResultCache.put(topic, topicRouteDataResult);
                 if (topicRouteDataResult.equals(old)) {
                     // Log if topic route result remains the same.
                     LOGGER.info("Topic route result remains the same, topic={}, clientId={}", topic, clientId);
                 } else {
                     // Log if topic route result is updated.
-                    LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId, old, topicRouteDataResult);
+                    LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId,
+                        old, topicRouteDataResult);
                 }
                 onTopicRouteDataResultUpdate0(topic, topicRouteDataResult);
                 future0.set(null);
@@ -448,7 +452,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
             @Override
             public void onFailure(Throwable t) {
                 // Note: Topic route would not be updated if failed to register session.
-                LOGGER.error("Failed to register session, current route will NOT be cached, topic={}, topicRouteDataResult={}", topic, topicRouteDataResult);
+                LOGGER.error("Failed to register session, current route will NOT be cached, topic={}, "
+                    + "topicRouteDataResult={}", topic, topicRouteDataResult);
                 future0.setException(t);
             }
         }, MoreExecutors.directExecutor());
@@ -465,7 +470,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      * @param command   request of message consume verification from remote.
      */
     public void onVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command) {
-        LOGGER.warn("Ignore verify message command from remote, which is not expected, clientId={}, command={}", clientId, command);
+        LOGGER.warn("Ignore verify message command from remote, which is not expected, clientId={}, command={}",
+            clientId, command);
         final String nonce = command.getNonce();
         final Status status = Status.newBuilder().setCode(Code.NOT_IMPLEMENTED).build();
         VerifyMessageResult verifyMessageResult = VerifyMessageResult.newBuilder().setNonce(nonce).build();
@@ -487,14 +493,16 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      * @param command   request of orphaned transaction recovery from remote.
      */
     public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command) {
-        LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, client id={}, command={}", clientId, command);
+        LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, client id={}, "
+            + "command={}", clientId, command);
     }
 
     private void updateRouteCache() {
         LOGGER.info("Start to update route cache for a new round, clientId={}", clientId);
         topicRouteResultCache.keySet().forEach(topic -> {
             // Set timeout for future on purpose.
-            final ListenableFuture<TopicRouteDataResult> future = Futures.withTimeout(fetchTopicRoute(topic), TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP, getScheduler());
+            final ListenableFuture<TopicRouteDataResult> future = Futures.withTimeout(fetchTopicRoute(topic),
+                TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP, getScheduler());
             Futures.addCallback(future, new FutureCallback<TopicRouteDataResult>() {
                 @Override
                 public void onSuccess(TopicRouteDataResult topicRouteDataResult) {
@@ -503,7 +511,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
 
                 @Override
                 public void onFailure(Throwable t) {
-                    LOGGER.error("Failed to fetch topic route for update cache, topic={}, clientId={}", topic, clientId, t);
+                    LOGGER.error("Failed to fetch topic route for update cache, topic={}, clientId={}", topic,
+                        clientId, t);
                 }
             }, MoreExecutors.directExecutor());
         });
@@ -583,7 +592,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                     LOGGER.info("Send heartbeat successfully, endpoints={}, clientId={}", endpoints, clientId);
                     final boolean removed = isolated.remove(endpoints);
                     if (removed) {
-                        LOGGER.info("Rejoin endpoints which is isolated before, clientId={}, endpoints={}", clientId, endpoints);
+                        LOGGER.info("Rejoin endpoints which is isolated before, clientId={}, endpoints={}", clientId,
+                            endpoints);
                     }
                 }
 
@@ -593,7 +603,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                 }
             }, MoreExecutors.directExecutor());
         } catch (Throwable e) {
-            LOGGER.error("Exception raised while preparing heartbeat, endpoints={}, clientId={}", endpoints, clientId, e);
+            LOGGER.error("Exception raised while preparing heartbeat, endpoints={}, clientId={}", endpoints, clientId,
+                e);
         }
     }
 
@@ -679,10 +690,12 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                 Futures.whenAllSucceed(updateFuture).run(() -> {
                     inflightRouteFutureLock.lock();
                     try {
-                        final Set<SettableFuture<TopicRouteDataResult>> newFutureSet = inflightRouteFutureTable.remove(topic);
+                        final Set<SettableFuture<TopicRouteDataResult>> newFutureSet =
+                            inflightRouteFutureTable.remove(topic);
                         if (null == newFutureSet) {
                             // Should never reach here.
-                            LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic, clientId);
+                            LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic,
+                                clientId);
                             return;
                         }
                         LOGGER.debug("Fetch topic route successfully, topic={}, in-flight route future "
@@ -692,7 +705,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                         }
                     } catch (Throwable t) {
                         // Should never reach here.
-                        LOGGER.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic, clientId, t);
+                        LOGGER.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic,
+                            clientId, t);
                     } finally {
                         inflightRouteFutureLock.unlock();
                     }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index cfad8ad..ddbbbf9 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -42,8 +42,6 @@ import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
@@ -69,6 +67,8 @@ import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.rpc.RpcClient;
 import org.apache.rocketmq.client.java.rpc.RpcClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @see ClientManager
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
similarity index 88%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
index 90b02f3..bb51870 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
@@ -27,8 +27,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
 import java.io.UnsupportedEncodingException;
@@ -37,6 +35,8 @@ import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Telemetry session is constructed before first communication between client and remote route endpoints.
@@ -50,17 +50,17 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
     private final Endpoints endpoints;
     private volatile StreamObserver<TelemetryCommand> requestObserver;
 
-    public static ListenableFuture<TelemetrySession> register(ClientImpl client, ClientManager clientManager,
-        Endpoints endpoints) {
-        return new TelemetrySession(client, clientManager, endpoints).register();
-    }
-
     private TelemetrySession(ClientImpl client, ClientManager clientManager, Endpoints endpoints) {
         this.client = client;
         this.clientManager = clientManager;
         this.endpoints = endpoints;
     }
 
+    public static ListenableFuture<TelemetrySession> register(ClientImpl client, ClientManager clientManager,
+        Endpoints endpoints) {
+        return new TelemetrySession(client, clientManager, endpoints).register();
+    }
+
     private ListenableFuture<TelemetrySession> register() {
         ListenableFuture<TelemetrySession> future;
         try {
@@ -69,7 +69,8 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
             final Settings settings = clientSettings.toProtobuf();
             final TelemetryCommand settingsCommand = TelemetryCommand.newBuilder().setSettings(settings).build();
             this.telemeter(settingsCommand);
-            future = Futures.transform(clientSettings.getArrivedFuture(), input -> this, MoreExecutors.directExecutor());
+            future = Futures.transform(clientSettings.getArrivedFuture(), input -> this,
+                MoreExecutors.directExecutor());
         } catch (Throwable t) {
             SettableFuture<TelemetrySession> future0 = SettableFuture.create();
             future0.setException(t);
@@ -78,12 +79,14 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
         Futures.addCallback(future, new FutureCallback<TelemetrySession>() {
             @Override
             public void onSuccess(TelemetrySession session) {
-                LOGGER.info("Register telemetry session successfully, endpoints={}, clientId={}", endpoints, client.getClientId());
+                LOGGER.info("Register telemetry session successfully, endpoints={}, clientId={}", endpoints,
+                    client.getClientId());
             }
 
             @Override
             public void onFailure(Throwable t) {
-                LOGGER.error("Failed to register telemetry session, endpoints={}, clientId={}", endpoints, client.getClientId(), t);
+                LOGGER.error("Failed to register telemetry session, endpoints={}, clientId={}", endpoints,
+                    client.getClientId(), t);
                 release();
             }
         }, MoreExecutors.directExecutor());
@@ -106,7 +109,8 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
     /**
      * Initialize telemetry session.
      */
-    private synchronized void init() throws UnsupportedEncodingException, NoSuchAlgorithmException, InvalidKeyException, ClientException {
+    private synchronized void init() throws UnsupportedEncodingException, NoSuchAlgorithmException,
+        InvalidKeyException, ClientException {
         this.release();
         final Metadata metadata = client.sign();
         this.requestObserver = clientManager.telemetry(endpoints, metadata, Duration.ofNanos(Long.MAX_VALUE), this);
@@ -141,35 +145,41 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
             switch (command.getCommandCase()) {
                 case SETTINGS: {
                     final Settings settings = command.getSettings();
-                    LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints, client.getClientId());
+                    LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints,
+                        client.getClientId());
                     client.onSettingsCommand(endpoints, settings);
                     break;
                 }
                 case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
                     final RecoverOrphanedTransactionCommand recoverOrphanedTransactionCommand =
                         command.getRecoverOrphanedTransactionCommand();
-                    LOGGER.info("Receive orphaned transaction recovery command from remote, endpoints={}, clientId={}", endpoints, client.getClientId());
+                    LOGGER.info("Receive orphaned transaction recovery command from remote, endpoints={}, "
+                        + "clientId={}", endpoints, client.getClientId());
                     client.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand);
                     break;
                 }
                 case VERIFY_MESSAGE_COMMAND: {
                     final VerifyMessageCommand verifyMessageCommand = command.getVerifyMessageCommand();
-                    LOGGER.info("Receive message verification command from remote, endpoints={}, clientId={}", client.getClientId());
+                    LOGGER.info("Receive message verification command from remote, endpoints={}, clientId={}",
+                        client.getClientId());
                     client.onVerifyMessageCommand(endpoints, verifyMessageCommand);
                     break;
                 }
                 case PRINT_THREAD_STACK_TRACE_COMMAND: {
                     final PrintThreadStackTraceCommand printThreadStackTraceCommand =
                         command.getPrintThreadStackTraceCommand();
-                    LOGGER.info("Receive thread stack print command from remote, endpoints={}, clientId={}", endpoints, client.getClientId());
+                    LOGGER.info("Receive thread stack print command from remote, endpoints={}, clientId={}",
+                        endpoints, client.getClientId());
                     client.onPrintThreadStackCommand(endpoints, printThreadStackTraceCommand);
                     break;
                 }
                 default:
-                    LOGGER.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}", endpoints, command, client.getClientId());
+                    LOGGER.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}",
+                        endpoints, command, client.getClientId());
             }
         } catch (Throwable t) {
-            LOGGER.error("[Bug] unexpected exception raised while receiving command from remote, command={}, clientId={}", command, client.getClientId());
+            LOGGER.error("[Bug] unexpected exception raised while receiving command from remote, command={}, "
+                + "clientId={}", command, client.getClientId());
         }
     }
 
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignment.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignment.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignment.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignment.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignments.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignments.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignments.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignments.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
index b045548..793b425 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
@@ -23,8 +23,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -36,6 +34,8 @@ import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.misc.Dispatcher;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("NullableProblems")
 public abstract class ConsumeService extends Dispatcher {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
index b68556b..27fbc36 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
@@ -18,8 +18,6 @@
 package org.apache.rocketmq.client.java.impl.consumer;
 
 import com.google.common.base.Stopwatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -31,6 +29,8 @@ import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ConsumeTask implements Callable<ConsumeResult> {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeTask.class);
@@ -67,7 +67,8 @@ public class ConsumeTask implements Callable<ConsumeResult> {
             consumeResult = ConsumeResult.FAILURE;
         }
         final Duration duration = stopwatch.elapsed();
-        MessageHookPointsStatus status = ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+        MessageHookPointsStatus status = ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK :
+            MessageHookPointsStatus.ERROR;
         messageInterceptor.doAfter(MessageHookPoints.CONSUME, messageCommons, duration, status);
         // Make sure that the return value is the subset of messageViews.
         return consumeResult;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
similarity index 91%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index b44b112..f674d01 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -39,8 +39,6 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Durations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -59,6 +57,8 @@ import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
 abstract class ConsumerImpl extends ClientImpl {
@@ -79,7 +79,8 @@ abstract class ConsumerImpl extends ClientImpl {
         try {
             Metadata metadata = sign();
             final Endpoints endpoints = mq.getBroker().getEndpoints();
-            final ListenableFuture<Iterator<ReceiveMessageResponse>> future = clientManager.receiveMessage(endpoints, metadata, request, timeout);
+            final ListenableFuture<Iterator<ReceiveMessageResponse>> future = clientManager.receiveMessage(endpoints,
+                metadata, request, timeout);
             return Futures.transform(future, it -> {
                 // Null here means status not set yet.
                 Status status = null;
@@ -96,13 +97,15 @@ abstract class ConsumerImpl extends ClientImpl {
                             break;
                         case DELIVERY_TIMESTAMP:
                             deliveryTimestampFromRemote = response.getDeliveryTimestamp();
+                            break;
                         default:
-                            LOGGER.warn("[Bug] Not recognized content for receive message response, mq={}, clientId={}, resp={}", mq, clientId, response);
+                            LOGGER.warn("[Bug] Not recognized content for receive message response, mq={}," +
+                                " clientId={}, resp={}", mq, clientId, response);
                     }
                 }
                 for (Message message : messageList) {
-                    final MessageViewImpl messageView = MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
-                    messages.add(messageView);
+                    final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
+                    messages.add(view);
                 }
                 return new ReceiveMessageResult(endpoints, status, messages);
             }, MoreExecutors.directExecutor());
@@ -156,9 +159,11 @@ abstract class ConsumerImpl extends ClientImpl {
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 final Duration duration = stopwatch.elapsed();
-                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ?
+                    MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
                 if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to ack message, code={}, status message=[{}], topic={}, messageId={}, clientId={}", code, status.getMessage(), topic, messageId, clientId);
+                    LOGGER.error("Failed to ack message, code={}, status message=[{}], topic={}, messageId={}," +
+                        " clientId={}", code, status.getMessage(), topic, messageId, clientId);
                 }
                 doAfter(MessageHookPoints.ACK, messageCommons, duration, messageHookPointsStatus);
             }
@@ -167,7 +172,8 @@ abstract class ConsumerImpl extends ClientImpl {
             public void onFailure(Throwable t) {
                 final Duration duration = stopwatch.elapsed();
                 doAfter(MessageHookPoints.ACK, messageCommons, duration, MessageHookPointsStatus.ERROR);
-                LOGGER.error("Exception raised during message acknowledgement, topic={}, messageId={}, clientId={}", topic, messageId, clientId, t);
+                LOGGER.error("Exception raised during message acknowledgement, topic={}, messageId={}, clientId={}",
+                    topic, messageId, clientId, t);
             }
         }, MoreExecutors.directExecutor());
         return future;
@@ -184,7 +190,8 @@ abstract class ConsumerImpl extends ClientImpl {
         try {
             final ChangeInvisibleDurationRequest request = wrapChangeInvisibleDuration(messageView, invisibleDuration);
             final Metadata metadata = sign();
-            future = clientManager.changeInvisibleDuration(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
+            future = clientManager.changeInvisibleDuration(endpoints, metadata, request,
+                clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
             final SettableFuture<ChangeInvisibleDurationResponse> future0 = SettableFuture.create();
             future0.setException(t);
@@ -197,9 +204,11 @@ abstract class ConsumerImpl extends ClientImpl {
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 final Duration duration = stopwatch.elapsed();
-                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ?
+                    MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
                 if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to change message invisible duration, messageId={}, endpoints={}, code={}, status message=[{}], clientId={}", messageId, endpoints, code, status.getMessage(), clientId);
+                    LOGGER.error("Failed to change message invisible duration, messageId={}, endpoints={}, code={}," +
+                        " status message=[{}], clientId={}", messageId, endpoints, code, status.getMessage(), clientId);
                 }
                 doAfter(MessageHookPoints.CHANGE_INVISIBLE_DURATION, messageCommons, duration, messageHookPointsStatus);
             }
@@ -208,7 +217,8 @@ abstract class ConsumerImpl extends ClientImpl {
             public void onFailure(Throwable t) {
                 final Duration duration = stopwatch.elapsed();
                 doAfter(MessageHookPoints.ACK, messageCommons, duration, MessageHookPointsStatus.ERROR);
-                LOGGER.error("Exception raised during message acknowledgement, messageId={}, endpoints={}, clientId={}", messageId, endpoints, clientId, t);
+                LOGGER.error("Exception raised during message acknowledgement, messageId={}, endpoints={}, clientId={}",
+                    messageId, endpoints, clientId, t);
 
             }
         }, MoreExecutors.directExecutor());
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
index 0169915..c8beeb2 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
@@ -21,8 +21,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -35,6 +33,8 @@ import org.apache.rocketmq.client.apis.consumer.MessageListener;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("NullableProblems")
 class FifoConsumeService extends ConsumeService {
@@ -53,7 +53,8 @@ class FifoConsumeService extends ConsumeService {
         }
         final MessageViewImpl next = iterator.next();
         final ListenableFuture<ConsumeResult> future0 = consume(next);
-        final ListenableFuture<Void> future = Futures.transformAsync(future0, result -> pq.eraseFifoMessage(next, result), MoreExecutors.directExecutor());
+        final ListenableFuture<Void> future = Futures.transformAsync(future0, result -> pq.eraseFifoMessage(next,
+            result), MoreExecutors.directExecutor());
         Futures.addCallback(future, new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void ignore) {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
similarity index 87%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index bd93e26..3183627 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -29,8 +29,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -53,6 +51,8 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation of {@link ProcessQueue}.
@@ -63,12 +63,12 @@ import org.apache.rocketmq.client.java.route.MessageQueueImpl;
  */
 @SuppressWarnings({"NullableProblems", "UnstableApiUsage"})
 class ProcessQueueImpl implements ProcessQueue {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessQueueImpl.class);
-
     public static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_DELAY = Duration.ofMillis(100);
     public static final Duration ACK_FIFO_MESSAGE_DELAY = Duration.ofMillis(100);
     public static final Duration RECEIVE_LATER_DELAY = Duration.ofSeconds(3);
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessQueueImpl.class);
+
     private final PushConsumerImpl consumer;
 
     /**
@@ -120,12 +120,14 @@ class ProcessQueueImpl implements ProcessQueue {
 
     @Override
     public boolean expired() {
-        Duration maxIdleDuration = Duration.ofNanos(2 * consumer.getPushConsumerSettings().getLongPollingTimeout().toNanos());
+        final PushConsumerSettings settings = consumer.getPushConsumerSettings();
+        Duration maxIdleDuration = Duration.ofNanos(2 * settings.getLongPollingTimeout().toNanos());
         final Duration idleDuration = Duration.ofNanos(System.nanoTime() - activityNanoTime);
         if (idleDuration.compareTo(maxIdleDuration) < 0) {
             return false;
         }
-        LOGGER.warn("Process queue is idle, idle duration={}, max idle duration={}, mq={}, clientId={}", idleDuration, maxIdleDuration, mq, consumer.getClientId());
+        LOGGER.warn("Process queue is idle, idle duration={}, max idle duration={}, mq={}, clientId={}", idleDuration,
+            maxIdleDuration, mq, consumer.getClientId());
         return true;
     }
 
@@ -153,11 +155,13 @@ class ProcessQueueImpl implements ProcessQueue {
             corrupted.forEach(messageView -> {
                 final MessageId messageId = messageView.getMessageId();
                 if (consumer.getPushConsumerSettings().isFifo()) {
-                    LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}, messageId={}, clientId={}", mq, messageId, consumer.getClientId());
+                    LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}," +
+                        " messageId={}, clientId={}", mq, messageId, consumer.getClientId());
                     forwardToDeadLetterQueue(messageView);
                     return;
                 }
-                LOGGER.error("Message is corrupted, nack it in standard mode, mq={}, messageId={}, clientId={}", mq, messageId, consumer.getClientId());
+                LOGGER.error("Message is corrupted, nack it in standard mode, mq={}, messageId={}, clientId={}", mq,
+                    messageId, consumer.getClientId());
                 nackMessage(messageView);
             });
         }
@@ -188,18 +192,21 @@ class ProcessQueueImpl implements ProcessQueue {
                 return;
             }
             // Should never reach here.
-            LOGGER.error("[Bug] Failed to schedule receive message request, mq={}, clientId={}", mq, consumer.getClientId(), t);
+            LOGGER.error("[Bug] Failed to schedule receive message request, mq={}, clientId={}", mq,
+                consumer.getClientId(), t);
             receiveMessageLater();
         }
     }
 
     public void receiveMessage() {
         if (dropped) {
-            LOGGER.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq, consumer.getClientId());
+            LOGGER.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq,
+                consumer.getClientId());
             return;
         }
         if (this.isCacheFull()) {
-            LOGGER.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq, consumer.getClientId());
+            LOGGER.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq,
+                consumer.getClientId());
             receiveMessageLater();
             return;
         }
@@ -208,7 +215,8 @@ class ProcessQueueImpl implements ProcessQueue {
 
     private void receiveMessageImmediately() {
         if (!consumer.isRunning()) {
-            LOGGER.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, consumer.getClientId());
+            LOGGER.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq,
+                consumer.getClientId());
             return;
         }
         try {
@@ -228,18 +236,20 @@ class ProcessQueueImpl implements ProcessQueue {
                 public void onSuccess(ReceiveMessageResult result) {
                     // Intercept after message reception.
                     final Duration duration = stopwatch.elapsed();
-                    final List<MessageCommon> messageCommons = result.getMessages().stream().map(MessageViewImpl::getMessageCommon).collect(Collectors.toList());
+                    final List<MessageCommon> commons = result.getMessages().stream()
+                        .map(MessageViewImpl::getMessageCommon).collect(Collectors.toList());
                     if (result.getStatus().isPresent() && Code.OK.equals(result.getStatus().get().getCode())) {
-                        consumer.doAfter(MessageHookPoints.RECEIVE, messageCommons, duration, MessageHookPointsStatus.OK);
+                        consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.OK);
                     } else {
-                        consumer.doAfter(MessageHookPoints.RECEIVE, messageCommons, duration, MessageHookPointsStatus.ERROR);
+                        consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.ERROR);
                     }
 
                     try {
                         onReceiveMessageResult(result);
                     } catch (Throwable t) {
                         // Should never reach here.
-                        LOGGER.error("[Bug] Exception raised while handling receive result, would receive later, mq={}, endpoints={}, clientId={}",
+                        LOGGER.error("[Bug] Exception raised while handling receive result, would receive later," +
+                                " mq={}, endpoints={}, clientId={}",
                             mq, endpoints, consumer.getClientId(), t);
                         receiveMessageLater();
                     }
@@ -249,16 +259,18 @@ class ProcessQueueImpl implements ProcessQueue {
                 public void onFailure(Throwable t) {
                     // Intercept after message reception.
                     final Duration duration = stopwatch.elapsed();
-                    consumer.doAfter(MessageHookPoints.RECEIVE, Collections.emptyList(), duration, MessageHookPointsStatus.ERROR);
+                    consumer.doAfter(MessageHookPoints.RECEIVE, Collections.emptyList(), duration,
+                        MessageHookPointsStatus.ERROR);
 
-                    LOGGER.error("Exception raised while message reception, would receive later, mq={}, endpoints={}, clientId={}",
-                        mq, endpoints, consumer.getClientId(), t);
+                    LOGGER.error("Exception raised while message reception, would receive later, mq={}, endpoints={}," +
+                        " clientId={}", mq, endpoints, consumer.getClientId(), t);
                     receiveMessageLater();
                 }
             }, MoreExecutors.directExecutor());
             consumer.getReceptionTimes().getAndIncrement();
         } catch (Throwable t) {
-            LOGGER.error("Exception raised while message reception, would receive later, mq={}, clientId={}", mq, consumer.getClientId(), t);
+            LOGGER.error("Exception raised while message reception, would receive later, mq={}, clientId={}", mq,
+                consumer.getClientId(), t);
             receiveMessageLater();
         }
     }
@@ -267,14 +279,16 @@ class ProcessQueueImpl implements ProcessQueue {
         final int cacheMessageCountThresholdPerQueue = consumer.cacheMessageCountThresholdPerQueue();
         final long actualMessagesQuantity = this.cachedMessagesCount();
         if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) {
-            LOGGER.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}, mq={}, clientId={}",
+            LOGGER.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}," +
+                    " mq={}, clientId={}",
                 cacheMessageCountThresholdPerQueue, actualMessagesQuantity, mq, consumer.getClientId());
             return true;
         }
         final int cacheMessageBytesThresholdPerQueue = consumer.cacheMessageBytesThresholdPerQueue();
         final long actualCachedMessagesBytes = this.cachedMessageBytes();
         if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) {
-            LOGGER.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes, actual={} bytes, mq={}, clientId={}",
+            LOGGER.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes," +
+                    " actual={} bytes, mq={}, clientId={}",
                 cacheMessageBytesThresholdPerQueue, actualCachedMessagesBytes, mq, consumer.getClientId());
             return true;
         }
@@ -316,7 +330,8 @@ class ProcessQueueImpl implements ProcessQueue {
                 receiveMessage();
             }
             optionalStatus = Optional.of(Status.newBuilder().setCode(Code.OK).build());
-            LOGGER.error("[Bug] Status not set but message(s) found in the receive result, fix the status to OK, mq={}, endpoints={}", mq, endpoints);
+            LOGGER.error("[Bug] Status not set but message(s) found in the receive result, fix the status to OK," +
+                " mq={}, endpoints={}", mq, endpoints);
         }
         final Status status = optionalStatus.get();
         final Code code = status.getCode();
@@ -327,17 +342,20 @@ class ProcessQueueImpl implements ProcessQueue {
                     consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
                     consumer.getConsumeService().signal();
                 }
-                LOGGER.debug("Receive message with OK, mq={}, endpoints={}, messages found count={}, clientId={}", mq, endpoints, messages.size(), consumer.getClientId());
+                LOGGER.debug("Receive message with OK, mq={}, endpoints={}, messages found count={}, clientId={}",
+                    mq, endpoints, messages.size(), consumer.getClientId());
                 receiveMessage();
                 break;
             case MESSAGE_NOT_FOUND:
-                LOGGER.info("Message not found while receiving message, mq={}, endpoints={}, clientId={}, code={}, status message=[{}]",
+                LOGGER.info("Message not found while receiving message, mq={}, endpoints={}, clientId={}, code={}," +
+                        " status message=[{}]",
                     mq, endpoints, consumer.getClientId(), code.getNumber(), status.getMessage());
                 receiveMessage();
                 break;
             // Fall through on purpose.
             default:
-                LOGGER.error("Failed to receive message from remote, try it later, mq={}, endpoints={}, clientId={}, code={}, status message=[{}]",
+                LOGGER.error("Failed to receive message from remote, try it later, mq={}, endpoints={}, clientId={}," +
+                        " code={}, status message=[{}]",
                     mq, endpoints, consumer.getClientId(), code.getNumber(), status.getMessage());
                 receiveMessageLater();
         }
@@ -431,14 +449,18 @@ class ProcessQueueImpl implements ProcessQueue {
         if (ConsumeResult.FAILURE.equals(consumeResult) && attempt < maxAttempts) {
             final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(attempt);
             attempt = messageView.incrementAndGetDeliveryAttempt();
-            LOGGER.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}, attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}",
+            LOGGER.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}," +
+                    " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}",
                 maxAttempts, attempt, mq, messageId, nextAttemptDelay, consumer.getClientId());
             final ListenableFuture<ConsumeResult> future = service.consume(messageView, nextAttemptDelay);
-            return Futures.transformAsync(future, result -> eraseFifoMessage(messageView, result), MoreExecutors.directExecutor());
+            return Futures.transformAsync(future, result -> eraseFifoMessage(messageView, result),
+                MoreExecutors.directExecutor());
         }
         boolean ok = ConsumeResult.SUCCESS.equals(consumeResult);
         if (!ok) {
-            LOGGER.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, attempt={}, mq={}, messageId={}, clientId={}",
+            LOGGER.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, "
+                    + "attempt={}," +
+                    " mq={}, messageId={}, clientId={}",
                 maxAttempts, attempt, mq, messageId, consumer.getClientId());
         }
         // Ack message or forward it to DLQ depends on consumption result.
@@ -465,7 +487,8 @@ class ProcessQueueImpl implements ProcessQueue {
                 final Code code = status.getCode();
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later, clientId={}, messageId={}, attempt={}, mq={}, code={}, status message={}",
+                    LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later," +
+                            " clientId={}, messageId={}, attempt={}, mq={}, code={}, status message={}",
                         clientId, messageView.getMessageId(), attempt, mq, code, status.getMessage());
                     forwardToDeadLetterQueue(messageView, 1 + attempt, future0);
                     return;
@@ -496,7 +519,8 @@ class ProcessQueueImpl implements ProcessQueue {
         final String clientId = consumer.getClientId();
         // Process queue is dropped, no need to proceed.
         if (dropped) {
-            LOGGER.info("Process queue was dropped, give up to forward message to dead letter queue, mq={}, messageId={}, clientId={}", mq, messageId, clientId);
+            LOGGER.info("Process queue was dropped, give up to forward message to dead letter queue, mq={}," +
+                " messageId={}, clientId={}", mq, messageId, clientId);
             return;
         }
         final ScheduledExecutorService scheduler = consumer.getScheduler();
@@ -508,7 +532,8 @@ class ProcessQueueImpl implements ProcessQueue {
                 return;
             }
             // Should never reach here.
-            LOGGER.error("[Bug] Failed to schedule DLQ message request, mq={}, messageId={}, clientId={}", mq, messageView.getMessageId(), clientId);
+            LOGGER.error("[Bug] Failed to schedule DLQ message request, mq={}, messageId={}, clientId={}", mq,
+                messageView.getMessageId(), clientId);
             forwardToDeadLetterQueueLater(messageView, 1 + attempt, future0);
         }
     }
@@ -531,15 +556,16 @@ class ProcessQueueImpl implements ProcessQueue {
                 final Code code = status.getCode();
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, attempt={}, messageId={}, mq={}, code={}, endpoints={}, status message=[{}]",
+                    LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, attempt={}," +
+                            " messageId={}, mq={}, code={}, endpoints={}, status message=[{}]",
                         clientId, attempt, messageView.getMessageId(), mq, code, endpoints, status.getMessage());
                     ackFifoMessageLater(messageView, 1 + attempt, future0);
                     return;
                 }
                 // Log retries.
                 if (1 < attempt) {
-                    LOGGER.info("Re-ack fifo message successfully, clientId={}, attempt={}, messageId={}, mq={}, endpoints={}",
-                        clientId, attempt, messageView.getMessageId(), mq, endpoints);
+                    LOGGER.info("Re-ack fifo message successfully, clientId={}, attempt={}, messageId={}, mq={}," +
+                        " endpoints={}", clientId, attempt, messageView.getMessageId(), mq, endpoints);
                 }
                 // Set result if FIFO message is acknowledged successfully.
                 future0.set(null);
@@ -548,7 +574,8 @@ class ProcessQueueImpl implements ProcessQueue {
             @Override
             public void onFailure(Throwable t) {
                 // Log failure and retry later.
-                LOGGER.error("Exception raised while ack fifo message, clientId={}, would attempt to re-ack later, attempt={}, messageId={}, mq={}, endpoints={}",
+                LOGGER.error("Exception raised while ack fifo message, clientId={}, would attempt to re-ack later," +
+                        " attempt={}, messageId={}, mq={}, endpoints={}",
                     clientId, attempt, messageView.getMessageId(), mq, endpoints, t);
                 ackFifoMessageLater(messageView, 1 + attempt, future0);
             }
@@ -561,7 +588,8 @@ class ProcessQueueImpl implements ProcessQueue {
         final String clientId = consumer.getClientId();
         // Process queue is dropped, no need to proceed.
         if (dropped) {
-            LOGGER.info("Process queue was dropped, give up to ack message, mq={}, messageId={}, clientId={}", mq, messageId, clientId);
+            LOGGER.info("Process queue was dropped, give up to ack message, mq={}, messageId={}, clientId={}",
+                mq, messageId, clientId);
             return;
         }
         final ScheduledExecutorService scheduler = consumer.getScheduler();
@@ -573,7 +601,8 @@ class ProcessQueueImpl implements ProcessQueue {
                 return;
             }
             // Should never reach here.
-            LOGGER.error("[Bug] Failed to schedule ack fifo message request, mq={}, messageId={}, clientId={}", mq, messageId, clientId);
+            LOGGER.error("[Bug] Failed to schedule ack fifo message request, mq={}, messageId={}, clientId={}",
+                mq, messageId, clientId);
             ackFifoMessageLater(messageView, 1 + attempt, future0);
         }
     }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
index 13cae19..de3c6ef 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
@@ -27,9 +30,6 @@ import org.apache.rocketmq.client.apis.consumer.MessageListener;
 import org.apache.rocketmq.client.apis.consumer.PushConsumer;
 import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Implementation of {@link PushConsumerBuilder}
  */
@@ -59,7 +59,8 @@ public class PushConsumerBuilderImpl implements PushConsumerBuilder {
     @Override
     public PushConsumerBuilder setConsumerGroup(String consumerGroup) {
         checkNotNull(consumerGroup, "consumerGroup should not be null");
-        checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(), "consumerGroup does not match the regex [regex=%s]", CONSUMER_GROUP_PATTERN.pattern());
+        checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(), "consumerGroup does not match the "
+            + "regex [regex=%s]", CONSUMER_GROUP_PATTERN.pattern());
         this.consumerGroup = consumerGroup;
         return this;
     }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
similarity index 93%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 59784a2..b14c047 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -33,8 +33,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.Collections;
@@ -73,6 +71,8 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation of {@link PushConsumer}
@@ -86,6 +86,9 @@ import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
 class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCacheObserver {
     private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerImpl.class);
 
+    final AtomicLong consumptionOkQuantity;
+    final AtomicLong consumptionErrorQuantity;
+
     private final ClientConfiguration clientConfiguration;
     private final PushConsumerSettings pushConsumerSettings;
     private final String consumerGroup;
@@ -104,9 +107,6 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
      */
     private final AtomicLong receivedMessagesQuantity;
 
-    final AtomicLong consumptionOkQuantity;
-    final AtomicLong consumptionErrorQuantity;
-
     private final ThreadPoolExecutor consumptionExecutor;
     private final ConcurrentMap<MessageQueueImpl, ProcessQueue> processQueueTable;
     private ConsumeService consumeService;
@@ -215,7 +215,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
     public PushConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException {
         // Check consumer status.
         if (!this.isRunning()) {
-            LOGGER.error("Unable to add subscription because push consumer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to add subscription because push consumer is not running, state={}, clientId={}",
+                this.state(), clientId);
             throw new IllegalStateException("Push consumer is not running now");
         }
 
@@ -237,7 +238,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
     public PushConsumer unsubscribe(String topic) {
         // Check consumer status.
         if (!this.isRunning()) {
-            LOGGER.error("Unable to remove subscription because push consumer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to remove subscription because push consumer is not running, state={}, clientId={}",
+                this.state(), clientId);
             throw new IllegalStateException("Push consumer is not running now");
         }
         subscriptionExpressions.remove(topic);
@@ -278,7 +280,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
             final Status status = response.getStatus();
             final Code code = status.getCode();
             if (!Code.OK.equals(code)) {
-                final String message = String.format("Failed to query assignment, code=%d, status message=[{%s}]", code.getNumber(), status.getMessage());
+                final String message = String.format("Failed to query assignment, code=%d, status message=[{%s}]",
+                    code.getNumber(), status.getMessage());
                 throw new RuntimeException(message);
             }
             SettableFuture<Assignments> future0 = SettableFuture.create();
@@ -385,14 +388,17 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
                     public void onSuccess(Assignments latest) {
                         if (latest.getAssignmentList().isEmpty()) {
                             if (null == existed || existed.getAssignmentList().isEmpty()) {
-                                LOGGER.info("Acquired empty assignments from remote, would scan later, topic={}, clientId={}", topic, clientId);
+                                LOGGER.info("Acquired empty assignments from remote, would scan later, topic={}, "
+                                    + "clientId={}", topic, clientId);
                                 return;
                             }
-                            LOGGER.info("Attention!!! acquired empty assignments from remote, but existed assignments is not empty, topic={}, clientId={}", topic, clientId);
+                            LOGGER.info("Attention!!! acquired empty assignments from remote, but existed assignments"
+                                + " is not empty, topic={}, clientId={}", topic, clientId);
                         }
 
                         if (!latest.equals(existed)) {
-                            LOGGER.info("Assignments of topic={} has changed, {} => {}, clientId={}", topic, existed, latest, clientId);
+                            LOGGER.info("Assignments of topic={} has changed, {} => {}, clientId={}", topic, existed,
+                                latest, clientId);
                             syncProcessQueue(topic, latest, filterExpression);
                             cacheAssignments.put(topic, latest);
                             return;
@@ -404,7 +410,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
                     @SuppressWarnings("NullableProblems")
                     @Override
                     public void onFailure(Throwable t) {
-                        LOGGER.error("Exception raised while scanning the assignments, topic={}, clientId={}", topic, clientId, t);
+                        LOGGER.error("Exception raised while scanning the assignments, topic={}, clientId={}", topic,
+                            clientId, t);
                     }
                 }, MoreExecutors.directExecutor());
             }
@@ -472,7 +479,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
             public void onSuccess(ConsumeResult consumeResult) {
                 Code code = ConsumeResult.SUCCESS.equals(consumeResult) ? Code.OK : Code.FAILED_TO_CONSUME_MESSAGE;
                 Status status = Status.newBuilder().setCode(code).build();
-                final VerifyMessageResult verifyMessageResult = VerifyMessageResult.newBuilder().setNonce(nonce).build();
+                final VerifyMessageResult verifyMessageResult =
+                    VerifyMessageResult.newBuilder().setNonce(nonce).build();
                 TelemetryCommand command = TelemetryCommand.newBuilder()
                     .setVerifyMessageResult(verifyMessageResult)
                     .setStatus(status)
@@ -480,14 +488,16 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
                 try {
                     telemeter(endpoints, command);
                 } catch (Throwable t) {
-                    LOGGER.error("Failed to send message verification result command, endpoints={}, command={}, messageId={}, clientId={}", endpoints, command, messageId, clientId, t);
+                    LOGGER.error("Failed to send message verification result command, endpoints={}, command={}, "
+                        + "messageId={}, clientId={}", endpoints, command, messageId, clientId, t);
                 }
             }
 
             @Override
             public void onFailure(Throwable t) {
                 // Should never reach here.
-                LOGGER.error("[Bug] Failed to get message verification result, endpoints={}, messageId={}, clientId={}", endpoints, messageId, clientId, t);
+                LOGGER.error("[Bug] Failed to get message verification result, endpoints={}, messageId={}, "
+                    + "clientId={}", endpoints, messageId, clientId, t);
             }
         }, MoreExecutors.directExecutor());
     }
@@ -527,7 +537,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
             @Override
             public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) {
                 final Duration duration = stopwatch.elapsed();
-                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(response.getStatus().getCode()) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(response.getStatus().getCode()) ?
+                    MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
                 // Intercept after forwarding message to DLQ.
                 doAfter(MessageHookPoints.FORWARD_TO_DLQ, messageCommons, duration, messageHookPointsStatus);
             }
@@ -550,11 +561,15 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
         final long consumptionOkQuantity = this.consumptionOkQuantity.getAndSet(0);
         final long consumptionErrorQuantity = this.consumptionErrorQuantity.getAndSet(0);
 
-        LOGGER.info("clientId={}, consumerGroup={}, receptionTimes={}, receivedMessagesQuantity={}, consumptionOkQuantity={}, consumptionErrorQuantity={}",
-            clientId, consumerGroup, receptionTimes, receivedMessagesQuantity, consumptionOkQuantity, consumptionErrorQuantity);
+        LOGGER.info("clientId={}, consumerGroup={}, receptionTimes={}, receivedMessagesQuantity={}, "
+                + "consumptionOkQuantity={}, consumptionErrorQuantity={}",
+            clientId, consumerGroup, receptionTimes, receivedMessagesQuantity, consumptionOkQuantity,
+            consumptionErrorQuantity);
         for (ProcessQueue pq : processQueueTable.values()) {
-            LOGGER.info("Process queue stats: clientId={}, mq={}, pendingMessageCount={}, inflightMessageCount={}, cachedMessageBytes={}",
-                clientId, pq.getMessageQueue(), pq.getPendingMessageCount(), pq.getInflightMessageCount(), pq.getCachedMessageBytes());
+            LOGGER.info("Process queue stats: clientId={}, mq={}, pendingMessageCount={}, inflightMessageCount={}, "
+                    + "cachedMessageBytes={}",
+                clientId, pq.getMessageQueue(), pq.getPendingMessageCount(), pq.getInflightMessageCount(),
+                pq.getCachedMessageBytes());
         }
     }
 
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
index 90e96a2..b9e97a5 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
@@ -24,8 +24,6 @@ import apache.rocketmq.v2.Subscription;
 import apache.rocketmq.v2.SubscriptionEntry;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.util.Durations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -39,6 +37,8 @@ import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PushConsumerSettings extends ClientSettings {
     private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerSettings.class);
@@ -73,8 +73,10 @@ public class PushConsumerSettings extends ClientSettings {
         List<SubscriptionEntry> subscriptionEntries = new ArrayList<>();
         for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
             final FilterExpression filterExpression = entry.getValue();
-            apache.rocketmq.v2.Resource topic = apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
-            final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder = apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
+            apache.rocketmq.v2.Resource topic =
+                apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
+            final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
+                apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
             final FilterExpressionType type = filterExpression.getFilterExpressionType();
             switch (type) {
                 case TAG:
@@ -86,10 +88,12 @@ public class PushConsumerSettings extends ClientSettings {
                 default:
                     LOGGER.warn("[Bug] Unrecognized filter type, type={}", type);
             }
-            SubscriptionEntry subscriptionEntry = SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
+            SubscriptionEntry subscriptionEntry =
+                SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
             subscriptionEntries.add(subscriptionEntry);
         }
-        Subscription subscription = Subscription.newBuilder().setGroup(group.toProtobuf()).addAllSubscriptions(subscriptionEntries).build();
+        Subscription subscription =
+            Subscription.newBuilder().setGroup(group.toProtobuf()).addAllSubscriptions(subscriptionEntries).build();
         return Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
             .setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setSubscription(subscription)
             .setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
@@ -99,7 +103,8 @@ public class PushConsumerSettings extends ClientSettings {
     public void applySettingsCommand(Settings settings) {
         final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
         if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
-            LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, client type={}", clientId, pubSubCase, clientType);
+            LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, "
+                + "client type={}", clientId, pubSubCase, clientType);
             return;
         }
         final Subscription subscription = settings.getSubscription();
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
similarity index 94%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
index fe19f4b..297845d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,9 +30,6 @@ import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 public class SimpleConsumerBuilderImpl implements SimpleConsumerBuilder {
     private static final Pattern CONSUMER_GROUP_PATTERN = Pattern.compile("^[%|a-zA-Z0-9._-]{1,255}$");
 
@@ -53,8 +53,8 @@ public class SimpleConsumerBuilderImpl implements SimpleConsumerBuilder {
     @Override
     public SimpleConsumerBuilder setConsumerGroup(String consumerGroup) {
         checkNotNull(consumerGroup, "consumerGroup should not be null");
-        checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(), "consumerGroup does not match the regex [regex=%s]",
-            CONSUMER_GROUP_PATTERN.pattern());
+        checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(),
+            "consumerGroup does not match the regex [regex=%s]", CONSUMER_GROUP_PATTERN.pattern());
         this.consumerGroup = consumerGroup;
         return this;
     }
@@ -79,7 +79,8 @@ public class SimpleConsumerBuilderImpl implements SimpleConsumerBuilder {
         checkNotNull(consumerGroup, "consumerGroup has not been set yet");
         checkArgument(!subscriptionExpressions.isEmpty(), "subscriptionExpressions have not been set yet");
         checkNotNull(awaitDuration, "awaitDuration has not been set yet");
-        final SimpleConsumerImpl consumer = new SimpleConsumerImpl(clientConfiguration, consumerGroup, awaitDuration, subscriptionExpressions);
+        final SimpleConsumerImpl consumer = new SimpleConsumerImpl(clientConfiguration, consumerGroup, awaitDuration,
+            subscriptionExpressions);
         consumer.startAsync().awaitRunning();
         return consumer;
     }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index f0605b6..db48b87 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -27,8 +27,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -51,6 +49,8 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of {@link SimpleConsumer}
@@ -66,20 +66,21 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     private final AtomicInteger topicIndex;
 
     private final Map<String /* topic */, FilterExpression> subscriptionExpressions;
-    private final ConcurrentMap<String /* topic */, SubscriptionTopicRouteDataResult> subscriptionTopicRouteDataResultCache;
+    private final ConcurrentMap<String /* topic */, SubscriptionTopicRouteDataResult> subTopicRouteDataResultCache;
 
     public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
         Map<String, FilterExpression> subscriptionExpressions) {
         super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
         Resource groupResource = new Resource(consumerGroup);
-        this.simpleConsumerSettings = new SimpleConsumerSettings(clientId, accessEndpoints, groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
+        this.simpleConsumerSettings = new SimpleConsumerSettings(clientId, accessEndpoints, groupResource,
+            clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
         this.consumerGroup = consumerGroup;
         this.awaitDuration = awaitDuration;
 
         this.topicIndex = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
 
         this.subscriptionExpressions = subscriptionExpressions;
-        this.subscriptionTopicRouteDataResultCache = new ConcurrentHashMap<>();
+        this.subTopicRouteDataResultCache = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -111,7 +112,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     public SimpleConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException {
         // Check consumer status.
         if (!this.isRunning()) {
-            LOGGER.error("Unable to add subscription because simple consumer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to add subscription because simple consumer is not running, state={}, clientId={}",
+                this.state(), clientId);
             throw new IllegalStateException("Simple consumer is not running now");
         }
         final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
@@ -131,7 +133,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     @Override
     public SimpleConsumer unsubscribe(String topic) {
         if (!this.isRunning()) {
-            LOGGER.error("Unable to remove subscription because simple consumer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to remove subscription because simple consumer is not running, state={}, "
+                + "clientId={}", this.state(), clientId);
             throw new IllegalStateException("Simple consumer is not running now");
         }
         subscriptionExpressions.remove(topic);
@@ -167,7 +170,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration invisibleDuration) {
         SettableFuture<List<MessageView>> future = SettableFuture.create();
         if (!this.isRunning()) {
-            LOGGER.error("Unable to receive message because simple consumer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to receive message because simple consumer is not running, state={}, clientId={}",
+                this.state(), clientId);
             future.setException(new IllegalStateException("Simple consumer is not running now"));
             return future;
         }
@@ -175,7 +179,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         final ArrayList<String> topics = new ArrayList<>(copy.keySet());
         // All topic is subscribed.
         if (topics.isEmpty()) {
-            final IllegalArgumentException exception = new IllegalArgumentException("There is no topic to receive message");
+            final IllegalArgumentException exception = new IllegalArgumentException("There is no topic to receive "
+                + "message");
             future.setException(exception);
             return future;
         }
@@ -184,7 +189,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         final ListenableFuture<SubscriptionTopicRouteDataResult> routeFuture = getSubscriptionTopicRouteResult(topic);
         final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
             final MessageQueueImpl mq = result.takeMessageQueue();
-            final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, invisibleDuration);
+            final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
+                invisibleDuration);
             return receiveMessage(request, mq, awaitDuration);
         }, MoreExecutors.directExecutor());
         return Futures.transformAsync(future0, result -> {
@@ -225,12 +231,14 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         SettableFuture<Void> future0 = SettableFuture.create();
         // Check consumer status.
         if (!this.isRunning()) {
-            LOGGER.error("Unable to ack message because simple consumer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to ack message because simple consumer is not running, state={}, clientId={}",
+                this.state(), clientId);
             future0.setException(new IllegalStateException("Simple consumer is not running now"));
             return future0;
         }
         if (!(messageView instanceof MessageViewImpl)) {
-            final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for messageView");
+            final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for "
+                + "messageView");
             future0.setException(exception);
             return future0;
         }
@@ -268,12 +276,14 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     public ListenableFuture<Void> changeInvisibleDuration0(MessageView messageView, Duration invisibleDuration) {
         SettableFuture<Void> future0 = SettableFuture.create();
         if (!(messageView instanceof MessageViewImpl)) {
-            final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for messageView");
+            final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for "
+                + "messageView");
             future0.setException(exception);
             return future0;
         }
         MessageViewImpl impl = (MessageViewImpl) messageView;
-        final ListenableFuture<ChangeInvisibleDurationResponse> future = changInvisibleDuration(impl, invisibleDuration);
+        final ListenableFuture<ChangeInvisibleDurationResponse> future = changInvisibleDuration(impl,
+            invisibleDuration);
         return Futures.transformAsync(future, response -> {
             // Refresh receipt handle manually.
             impl.setReceiptHandle(response.getReceiptHandle());
@@ -302,21 +312,23 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     }
 
     public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
-        final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult = new SubscriptionTopicRouteDataResult(topicRouteDataResult);
-        subscriptionTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
+        final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult =
+            new SubscriptionTopicRouteDataResult(topicRouteDataResult);
+        subTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
     }
 
     private ListenableFuture<SubscriptionTopicRouteDataResult> getSubscriptionTopicRouteResult(final String topic) {
         SettableFuture<SubscriptionTopicRouteDataResult> future0 = SettableFuture.create();
-        final SubscriptionTopicRouteDataResult result = subscriptionTopicRouteDataResultCache.get(topic);
+        final SubscriptionTopicRouteDataResult result = subTopicRouteDataResultCache.get(topic);
         if (null != result) {
             future0.set(result);
             return future0;
         }
         final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
         return Futures.transform(future, topicRouteDataResult -> {
-            final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult = new SubscriptionTopicRouteDataResult(topicRouteDataResult);
-            subscriptionTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
+            final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult =
+                new SubscriptionTopicRouteDataResult(topicRouteDataResult);
+            subTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
             return subscriptionTopicRouteDataResult;
         }, MoreExecutors.directExecutor());
     }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
index 70c981a..0117237 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
@@ -23,8 +23,6 @@ import apache.rocketmq.v2.Subscription;
 import apache.rocketmq.v2.SubscriptionEntry;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.util.Durations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -36,6 +34,8 @@ import org.apache.rocketmq.client.java.impl.ClientSettings;
 import org.apache.rocketmq.client.java.impl.ClientType;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SimpleConsumerSettings extends ClientSettings {
     private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerSettings.class);
@@ -57,8 +57,10 @@ public class SimpleConsumerSettings extends ClientSettings {
         List<SubscriptionEntry> subscriptionEntries = new ArrayList<>();
         for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
             final FilterExpression filterExpression = entry.getValue();
-            apache.rocketmq.v2.Resource topic = apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
-            final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder = apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
+            apache.rocketmq.v2.Resource topic =
+                apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
+            final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
+                apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
             final FilterExpressionType type = filterExpression.getFilterExpressionType();
             switch (type) {
                 case TAG:
@@ -70,7 +72,8 @@ public class SimpleConsumerSettings extends ClientSettings {
                 default:
                     LOGGER.warn("[Bug] Unrecognized filter type for simple consumer, type={}", type);
             }
-            SubscriptionEntry subscriptionEntry = SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
+            SubscriptionEntry subscriptionEntry =
+                SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
             subscriptionEntries.add(subscriptionEntry);
         }
         Subscription subscription = Subscription.newBuilder().setGroup(group.toProtobuf())
@@ -85,7 +88,8 @@ public class SimpleConsumerSettings extends ClientSettings {
     public void applySettingsCommand(Settings settings) {
         final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
         if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
-            LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, client type={}", clientId, pubSubCase, clientType);
+            LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, "
+                + "client type={}", clientId, pubSubCase, clientType);
             return;
         }
         this.arrivedFuture.set(null);
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
similarity index 99%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
index bb0681d..52d0c21 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
@@ -21,8 +21,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -35,6 +33,8 @@ import org.apache.rocketmq.client.apis.consumer.MessageListener;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("NullableProblems")
 public class StandardConsumeService extends ConsumeService {
@@ -89,7 +89,6 @@ public class StandardConsumeService extends ConsumeService {
         boolean dispatched;
         do {
             dispatched = dispatch0();
-        }
-        while (dispatched);
+        } while (dispatched);
     }
 }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
index 522934d..804cf3f 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
@@ -63,7 +63,8 @@ public class SubscriptionTopicRouteDataResult {
         }
         if (messageQueues.isEmpty()) {
             // Should never reach here.
-            throw new ResourceNotFoundException("Failed to take message queue due to readable message queue doesn't exist");
+            throw new ResourceNotFoundException("Failed to take message queue due to readable message queue doesn't "
+                + "exist");
         }
         final int next = messageQueueIndex.getAndIncrement();
         return messageQueues.get(IntMath.mod(next, messageQueues.size()));
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
similarity index 95%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
index 3ab768a..30d4f9d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
@@ -27,9 +30,6 @@ import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
 import org.apache.rocketmq.client.apis.producer.TransactionChecker;
 import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Implementation of {@link ProducerBuilder}
  */
@@ -57,7 +57,8 @@ public class ProducerBuilderImpl implements ProducerBuilder {
     @Override
     public ProducerBuilder setTopics(String... topics) {
         final Set<String> set = Arrays.stream(topics).peek(topic -> checkNotNull(topic, "topic should not be null"))
-            .peek(topic -> checkArgument(MessageBuilderImpl.TOPIC_PATTERN.matcher(topic).matches(), "topic does not match the regex [regex=%s]", MessageBuilderImpl.TOPIC_PATTERN.pattern()))
+            .peek(topic -> checkArgument(MessageBuilderImpl.TOPIC_PATTERN.matcher(topic).matches(), "topic does not "
+                + "match the regex [regex=%s]", MessageBuilderImpl.TOPIC_PATTERN.pattern()))
             .collect(Collectors.toSet());
         this.topics.addAll(set);
         return this;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
similarity index 87%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 0877499..7c3d658 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -34,8 +34,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -73,6 +71,8 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation of {@link Producer}
@@ -92,10 +92,12 @@ class ProducerImpl extends ClientImpl implements Producer {
      * The caller is supposed to have validated the arguments and handled throwing exception or
      * logging warnings already, so we avoid repeating args check here.
      */
-    ProducerImpl(ClientConfiguration clientConfiguration, Set<String> topics, int maxAttempts, TransactionChecker checker) {
+    ProducerImpl(ClientConfiguration clientConfiguration, Set<String> topics, int maxAttempts,
+        TransactionChecker checker) {
         super(clientConfiguration, topics);
         ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
-        this.producerSettings = new ProducerSettings(clientId, accessEndpoints, retryPolicy, clientConfiguration.getRequestTimeout(), topics.stream().map(Resource::new).collect(Collectors.toSet()));
+        this.producerSettings = new ProducerSettings(clientId, accessEndpoints, retryPolicy,
+            clientConfiguration.getRequestTimeout(), topics.stream().map(Resource::new).collect(Collectors.toSet()));
         this.checker = checker;
 
         this.publishingRouteDataResultCache = new ConcurrentHashMap<>();
@@ -121,14 +123,17 @@ class ProducerImpl extends ClientImpl implements Producer {
         final String transactionId = command.getTransactionId();
         final String messageId = command.getOrphanedTransactionalMessage().getSystemProperties().getMessageId();
         if (null == checker) {
-            LOGGER.error("No transaction checker registered, ignore it, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId);
+            LOGGER.error("No transaction checker registered, ignore it, messageId={}, transactionId={}, endpoints={},"
+                + " clientId={}", messageId, transactionId, endpoints, clientId);
             return;
         }
         MessageViewImpl messageView;
         try {
             messageView = MessageViewImpl.fromProtobuf(command.getOrphanedTransactionalMessage(), mq);
         } catch (Throwable t) {
-            LOGGER.error("[Bug] Failed to decode message during orphaned transaction message recovery, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, endpoints, clientId, t);
+            LOGGER.error("[Bug] Failed to decode message during orphaned transaction message recovery, messageId={}, "
+                    + "transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, endpoints,
+                clientId, t);
             return;
         }
         ListenableFuture<TransactionResolution> future;
@@ -148,15 +153,18 @@ class ProducerImpl extends ClientImpl implements Producer {
                     if (null == resolution || TransactionResolution.UNKNOWN.equals(resolution)) {
                         return;
                     }
-                    endTransaction(endpoints, messageView.getMessageCommon(), messageView.getMessageId(), transactionId, resolution);
+                    endTransaction(endpoints, messageView.getMessageCommon(), messageView.getMessageId(),
+                        transactionId, resolution);
                 } catch (Throwable t) {
-                    LOGGER.error("Exception raised while ending the transaction, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
+                    LOGGER.error("Exception raised while ending the transaction, messageId={}, transactionId={}, "
+                        + "endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
                 }
             }
 
             @Override
             public void onFailure(Throwable t) {
-                LOGGER.error("Exception raised while checking the transaction, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
+                LOGGER.error("Exception raised while checking the transaction, messageId={}, transactionId={}, "
+                    + "endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
 
             }
         }, MoreExecutors.directExecutor());
@@ -202,7 +210,8 @@ class ProducerImpl extends ClientImpl implements Producer {
         } catch (Throwable t) {
             throw new ClientException(t);
         }
-        final ListenableFuture<List<SendReceiptImpl>> future = send0(Collections.singletonList(publishingMessage), true);
+        final ListenableFuture<List<SendReceiptImpl>> future = send0(Collections.singletonList(publishingMessage),
+            true);
         final List<SendReceiptImpl> receipts = handleClientFuture(future);
         final SendReceiptImpl sendReceipt = receipts.iterator().next();
         ((TransactionImpl) transaction).tryAddReceipt(publishingMessage, sendReceipt);
@@ -235,7 +244,8 @@ class ProducerImpl extends ClientImpl implements Producer {
     @Override
     public Transaction beginTransaction() {
         if (!this.isRunning()) {
-            LOGGER.error("Unable to begin a transaction because producer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to begin a transaction because producer is not running, state={}, clientId={}",
+                this.state(), clientId);
             throw new IllegalStateException("Producer is not running now");
         }
         return new TransactionImpl(this);
@@ -270,7 +280,8 @@ class ProducerImpl extends ClientImpl implements Producer {
 
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final List<MessageCommon> messageCommons = Collections.singletonList(messageCommon);
-        MessageHookPoints messageHookPoints = TransactionResolution.COMMIT.equals(resolution) ? MessageHookPoints.COMMIT_TRANSACTION : MessageHookPoints.ROLLBACK_TRANSACTION;
+        MessageHookPoints messageHookPoints = TransactionResolution.COMMIT.equals(resolution) ?
+            MessageHookPoints.COMMIT_TRANSACTION : MessageHookPoints.ROLLBACK_TRANSACTION;
         doBefore(messageHookPoints, messageCommons);
 
         final ListenableFuture<EndTransactionResponse> future =
@@ -281,7 +292,8 @@ class ProducerImpl extends ClientImpl implements Producer {
                 final Duration duration = stopwatch.elapsed();
                 final Status status = result.getStatus();
                 final Code code = status.getCode();
-                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK :
+                    MessageHookPointsStatus.ERROR;
                 doAfter(messageHookPoints, messageCommons, duration, messageHookPointsStatus);
             }
 
@@ -324,14 +336,16 @@ class ProducerImpl extends ClientImpl implements Producer {
         if (!this.isRunning()) {
             final IllegalStateException e = new IllegalStateException("Producer is not running now");
             future.setException(e);
-            LOGGER.error("Unable to send message because producer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to send message because producer is not running, state={}, clientId={}",
+                this.state(), clientId);
             return future;
         }
 
         List<PublishingMessageImpl> pubMessages = new ArrayList<>();
         for (Message message : messages) {
             try {
-                final PublishingMessageImpl pubMessage = new PublishingMessageImpl(message, producerSettings, txEnabled);
+                final PublishingMessageImpl pubMessage = new PublishingMessageImpl(message, producerSettings,
+                    txEnabled);
                 pubMessages.add(pubMessage);
             } catch (Throwable t) {
                 // Failed to refine message, no need to proceed.
@@ -347,7 +361,8 @@ class ProducerImpl extends ClientImpl implements Producer {
             // Messages have different topics, no need to proceed.
             final IllegalArgumentException e = new IllegalArgumentException("Messages to send have different topics");
             future.setException(e);
-            LOGGER.error("Messages to be sent have different topics, no need to proceed, topic(s)={}, clientId={}", topics, clientId);
+            LOGGER.error("Messages to be sent have different topics, no need to proceed, topic(s)={}, clientId={}",
+                topics, clientId);
             return future;
         }
 
@@ -358,9 +373,11 @@ class ProducerImpl extends ClientImpl implements Producer {
             .collect(Collectors.toSet());
         if (1 < messageTypes.size()) {
             // Messages have different message type, no need to proceed.
-            final IllegalArgumentException e = new IllegalArgumentException("Messages to send have different types, please check");
+            final IllegalArgumentException e = new IllegalArgumentException("Messages to send have different types, "
+                + "please check");
             future.setException(e);
-            LOGGER.error("Messages to be sent have different message types, no need to proceed, topic={}, messageType(s)={}, clientId={}", topic, messageTypes, clientId, e);
+            LOGGER.error("Messages to be sent have different message types, no need to proceed, topic={}, messageType"
+                + "(s)={}, clientId={}", topic, messageTypes, clientId, e);
             return future;
         }
 
@@ -374,9 +391,11 @@ class ProducerImpl extends ClientImpl implements Producer {
                 .map(Optional::get).collect(Collectors.toSet());
 
             if (1 < messageGroups.size()) {
-                final IllegalArgumentException e = new IllegalArgumentException("FIFO messages to send have different message groups, messageGroups=" + messageGroups);
+                final IllegalArgumentException e = new IllegalArgumentException("FIFO messages to send have different"
+                    + " message groups, messageGroups=" + messageGroups);
                 future.setException(e);
-                LOGGER.error("FIFO messages to be sent have different message groups, no need to proceed, topic={}, messageGroups={}, clientId={}", topic, messageGroups, clientId, e);
+                LOGGER.error("FIFO messages to be sent have different message groups, no need to proceed, topic={}, "
+                    + "messageGroups={}, clientId={}", topic, messageGroups, clientId, e);
                 return future;
             }
             messageGroup = messageGroups.iterator().next();
@@ -418,28 +437,31 @@ class ProducerImpl extends ClientImpl implements Producer {
         }
         // Calculate the current message queue.
         final MessageQueueImpl messageQueue = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
-//        if (!messageQueue.matchMessageType(messageType)) {
-//            final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with topic accept message types");
-//            future.setException(e);
-//            return;
-//        }
+        //        if (!messageQueue.matchMessageType(messageType)) {
+        //            final IllegalArgumentException e = new IllegalArgumentException("Current message type not match
+        //            with topic accept message types");
+        //            future.setException(e);
+        //            return;
+        //        }
         final Endpoints endpoints = messageQueue.getBroker().getEndpoints();
         final SendMessageRequest request = wrapSendMessageRequest(messages);
 
         final ListenableFuture<SendMessageResponse> responseFuture = clientManager.sendMessage(endpoints, metadata,
             request, clientConfiguration.getRequestTimeout());
 
-        final ListenableFuture<List<SendReceiptImpl>> attemptFuture = Futures.transformAsync(responseFuture, response -> {
-            final SettableFuture<List<SendReceiptImpl>> future0 = SettableFuture.create();
-            future0.set(SendReceiptImpl.processSendResponse(messageQueue, response));
-            return future0;
-        }, MoreExecutors.directExecutor());
+        final ListenableFuture<List<SendReceiptImpl>> attemptFuture = Futures.transformAsync(responseFuture,
+            response -> {
+                final SettableFuture<List<SendReceiptImpl>> future0 = SettableFuture.create();
+                future0.set(SendReceiptImpl.processSendResponse(messageQueue, response));
+                return future0;
+            }, MoreExecutors.directExecutor());
 
         final int maxAttempts = this.getRetryPolicy().getMaxAttempts();
 
         // Intercept before message publishing.
         final Stopwatch stopwatch = Stopwatch.createStarted();
-        final List<MessageCommon> messageCommons = messages.stream().map(PublishingMessageImpl::getMessageCommon).collect(Collectors.toList());
+        final List<MessageCommon> messageCommons =
+            messages.stream().map(PublishingMessageImpl::getMessageCommon).collect(Collectors.toList());
         doBefore(MessageHookPoints.SEND, messageCommons);
 
         Futures.addCallback(attemptFuture, new FutureCallback<List<SendReceiptImpl>>() {
@@ -494,16 +516,19 @@ class ProducerImpl extends ClientImpl implements Producer {
                 if (MessageType.TRANSACTION.equals(messageType)) {
                     future.setException(t);
                     LOGGER.error("Failed to send transactional message finally, maxAttempts=1, attempt={}, " +
-                        "topic={}, messageId(s), endpoints={}, clientId={}", attempt, topic, messageIds, endpoints, clientId, t);
+                            "topic={}, messageId(s), endpoints={}, clientId={}", attempt, topic, messageIds, endpoints,
+                        clientId, t);
                     return;
                 }
                 // Try to do more attempts.
                 int nextAttempt = 1 + attempt;
                 final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
                 LOGGER.warn("Failed to send message, would attempt to resend after {}, maxAttempts={}," +
-                        " attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay, maxAttempts, attempt,
+                        " attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay, maxAttempts,
+                    attempt,
                     topic, messageIds, endpoints, clientId, t);
-                clientManager.getScheduler().schedule(() -> send0(future, topic, messageType, candidates, messages, nextAttempt),
+                clientManager.getScheduler().schedule(() -> send0(future, topic, messageType, candidates, messages,
+                        nextAttempt),
                     delay.toNanos(), TimeUnit.NANOSECONDS);
             }
         }, clientCallbackExecutor);
@@ -511,7 +536,8 @@ class ProducerImpl extends ClientImpl implements Producer {
 
     @Override
     public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
-        final PublishingTopicRouteDataResult publishingTopicRouteDataResult = new PublishingTopicRouteDataResult(topicRouteDataResult);
+        final PublishingTopicRouteDataResult publishingTopicRouteDataResult =
+            new PublishingTopicRouteDataResult(topicRouteDataResult);
         publishingRouteDataResultCache.put(topic, publishingTopicRouteDataResult);
     }
 
@@ -524,7 +550,8 @@ class ProducerImpl extends ClientImpl implements Producer {
         }
         return Futures.transformAsync(getRouteDataResult(topic), topicRouteDataResult -> {
             SettableFuture<PublishingTopicRouteDataResult> future = SettableFuture.create();
-            final PublishingTopicRouteDataResult publishingTopicRouteDataResult = new PublishingTopicRouteDataResult(topicRouteDataResult);
+            final PublishingTopicRouteDataResult publishingTopicRouteDataResult =
+                new PublishingTopicRouteDataResult(topicRouteDataResult);
             publishingRouteDataResultCache.put(topic, publishingTopicRouteDataResult);
             future.set(publishingTopicRouteDataResult);
             return future;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index 8e6e90e..9445b30 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -21,8 +21,6 @@ import apache.rocketmq.v2.Publishing;
 import apache.rocketmq.v2.Settings;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.util.Durations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -32,6 +30,8 @@ import org.apache.rocketmq.client.java.impl.ClientType;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ProducerSettings extends ClientSettings {
     private static final Logger LOGGER = LoggerFactory.getLogger(ProducerSettings.class);
@@ -68,8 +68,11 @@ public class ProducerSettings extends ClientSettings {
 
     @Override
     public Settings toProtobuf() {
-        final Publishing publishing = Publishing.newBuilder().addAllTopics(topics.stream().map(Resource::toProtobuf).collect(Collectors.toList())).build();
-        final Settings.Builder builder = Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
+        final Publishing publishing =
+            Publishing.newBuilder().addAllTopics(topics.stream().map(Resource::toProtobuf)
+                .collect(Collectors.toList())).build();
+        final Settings.Builder builder =
+            Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
             .setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
         return builder.setBackoffPolicy(retryPolicy.toProtobuf()).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
     }
@@ -78,7 +81,8 @@ public class ProducerSettings extends ClientSettings {
     public void applySettingsCommand(Settings settings) {
         final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
         if (!Settings.PubSubCase.PUBLISHING.equals(pubSubCase)) {
-            LOGGER.error("[Bug] Issued settings not match with the client type, clientId={}, pubSubCase={}, clientType={}", clientId, pubSubCase, clientType);
+            LOGGER.error("[Bug] Issued settings not match with the client type, clientId={}, pubSubCase={}, "
+                + "clientType={}", clientId, pubSubCase, clientType);
             return;
         }
         final Publishing publishing = settings.getPublishing();
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
similarity index 94%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index 7f1ac10..e718228 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -84,7 +84,9 @@ public class SendReceiptImpl implements SendReceipt {
                 throwableList.add(e);
                 continue;
             }
-            final SendReceiptImpl impl = new SendReceiptImpl(MessageIdCodec.getInstance().decode(entry.getMessageId()), entry.getTransactionId(), mq, entry.getOffset());
+            final SendReceiptImpl impl =
+                new SendReceiptImpl(MessageIdCodec.getInstance().decode(entry.getMessageId()),
+                    entry.getTransactionId(), mq, entry.getOffset());
             sendReceipts.add(impl);
         }
         if (!throwableList.isEmpty()) {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
index e3f8dd0..aef1098 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
@@ -18,8 +18,6 @@
 package org.apache.rocketmq.client.java.impl.producer;
 
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
@@ -33,18 +31,17 @@ import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Transaction;
 import org.apache.rocketmq.client.apis.producer.TransactionResolution;
 import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class TransactionImpl implements Transaction {
     private static final Logger LOGGER = LoggerFactory.getLogger(ProducerImpl.class);
-
-    private final ProducerImpl producerImpl;
-
     private static final int MAX_MESSAGE_NUM = 1;
 
+    private final ProducerImpl producerImpl;
     @GuardedBy("messagesLock")
     private final Set<PublishingMessageImpl> messages;
     private final ReadWriteLock messagesLock;
-
     private final ConcurrentMap<PublishingMessageImpl, SendReceiptImpl> messageSendReceiptMap;
 
     public TransactionImpl(ProducerImpl producerImpl) {
@@ -58,7 +55,8 @@ class TransactionImpl implements Transaction {
         messagesLock.readLock().lock();
         try {
             if (messages.size() > MAX_MESSAGE_NUM) {
-                throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " + MAX_MESSAGE_NUM);
+                throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " +
+                    MAX_MESSAGE_NUM);
             }
         } finally {
             messagesLock.readLock().unlock();
@@ -66,9 +64,11 @@ class TransactionImpl implements Transaction {
         messagesLock.writeLock().lock();
         try {
             if (messages.size() >= MAX_MESSAGE_NUM) {
-                throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " + MAX_MESSAGE_NUM);
+                throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " +
+                    MAX_MESSAGE_NUM);
             }
-            final PublishingMessageImpl publishingMessage = new PublishingMessageImpl(message, producerImpl.producerSettings, true);
+            final PublishingMessageImpl publishingMessage = new PublishingMessageImpl(message,
+                producerImpl.producerSettings, true);
             messages.add(publishingMessage);
             return publishingMessage;
         } finally {
@@ -97,7 +97,8 @@ class TransactionImpl implements Transaction {
         for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) {
             final PublishingMessageImpl publishingMessage = entry.getKey();
             final SendReceiptImpl sendReceipt = entry.getValue();
-            producerImpl.endTransaction(sendReceipt.getEndpoints(), publishingMessage.getMessageCommon(), sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.COMMIT);
+            producerImpl.endTransaction(sendReceipt.getEndpoints(), publishingMessage.getMessageCommon(),
+                sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.COMMIT);
         }
     }
 
@@ -109,7 +110,8 @@ class TransactionImpl implements Transaction {
         for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) {
             final PublishingMessageImpl publishingMessage = entry.getKey();
             final SendReceiptImpl sendReceipt = entry.getValue();
-            producerImpl.endTransaction(sendReceipt.getEndpoints(), publishingMessage.getMessageCommon(), sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.ROLLBACK);
+            producerImpl.endTransaction(sendReceipt.getEndpoints(), publishingMessage.getMessageCommon(),
+                sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.ROLLBACK);
         }
     }
 }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
similarity index 91%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
index 5904626..e131471 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
@@ -21,7 +21,8 @@ import ch.qos.logback.core.ConsoleAppender;
 
 public class CustomConsoleAppender<E> extends ConsoleAppender<E> {
     public static final String ENABLE_CONSOLE_APPENDER_KEY = "mq.consoleAppender.enabled";
-    private final boolean enabled = Boolean.parseBoolean(System.getenv(ENABLE_CONSOLE_APPENDER_KEY)) || Boolean.parseBoolean(System.getProperty(ENABLE_CONSOLE_APPENDER_KEY));
+    private final boolean enabled = Boolean.parseBoolean(System.getenv(ENABLE_CONSOLE_APPENDER_KEY)) ||
+        Boolean.parseBoolean(System.getProperty(ENABLE_CONSOLE_APPENDER_KEY));
 
     public CustomConsoleAppender() {
     }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/ProcessIdConverter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/ProcessIdConverter.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/ProcessIdConverter.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/logging/ProcessIdConverter.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
index 8343e18..112fbb7 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.message;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -28,9 +31,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.message.MessageBuilder;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 public class MessageBuilderImpl implements MessageBuilder {
     public static final Pattern TOPIC_PATTERN = Pattern.compile("^[%|a-zA-Z0-9._-]{1,127}$");
     private static final int MESSAGE_BODY_LENGTH_THRESHOLD = 1024 * 1024 * 4;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
similarity index 95%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
index 33d075f..737e878 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
@@ -58,14 +58,16 @@ public class MessageCommon {
 
     public MessageCommon(String topic, byte[] body, String tag, String messageGroup, Long deliveryTimestamp,
         String parentTraceContext, Collection<String> keys, Map<String, String> properties) {
-        this(null, topic, body, properties, tag, keys, messageGroup, deliveryTimestamp, null, parentTraceContext, null, null, null, null, null);
+        this(null, topic, body, properties, tag, keys, messageGroup, deliveryTimestamp, null, parentTraceContext,
+            null, null, null, null, null);
     }
 
     public MessageCommon(MessageId messageId, String topic, byte[] body, String tag, String messageGroup,
         Long deliveryTimestamp, Collection<String> keys, Map<String, String> properties, String bornHost,
         String traceContext, long bornTimestamp, int deliveryAttempt, Stopwatch decodeStopwatch,
         Timestamp deliveryTimestampFromRemote) {
-        this(messageId, topic, body, properties, tag, keys, messageGroup, deliveryTimestamp, bornHost, null, traceContext, bornTimestamp, deliveryAttempt, decodeStopwatch, deliveryTimestampFromRemote);
+        this(messageId, topic, body, properties, tag, keys, messageGroup, deliveryTimestamp, bornHost, null,
+            traceContext, bornTimestamp, deliveryAttempt, decodeStopwatch, deliveryTimestampFromRemote);
     }
 
     private MessageCommon(@Nullable MessageId messageId, String topic, byte[] body,
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
similarity index 99%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
index 9369e74..59d1578 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
@@ -53,13 +53,12 @@ import org.apache.rocketmq.client.java.misc.Utilities;
  * </pre>
  */
 public class MessageIdCodec {
-    private static final MessageIdCodec INSTANCE = new MessageIdCodec();
-
     public static final int MESSAGE_ID_LENGTH_FOR_V1_OR_LATER = 34;
-
     public static final String MESSAGE_ID_VERSION_V0 = "00";
     public static final String MESSAGE_ID_VERSION_V1 = "01";
 
+    private static final MessageIdCodec INSTANCE = new MessageIdCodec();
+
     private final String processFixedStringV1;
     private final long secondsSinceCustomEpoch;
     private final long secondsStartTimestamp;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdImpl.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
similarity index 98%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
index 1b03ca6..2293c23 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
@@ -33,8 +33,10 @@ import org.apache.rocketmq.client.apis.message.Message;
  * @see Message
  */
 public class MessageImpl implements Message {
-    private final String topic;
+    protected final Collection<String> keys;
+
     final byte[] body;
+    private final String topic;
 
     @Nullable
     private final String tag;
@@ -45,7 +47,6 @@ public class MessageImpl implements Message {
     @Nullable
     private final String parentTraceContext;
 
-    protected final Collection<String> keys;
     private final Map<String, String> properties;
 
     /**
@@ -65,10 +66,6 @@ public class MessageImpl implements Message {
         this.properties = properties;
     }
 
-    public MessageCommon getMessageCommon() {
-        return new MessageCommon(topic, body, tag, messageGroup, deliveryTimestamp, parentTraceContext, keys, properties);
-    }
-
     MessageImpl(Message message) {
         this.topic = message.getTopic();
         if (message instanceof MessageImpl) {
@@ -89,6 +86,11 @@ public class MessageImpl implements Message {
         this.properties = message.getProperties();
     }
 
+    public MessageCommon getMessageCommon() {
+        return new MessageCommon(topic, body, tag, messageGroup, deliveryTimestamp, parentTraceContext, keys,
+            properties);
+    }
+
     /**
      * @see Message#getTopic()
      */
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
similarity index 93%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
index 9e0d2b7..5b3525d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.client.java.message;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import apache.rocketmq.v2.Digest;
 import apache.rocketmq.v2.DigestType;
 import apache.rocketmq.v2.Encoding;
@@ -27,8 +29,6 @@ import com.google.common.base.Stopwatch;
 import com.google.protobuf.ProtocolStringList;
 import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Timestamps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.NoSuchAlgorithmException;
@@ -44,8 +44,8 @@ import org.apache.rocketmq.client.java.misc.LinkedIterator;
 import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageView {
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageViewImpl.class);
@@ -73,8 +73,9 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
 
     public MessageViewImpl(MessageId messageId, String topic, byte[] body, String tag, String messageGroup,
         Long deliveryTimestamp, Collection<String> keys, Map<String, String> properties,
-        String bornHost, long bornTimestamp, int deliveryAttempt, MessageQueueImpl messageQueue, String receiptHandle,
-        String traceContext, long offset, boolean corrupted, Timestamp deliveryTimestampFromRemote) {
+        String bornHost, long bornTimestamp, int deliveryAttempt, MessageQueueImpl messageQueue,
+        String receiptHandle, String traceContext, long offset, boolean corrupted,
+        Timestamp deliveryTimestampFromRemote) {
         this.messageId = checkNotNull(messageId, "messageId should not be null");
         this.topic = checkNotNull(topic, "topic should not be null");
         this.body = checkNotNull(body, "body should not be null");
@@ -98,7 +99,8 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
     }
 
     public MessageCommon getMessageCommon() {
-        return new MessageCommon(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties, bornHost, traceContext, bornTimestamp, deliveryAttempt, decodeStopwatch, null);
+        return new MessageCommon(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties,
+            bornHost, traceContext, bornTimestamp, deliveryAttempt, decodeStopwatch, null);
     }
 
     /**
@@ -264,7 +266,8 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
                     }
                 } catch (NoSuchAlgorithmException e) {
                     corrupted = true;
-                    LOGGER.error("MD5 is not supported unexpectedly, skip it, topic={}, messageId={}", topic, messageId);
+                    LOGGER.error("MD5 is not supported unexpectedly, skip it, topic={}, messageId={}", topic,
+                        messageId);
                 }
                 break;
             case SHA1:
@@ -275,11 +278,13 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
                     }
                 } catch (NoSuchAlgorithmException e) {
                     corrupted = true;
-                    LOGGER.error("SHA-1 is not supported unexpectedly, skip it, topic={}, messageId={}", topic, messageId);
+                    LOGGER.error("SHA-1 is not supported unexpectedly, skip it, topic={}, messageId={}", topic,
+                        messageId);
                 }
                 break;
             default:
-                LOGGER.error("Unsupported message body digest algorithm, digestType={}, topic={}, messageId={}", digestType, topic, messageId);
+                LOGGER.error("Unsupported message body digest algorithm, digestType={}, topic={}, messageId={}",
+                    digestType, topic, messageId);
         }
         final Encoding bodyEncoding = systemProperties.getBodyEncoding();
         switch (bodyEncoding) {
@@ -294,7 +299,8 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
             case IDENTITY:
                 break;
             default:
-                LOGGER.error("Unsupported message encoding algorithm, topic={}, messageId={}, bodyEncoding={}", topic, messageId, bodyEncoding);
+                LOGGER.error("Unsupported message encoding algorithm, topic={}, messageId={}, bodyEncoding={}", topic,
+                    messageId, bodyEncoding);
         }
 
         String tag = systemProperties.hasTag() ? systemProperties.getTag() : null;
@@ -309,7 +315,8 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
         final Map<String, String> properties = message.getUserPropertiesMap();
         final String receiptHandle = systemProperties.getReceiptHandle();
         String traceContext = systemProperties.hasTraceContext() ? systemProperties.getTraceContext() : null;
-        return new MessageViewImpl(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties, bornHost, bornTimestamp, deliveryAttempt, mq, receiptHandle, traceContext, offset, corrupted, timestamp);
+        return new MessageViewImpl(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties,
+            bornHost, bornTimestamp, deliveryAttempt, mq, receiptHandle, traceContext, offset, corrupted, timestamp);
     }
 
     @Override
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 97ea1ea..7c6ea9c 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -45,7 +45,8 @@ public class PublishingMessageImpl extends MessageImpl {
     private final MessageType messageType;
     private volatile String traceContext;
 
-    public PublishingMessageImpl(Message message, ProducerSettings producerSettings, boolean txEnabled) throws IOException {
+    public PublishingMessageImpl(Message message, ProducerSettings producerSettings, boolean txEnabled)
+        throws IOException {
         super(message);
         this.traceContext = null;
         final int length = message.getBody().remaining();
@@ -66,7 +67,8 @@ public class PublishingMessageImpl extends MessageImpl {
                 body = new byte[length];
                 message.getBody().get(body);
             }
-            final byte[] compressed = Utilities.compressBytesGzip(body, producerSettings.getMessageGzipCompressionLevel());
+            final byte[] compressed = Utilities.compressBytesGzip(body,
+                producerSettings.getMessageGzipCompressionLevel());
             this.compressedBody = ByteBuffer.wrap(compressed).asReadOnlyBuffer();
             this.encoding = Encoding.GZIP;
         } else {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/protocol/Encoding.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Encoding.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/protocol/Encoding.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Encoding.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
similarity index 88%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index 3a9b4bd..edec539 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.client.java.metrics;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.ManagedChannel;
 import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
@@ -51,6 +49,8 @@ import org.apache.rocketmq.client.java.impl.ClientImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.rpc.AuthInterceptor;
 import org.apache.rocketmq.client.java.rpc.IpNameResolverFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MessageMeter {
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageMeter.class);
@@ -83,51 +83,63 @@ public class MessageMeter {
     }
 
     LongCounter getSendSuccessTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.SEND_SUCCESS_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.SEND_SUCCESS_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     LongCounter getSendFailureTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.SEND_FAILURE_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.SEND_FAILURE_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     DoubleHistogram getSendSuccessCostTimeHistogram() {
-        return histogramMap.computeIfAbsent(MetricName.SEND_SUCCESS_COST_TIME, name -> meter.histogramBuilder(name.getName()).build());
+        return histogramMap.computeIfAbsent(MetricName.SEND_SUCCESS_COST_TIME,
+            name -> meter.histogramBuilder(name.getName()).build());
     }
 
     LongCounter getProcessSuccessTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.PROCESS_SUCCESS_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.PROCESS_SUCCESS_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     LongCounter getProcessFailureTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.PROCESS_FAILURE_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.PROCESS_FAILURE_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     DoubleHistogram getProcessCostTimeHistogram() {
-        return histogramMap.computeIfAbsent(MetricName.PROCESS_TIME, name -> meter.histogramBuilder(name.getName()).build());
+        return histogramMap.computeIfAbsent(MetricName.PROCESS_TIME,
+            name -> meter.histogramBuilder(name.getName()).build());
     }
 
     LongCounter getAckSuccessTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.ACK_SUCCESS_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.ACK_SUCCESS_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     LongCounter getAckFailureTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.ACK_FAILURE_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.ACK_FAILURE_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     LongCounter getChangeInvisibleDurationSuccessCounter() {
-        return counterMap.computeIfAbsent(MetricName.CHANGE_INVISIBLE_DURATION_SUCCESS_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.CHANGE_INVISIBLE_DURATION_SUCCESS_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     LongCounter getChangeInvisibleDurationFailureCounter() {
-        return counterMap.computeIfAbsent(MetricName.CHANGE_INVISIBLE_DURATION_FAILURE_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.CHANGE_INVISIBLE_DURATION_FAILURE_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     DoubleHistogram getMessageDeliveryLatencyHistogram() {
-        return histogramMap.computeIfAbsent(MetricName.DELIVERY_LATENCY, name -> meter.histogramBuilder(name.getName()).build());
+        return histogramMap.computeIfAbsent(MetricName.DELIVERY_LATENCY,
+            name -> meter.histogramBuilder(name.getName()).build());
     }
 
     DoubleHistogram getMessageAwaitTimeHistogram() {
-        return histogramMap.computeIfAbsent(MetricName.AWAIT_TIME, name -> meter.histogramBuilder(name.getName()).build());
+        return histogramMap.computeIfAbsent(MetricName.AWAIT_TIME,
+            name -> meter.histogramBuilder(name.getName()).build());
     }
 
     @SuppressWarnings("deprecation")
@@ -138,12 +150,14 @@ public class MessageMeter {
             }
             final Optional<Endpoints> optionalEndpoints = metric.tryGetMetricEndpoints();
             if (!optionalEndpoints.isPresent()) {
-                LOGGER.error("[Bug] Metric switch is on but endpoints is not filled, clientId={}", client.getClientId());
+                LOGGER.error("[Bug] Metric switch is on but endpoints is not filled, clientId={}",
+                    client.getClientId());
                 return;
             }
             final Endpoints newMetricEndpoints = optionalEndpoints.get();
             if (newMetricEndpoints.equals(metricEndpoints)) {
-                LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}", client.getClientId(), newMetricEndpoints);
+                LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}",
+                    client.getClientId(), newMetricEndpoints);
                 return;
             }
             final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
@@ -172,13 +186,15 @@ public class MessageMeter {
                 .setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(0.1d, 1d, 10d, 30d, 60d)))
                 .build();
 
-            PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter).setInterval(Duration.ofSeconds(1)).build();
+            PeriodicMetricReader reader =
+                PeriodicMetricReader.builder(exporter).setInterval(Duration.ofSeconds(1)).build();
             provider = SdkMeterProvider.builder().registerMetricReader(reader)
                 .registerView(instrumentSelector, view)
                 .build();
             final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build();
             meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
-            LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", client.getClientId(), metricEndpoints, newMetricEndpoints);
+            LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", client.getClientId(),
+                metricEndpoints, newMetricEndpoints);
             this.reset();
             metricEndpoints = newMetricEndpoints;
         } catch (Throwable t) {
@@ -205,7 +221,8 @@ public class MessageMeter {
             try {
                 latch.await();
             } catch (Throwable t) {
-                LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}", client.getClientId());
+                LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}",
+                    client.getClientId());
             }
         }
         LOGGER.info("Shutdown the message meter, clientId={}", client.getClientId());
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
similarity index 98%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
index 97a6657..440f89d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
@@ -19,8 +19,6 @@ package org.apache.rocketmq.client.java.metrics;
 
 import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Timestamps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.DoubleHistogram;
 import io.opentelemetry.api.metrics.LongCounter;
@@ -34,6 +32,8 @@ import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.impl.ClientImpl;
 import org.apache.rocketmq.client.java.message.MessageCommon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MetricMessageInterceptor implements MessageInterceptor {
     private static final Logger LOGGER = LoggerFactory.getLogger(MetricMessageInterceptor.class);
@@ -159,8 +159,10 @@ public class MetricMessageInterceptor implements MessageInterceptor {
     }
 
     private void doAfterChangInvisibleDuration(List<MessageCommon> messageCommons, MessageHookPointsStatus status) {
-        final LongCounter changeInvisibleDurationSuccessCounter = messageMeter.getChangeInvisibleDurationSuccessCounter();
-        final LongCounter changeInvisibleDurationFailureCounter = messageMeter.getChangeInvisibleDurationFailureCounter();
+        final LongCounter changeInvisibleDurationSuccessCounter =
+            messageMeter.getChangeInvisibleDurationSuccessCounter();
+        final LongCounter changeInvisibleDurationFailureCounter =
+            messageMeter.getChangeInvisibleDurationFailureCounter();
         final Optional<String> optionalConsumerGroup = messageMeter.tryGetConsumerGroup();
         if (!optionalConsumerGroup.isPresent()) {
             LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", messageMeter.getClient());
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
similarity index 96%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
index 5cc8465..d7a2d28 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
@@ -17,14 +17,15 @@
 
 package org.apache.rocketmq.client.java.metrics;
 
-import io.opentelemetry.api.common.AttributeKey;
-
 import static io.opentelemetry.api.common.AttributeKey.stringKey;
 
+import io.opentelemetry.api.common.AttributeKey;
+
 public class RocketmqAttributes {
     public static final AttributeKey<String> TOPIC = stringKey("topic");
-
     public static final AttributeKey<String> CLIENT_ID = stringKey("client_id");
-
     public static final AttributeKey<String> CONSUMER_GROUP = stringKey("consumer_group");
+
+    private RocketmqAttributes() {
+    }
 }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
index ff0e8af..c4e1c71 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
@@ -18,12 +18,12 @@
 package org.apache.rocketmq.client.java.misc;
 
 import com.google.common.util.concurrent.AbstractIdleService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A driver to notify downstream to dispatch task.
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/ExecutorServices.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/ExecutorServices.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/ExecutorServices.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/ExecutorServices.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/MetadataUtils.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/MetadataUtils.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/MetadataUtils.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/MetadataUtils.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/RequestIdGenerator.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/RequestIdGenerator.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/RequestIdGenerator.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/RequestIdGenerator.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/ThreadFactoryImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/ThreadFactoryImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/ThreadFactoryImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/ThreadFactoryImpl.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
similarity index 98%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
index 639365f..2705cd7 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
@@ -60,12 +60,14 @@ public class Utilities {
     /**
      * Used to build output as Hex
      */
-    private static final char[] DIGITS_LOWER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+    private static final char[] DIGITS_LOWER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd',
+        'e', 'f'};
 
     /**
      * Used to build output as Hex
      */
-    private static final char[] DIGITS_UPPER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+    private static final char[] DIGITS_UPPER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D',
+        'E', 'F'};
 
     private static final String CLIENT_ID_SEPARATOR = "@";
 
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
similarity index 95%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
index 5c177bf..2157613 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.retry;
 
+import static apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
+import static com.google.common.base.Preconditions.checkArgument;
+
 import apache.rocketmq.v2.CustomizedBackoff;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.util.Durations;
@@ -24,9 +27,6 @@ import java.time.Duration;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
-import static com.google.common.base.Preconditions.checkArgument;
-
 public class CustomizedBackoffRetryPolicy implements RetryPolicy {
     private final List<Duration> durations;
     private final int maxAttempts;
@@ -70,7 +70,9 @@ public class CustomizedBackoffRetryPolicy implements RetryPolicy {
     @Override
     public apache.rocketmq.v2.RetryPolicy toProtobuf() {
         CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder()
-            .addAllNext(durations.stream().map(duration -> Durations.fromNanos(duration.toNanos())).collect(Collectors.toList()))
+            .addAllNext(durations.stream()
+                .map(duration -> Durations.fromNanos(duration.toNanos()))
+                .collect(Collectors.toList()))
             .build();
         return apache.rocketmq.v2.RetryPolicy.newBuilder()
             .setMaxAttempts(maxAttempts)
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
similarity index 96%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
index b216f06..79f033a 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
@@ -17,17 +17,18 @@
 
 package org.apache.rocketmq.client.java.retry;
 
+import static apache.rocketmq.v2.RetryPolicy.StrategyCase.EXPONENTIAL_BACKOFF;
+import static com.google.common.base.Preconditions.checkArgument;
+
 import apache.rocketmq.v2.ExponentialBackoff;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
 import java.util.Random;
 
-import static apache.rocketmq.v2.RetryPolicy.StrategyCase.EXPONENTIAL_BACKOFF;
-import static com.google.common.base.Preconditions.checkArgument;
-
 /**
- * The {@link ExponentialBackoffRetryPolicy} defines a policy to do more attempts when failure is encountered, mainly refer to
+ * The {@link ExponentialBackoffRetryPolicy} defines a policy to do more attempts when failure is encountered, mainly
+ * refer to
  * <a href="https://github.com/grpc/proposal/blob/master/A6-client-retries.md">gRPC Retry Design</a>.
  */
 public class ExponentialBackoffRetryPolicy implements RetryPolicy {
@@ -62,7 +63,8 @@ public class ExponentialBackoffRetryPolicy implements RetryPolicy {
     @Override
     public Duration getNextAttemptDelay(int attempt) {
         checkArgument(attempt > 0, "attempt must be positive");
-        int randomNumberBound = (int) Math.min(initialBackoff.toNanos() * Math.pow(backoffMultiplier, 1.0 * (attempt - 1)),
+        int randomNumberBound = (int) Math.min(initialBackoff.toNanos() * Math.pow(backoffMultiplier,
+                1.0 * (attempt - 1)),
             maxBackoff.toNanos());
         if (randomNumberBound <= 0) {
             return Duration.ZERO;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Address.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Address.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Address.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/Address.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/AddressScheme.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/AddressScheme.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/AddressScheme.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/AddressScheme.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Broker.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
similarity index 96%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
index 590149c..ab77a3d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
@@ -34,7 +34,8 @@ public class Broker {
     }
 
     public apache.rocketmq.v2.Broker toProtobuf() {
-        return apache.rocketmq.v2.Broker.newBuilder().setName(name).setId(id).setEndpoints(endpoints.toProtobuf()).build();
+        return apache.rocketmq.v2.Broker.newBuilder().setName(name).setId(id).setEndpoints(endpoints.toProtobuf())
+            .build();
     }
 
     public String getName() {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
similarity index 98%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
index 8e50e76..4a4f562 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.client.java.route;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.base.Objects;
 import com.google.common.net.InternetDomainName;
 import java.net.InetSocketAddress;
@@ -24,12 +26,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Pattern;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 public class Endpoints {
     public static final int DEFAULT_PORT = 80;
 
-    private static final Pattern IPV4_HOST_PATTERN = Pattern.compile("^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])*$");
+    private static final Pattern IPV4_HOST_PATTERN = Pattern.compile("^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0"
+        + "-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])*$");
     private static final String ENDPOINT_SEPARATOR = ";";
     private static final String ADDRESS_SEPARATOR = ",";
     private static final String COLON = ":";
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Permission.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Permission.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Permission.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/Permission.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
index dbed25c..a0725a4 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
@@ -65,7 +65,8 @@ public class TopicRouteData {
     public Endpoints pickEndpointsToQueryAssignments() throws ResourceNotFoundException {
         int nextIndex = index.getAndIncrement();
         for (int i = 0; i < messageQueueImpls.size(); i++) {
-            final MessageQueueImpl messageQueueImpl = messageQueueImpls.get(IntMath.mod(nextIndex++, messageQueueImpls.size()));
+            final MessageQueueImpl messageQueueImpl = messageQueueImpls.get(IntMath.mod(nextIndex++,
+                messageQueueImpls.size()));
             final Broker broker = messageQueueImpl.getBroker();
             if (Utilities.MASTER_BROKER_ID != broker.getId()) {
                 continue;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
index dd0f553..1efb034 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
@@ -17,13 +17,13 @@
 
 package org.apache.rocketmq.client.java.route;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import apache.rocketmq.v2.Status;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import javax.annotation.concurrent.Immutable;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Result topic route data fetched from remote.
  */
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
index 11ca662..66820e6 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.client.java.rpc;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.CallOptions;
 import io.grpc.Channel;
 import io.grpc.ClientCall;
@@ -27,6 +25,8 @@ import io.grpc.ForwardingClientCall;
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AuthInterceptor implements ClientInterceptor {
     private static final Logger LOGGER = LoggerFactory.getLogger(AuthInterceptor.class);
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/IpNameResolverFactory.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/IpNameResolverFactory.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/IpNameResolverFactory.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/IpNameResolverFactory.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
similarity index 86%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
index b6f7ba9..bff53d7 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.client.java.rpc;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.CallOptions;
 import io.grpc.Channel;
 import io.grpc.ClientCall;
@@ -28,6 +26,8 @@ import io.grpc.ForwardingClientCallListener;
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
 import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The client log interceptor based on grpc can track any remote procedure call that interacts with the client locally.
@@ -53,18 +53,21 @@ public class LoggingInterceptor implements ClientInterceptor {
         return new ForwardingClientCall.SimpleForwardingClientCall<T, E>(next.newCall(method, callOptions)) {
             @Override
             public void start(Listener<E> responseListener, final Metadata headers) {
-                LOGGER.trace("gRPC request header, rpcId={}, serviceName={}, methodName={}, authority={}, headers={}", rpcId, serviceName, methodName, authority, headers);
+                LOGGER.trace("gRPC request header, rpcId={}, serviceName={}, methodName={}, authority={}, headers={}",
+                    rpcId, serviceName, methodName, authority, headers);
                 Listener<E> observabilityListener =
                     new ForwardingClientCallListener.SimpleForwardingClientCallListener<E>(responseListener) {
                         @Override
                         public void onMessage(E response) {
-                            LOGGER.trace("gRPC response, rpcId={}, serviceName={}, methodName={}, content:\n{}", rpcId, serviceName, methodName, response);
+                            LOGGER.trace("gRPC response, rpcId={}, serviceName={}, methodName={}, content:\n{}",
+                                rpcId, serviceName, methodName, response);
                             super.onMessage(response);
                         }
 
                         @Override
                         public void onHeaders(Metadata headers) {
-                            LOGGER.trace("gRPC response header, rpcId={}, serviceName={}, methodName={}, authority={}, headers={}", rpcId, serviceName, methodName, authority, headers);
+                            LOGGER.trace("gRPC response header, rpcId={}, serviceName={}, methodName={}, "
+                                + "authority={}, headers={}", rpcId, serviceName, methodName, authority, headers);
                             super.onHeaders(headers);
                         }
                     };
@@ -73,7 +76,8 @@ public class LoggingInterceptor implements ClientInterceptor {
 
             @Override
             public void sendMessage(T request) {
-                LOGGER.trace("gRPC request, rpcId={}, serviceName={}, methodName={}, content:\n{}", rpcId, serviceName, methodName, request);
+                LOGGER.trace("gRPC request, rpcId={}, serviceName={}, methodName={}, content:\n{}", rpcId,
+                    serviceName, methodName, request);
                 super.sendMessage(request);
             }
         };
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
similarity index 99%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index d31a64e..15b94aa 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -81,7 +81,7 @@ public class RpcClientImpl implements RpcClient {
 
         final NettyChannelBuilder channelBuilder =
             NettyChannelBuilder.forTarget(endpoints.getGrpcTarget())
-//                .withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
+                // .withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                 .keepAliveTime(KEEP_ALIVE_DURATION.toNanos(), TimeUnit.NANOSECONDS)
                 .maxInboundMessageSize(GRPC_MAX_MESSAGE_SIZE)
                 .intercept(LoggingInterceptor.getInstance())
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
diff --git a/java/client-java/src/main/resources-filtered/rocketmq.metadata.properties b/java/client/src/main/resources-filtered/rocketmq.metadata.properties
similarity index 100%
rename from java/client-java/src/main/resources-filtered/rocketmq.metadata.properties
rename to java/client/src/main/resources-filtered/rocketmq.metadata.properties
diff --git a/java/client-java/src/main/resources/META-INF/services/org.apache.rocketmq.client.apis.ClientServiceProvider b/java/client/src/main/resources/META-INF/services/org.apache.rocketmq.client.apis.ClientServiceProvider
similarity index 100%
rename from java/client-java/src/main/resources/META-INF/services/org.apache.rocketmq.client.apis.ClientServiceProvider
rename to java/client/src/main/resources/META-INF/services/org.apache.rocketmq.client.apis.ClientServiceProvider
diff --git a/java/client-java/src/main/resources/logback.xml b/java/client/src/main/resources/logback.xml
similarity index 100%
rename from java/client-java/src/main/resources/logback.xml
rename to java/client/src/main/resources/logback.xml
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
similarity index 86%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
index 23992be..ca198d9 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.client.java;
 
+import static org.junit.Assert.assertEquals;
+
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
 import org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl;
 import org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerBuilderImpl;
@@ -24,8 +26,6 @@ import org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl;
 import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
 public class ClientServiceProviderImplTest {
 
     @Test
@@ -35,12 +35,14 @@ public class ClientServiceProviderImplTest {
 
     @Test
     public void testNewPushConsumerBuilder() {
-        assertEquals(PushConsumerBuilderImpl.class, ClientServiceProvider.loadService().newPushConsumerBuilder().getClass());
+        assertEquals(PushConsumerBuilderImpl.class,
+            ClientServiceProvider.loadService().newPushConsumerBuilder().getClass());
     }
 
     @Test
     public void testNewSimpleConsumerBuilder() {
-        assertEquals(SimpleConsumerBuilderImpl.class, ClientServiceProvider.loadService().newSimpleConsumerBuilder().getClass());
+        assertEquals(SimpleConsumerBuilderImpl.class,
+            ClientServiceProvider.loadService().newSimpleConsumerBuilder().getClass());
     }
 
     @Test
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
similarity index 100%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
similarity index 90%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 7312c52..7c9baeb 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -17,6 +17,15 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import apache.rocketmq.v2.AckMessageResponse;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
@@ -44,15 +53,6 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 @RunWith(MockitoJUnitRunner.class)
 public class ProcessQueueImplTest extends TestBase {
     @Mock
@@ -116,9 +116,11 @@ public class ProcessQueueImplTest extends TestBase {
         when(retryPolicy.getNextAttemptDelay(anyInt())).thenReturn(Duration.ofSeconds(1));
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(pushConsumerSettings.isFifo()).thenReturn(false);
-        when(pushConsumer.changInvisibleDuration(any(MessageViewImpl.class), any(Duration.class))).thenReturn(okChangeInvisibleDurationFuture());
+        when(pushConsumer.changInvisibleDuration(any(MessageViewImpl.class), any(Duration.class)))
+            .thenReturn(okChangeInvisibleDurationFuture());
         processQueue.cacheMessages(messageViewList);
-        verify(pushConsumer, times(1)).changInvisibleDuration(any(MessageViewImpl.class), any(Duration.class));
+        verify(pushConsumer, times(1))
+            .changInvisibleDuration(any(MessageViewImpl.class), any(Duration.class));
     }
 
     @Test
@@ -127,9 +129,11 @@ public class ProcessQueueImplTest extends TestBase {
         List<MessageViewImpl> messageViewList = new ArrayList<>();
         messageViewList.add(corruptedMessageView0);
         when(pushConsumerSettings.isFifo()).thenReturn(true);
-        when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class))).thenReturn(okForwardMessageToDeadLetterQueueResponseFuture());
+        when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class)))
+            .thenReturn(okForwardMessageToDeadLetterQueueResponseFuture());
         processQueue.cacheMessages(messageViewList);
-        verify(pushConsumer, times(1)).forwardMessageToDeadLetterQueue(any(MessageViewImpl.class));
+        verify(pushConsumer, times(1))
+            .forwardMessageToDeadLetterQueue(any(MessageViewImpl.class));
         final Iterator<MessageViewImpl> iterator = processQueue.tryTakeFifoMessages();
         assertFalse(iterator.hasNext());
     }
@@ -147,10 +151,12 @@ public class ProcessQueueImplTest extends TestBase {
         ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(fakeEndpoints(), status, messageViewList);
         SettableFuture<ReceiveMessageResult> future0 = SettableFuture.create();
         future0.set(receiveMessageResult);
-        when(pushConsumer.receiveMessage(any(ReceiveMessageRequest.class), any(MessageQueueImpl.class), any(Duration.class))).thenReturn(future0);
+        when(pushConsumer.receiveMessage(any(ReceiveMessageRequest.class), any(MessageQueueImpl.class),
+            any(Duration.class))).thenReturn(future0);
         when(pushConsumerSettings.getReceiveBatchSize()).thenReturn(32);
         ReceiveMessageRequest request = ReceiveMessageRequest.newBuilder().build();
-        when(pushConsumer.wrapReceiveMessageRequest(anyInt(), any(MessageQueueImpl.class), any(FilterExpression.class))).thenReturn(request);
+        when(pushConsumer.wrapReceiveMessageRequest(anyInt(), any(MessageQueueImpl.class),
+            any(FilterExpression.class))).thenReturn(request);
         processQueue.fetchMessageImmediately();
         Thread.sleep(ProcessQueueImpl.RECEIVE_LATER_DELAY.toMillis() / 2);
         verify(pushConsumer, times(cachedMessagesCountThresholdPerQueue))
@@ -175,7 +181,8 @@ public class ProcessQueueImplTest extends TestBase {
         final ListenableFuture<AckMessageResponse> future = okAckMessageResponseFuture();
         when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future);
         processQueue.eraseMessage(optionalMessageView.get(), ConsumeResult.SUCCESS);
-        future.addListener(() -> verify(pushConsumer, times(1)).ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
+        future.addListener(() -> verify(pushConsumer, times(1))
+            .ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
         assertEquals(processQueue.cachedMessagesCount(), cachedMessageCount - 1);
         assertEquals(processQueue.inflightMessagesCount(), 0);
         assertEquals(consumptionOkQuantity.get(), 1);
@@ -220,7 +227,8 @@ public class ProcessQueueImplTest extends TestBase {
         when(retryPolicy.getMaxAttempts()).thenReturn(1);
         when(pushConsumer.getConsumptionExecutor()).thenReturn(SINGLE_THREAD_POOL_EXECUTOR);
         final ListenableFuture<Void> future = processQueue.eraseFifoMessage(messageView, ConsumeResult.SUCCESS);
-        future.addListener(() -> verify(pushConsumer, times(1)).ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
+        future.addListener(() -> verify(pushConsumer, times(1))
+            .ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
     }
 
     @Test
@@ -229,13 +237,15 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future0 = okForwardMessageToDeadLetterQueueResponseFuture();
+        ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future0 =
+            okForwardMessageToDeadLetterQueueResponseFuture();
         when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(retryPolicy.getMaxAttempts()).thenReturn(1);
         when(pushConsumer.getConsumptionExecutor()).thenReturn(SINGLE_THREAD_POOL_EXECUTOR);
         final ListenableFuture<Void> future = processQueue.eraseFifoMessage(messageView, ConsumeResult.FAILURE);
-        future.addListener(() -> verify(pushConsumer, times(1)).forwardMessageToDeadLetterQueue(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
+        future.addListener(() -> verify(pushConsumer, times(1))
+            .forwardMessageToDeadLetterQueue(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
     }
 
     @Test
@@ -254,6 +264,7 @@ public class ProcessQueueImplTest extends TestBase {
         when(consumeService.consume(any(MessageViewImpl.class), any(Duration.class))).thenReturn(consumeFuture);
         when(pushConsumer.getConsumeService()).thenReturn(consumeService);
         final ListenableFuture<Void> future = processQueue.eraseFifoMessage(messageView, ConsumeResult.FAILURE);
-        future.addListener(() -> verify(pushConsumer, times(1)).ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
+        future.addListener(() -> verify(pushConsumer, times(1))
+            .ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
     }
 }
\ No newline at end of file
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
similarity index 96%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
index e1c6673..cb37b6b 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
@@ -72,7 +72,8 @@ public class PushConsumerBuilderImplTest extends TestBase {
     @Test(expected = IllegalArgumentException.class)
     public void testBuildWithoutExpressions() throws ClientException {
         final PushConsumerBuilderImpl builder = new PushConsumerBuilderImpl();
-        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+        ClientConfiguration clientConfiguration =
+            ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
         builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_GROUP_0)
             .setMessageListener(messageView -> ConsumeResult.SUCCESS)
             .build();
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
similarity index 80%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index deff901..54e507c 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -17,6 +17,13 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import apache.rocketmq.v2.CustomizedBackoff;
 import apache.rocketmq.v2.QueryAssignmentRequest;
 import apache.rocketmq.v2.QueryRouteRequest;
@@ -33,16 +40,16 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
 import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
 import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,13 +57,6 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 @RunWith(MockitoJUnitRunner.class)
 public class PushConsumerImplTest extends TestBase {
     @Mock
@@ -77,21 +77,27 @@ public class PushConsumerImplTest extends TestBase {
     private final int maxCacheMessageSizeInBytes = 1024;
     private final int consumptionThreadCount = 4;
 
-    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+        .setEndpoints(FAKE_ACCESS_POINT).build();
 
     private PushConsumerImpl pushConsumer;
 
     private void start(PushConsumerImpl pushConsumer) throws ClientException {
-        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)))
+        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
+            any(Duration.class)))
             .thenReturn(okQueryRouteResponseFuture());
-        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class)))
+        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+            any(TelemetrySession.class)))
             .thenReturn(telemetryRequestObserver);
-        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("TestScheduler"));
+        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
+            "TestScheduler"));
         when(clientManager.getScheduler()).thenReturn(scheduler);
         doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
 
-        CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder().addNext(Durations.fromNanos(Duration.ofSeconds(3).toNanos())).build();
-        RetryPolicy retryPolicy = RetryPolicy.newBuilder().setMaxAttempts(17).setCustomizedBackoff(customizedBackoff).build();
+        CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder()
+            .addNext(Durations.fromNanos(Duration.ofSeconds(3).toNanos())).build();
+        RetryPolicy retryPolicy = RetryPolicy.newBuilder().setMaxAttempts(17).setCustomizedBackoff(customizedBackoff)
+            .build();
         Subscription subscription = Subscription.newBuilder().build();
         Settings settings = Settings.newBuilder().setSubscription(subscription).setBackoffPolicy(retryPolicy).build();
         final Service service = pushConsumer.startAsync();
@@ -108,14 +114,19 @@ public class PushConsumerImplTest extends TestBase {
 
     @Test
     public void testScanAssignment() throws ExecutionException, InterruptedException, ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+            maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
         start(pushConsumer);
-        when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class))).thenReturn(okQueryAssignmentResponseFuture());
+        when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class),
+            any(Duration.class))).thenReturn(okQueryAssignmentResponseFuture());
         pushConsumer.scanAssignments();
-        verify(clientManager, atLeast(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class));
+        verify(clientManager, atLeast(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class),
+            any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class));
         Assert.assertEquals(okQueryAssignmentResponseFuture().get().getAssignmentsCount(), pushConsumer.getQueueSize());
-        when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture());
+        when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class),
+            any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture());
         pushConsumer.scanAssignments();
         Assert.assertEquals(0, pushConsumer.getQueueSize());
         shutdown(pushConsumer);
@@ -123,19 +134,22 @@ public class PushConsumerImplTest extends TestBase {
 
     @Test(expected = IllegalStateException.class)
     public void testSubscribeWithoutStart() throws ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+            maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
         pushConsumer.subscribe(FAKE_TOPIC_0, new FilterExpression(FAKE_TOPIC_0));
     }
 
     @Test(expected = IllegalStateException.class)
     public void testUnsubscribeWithoutStart() {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+            maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
         pushConsumer.unsubscribe(FAKE_TOPIC_0);
     }
 
     @Test
     public void testSubscribeWithSubscriptionOverwriting() throws ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+            maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
         start(pushConsumer);
         final FilterExpression filterExpression = new FilterExpression(FAKE_TAG_0);
         pushConsumer.subscribe(FAKE_TOPIC_0, filterExpression);
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
similarity index 94%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
index 315af9b..31609c4 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
@@ -47,7 +47,8 @@ public class SimpleConsumerBuilderTest extends TestBase {
     @Test(expected = IllegalArgumentException.class)
     public void testBuildWithoutExpressions() throws ClientException {
         final SimpleConsumerBuilderImpl builder = new SimpleConsumerBuilderImpl();
-        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+        ClientConfiguration clientConfiguration =
+            ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
         builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_GROUP_0)
             .build();
     }
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
similarity index 84%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index 59a2530..9568a81 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -17,6 +17,14 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import apache.rocketmq.v2.AckMessageRequest;
 import apache.rocketmq.v2.AckMessageResponse;
 import apache.rocketmq.v2.Broker;
@@ -53,23 +61,15 @@ import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
 import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
 import org.apache.rocketmq.client.java.impl.TelemetrySession;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 @RunWith(MockitoJUnitRunner.class)
 public class SimpleConsumerImplTest extends TestBase {
     @Mock
@@ -78,10 +78,11 @@ public class SimpleConsumerImplTest extends TestBase {
     private StreamObserver<TelemetryCommand> telemetryRequestObserver;
     @InjectMocks
     private ClientManagerRegistry clientManagerRegistry = ClientManagerRegistry.getInstance();
-    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+        .setEndpoints(FAKE_ACCESS_POINT).build();
 
     private final Duration awaitDuration = Duration.ofSeconds(3);
-    private final Map<String, FilterExpression> subscriptionExpressions = createSubscriptionExpressions(FAKE_TOPIC_0);
+    private final Map<String, FilterExpression> subExpressions = createSubscriptionExpressions(FAKE_TOPIC_0);
 
     private SimpleConsumerImpl simpleConsumer;
 
@@ -91,15 +92,20 @@ public class SimpleConsumerImplTest extends TestBase {
         List<MessageQueue> messageQueueList = new ArrayList<>();
         MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
             .setPermission(Permission.READ_WRITE)
-            .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())).setId(0).build();
+            .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())).setId(0)
+            .build();
         messageQueueList.add(mq);
-        QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status).addAllMessageQueues(messageQueueList).build();
+        QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status)
+            .addAllMessageQueues(messageQueueList).build();
         future0.set(response);
-        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)))
+        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
+            any(Duration.class)))
             .thenReturn(future0);
-        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class)))
+        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+            any(TelemetrySession.class)))
             .thenReturn(telemetryRequestObserver);
-        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("TestScheduler"));
+        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
+            new ThreadFactoryImpl("TestScheduler"));
         when(clientManager.getScheduler()).thenReturn(scheduler);
         doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
 
@@ -119,38 +125,38 @@ public class SimpleConsumerImplTest extends TestBase {
 
     @Test(expected = IllegalStateException.class)
     public void testReceiveWithoutStart() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         simpleConsumer.receive(1, Duration.ofSeconds(1));
     }
 
     @Test(expected = IllegalStateException.class)
     public void testAckWithoutStart() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         simpleConsumer.ack(fakeMessageViewImpl());
     }
 
     @Test(expected = IllegalStateException.class)
     public void testSubscribeWithoutStart() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         simpleConsumer.subscribe(FAKE_TOPIC_1, FilterExpression.SUB_ALL);
     }
 
     @Test(expected = IllegalStateException.class)
     public void testUnsubscribeWithoutStart() {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         simpleConsumer.unsubscribe(FAKE_TOPIC_0);
     }
 
     @Test
     public void testStartAndShutdown() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         shutdown(simpleConsumer);
     }
 
     @Test
     public void testSubscribeWithSubscriptionOverwriting() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         final FilterExpression filterExpression = new FilterExpression(FAKE_TAG_0);
         simpleConsumer.subscribe(FAKE_TOPIC_0, filterExpression);
@@ -159,7 +165,7 @@ public class SimpleConsumerImplTest extends TestBase {
 
     @Test(expected = IllegalArgumentException.class)
     public void testReceiveWithAllTopicsAreUnsubscribed() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         simpleConsumer.unsubscribe(FAKE_TOPIC_0);
         try {
@@ -171,25 +177,29 @@ public class SimpleConsumerImplTest extends TestBase {
 
     @Test
     public void testReceiveMessageSuccess() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         int receivedMessageCount = 16;
-        final ListenableFuture<Iterator<ReceiveMessageResponse>> future = okReceiveMessageResponsesFuture(FAKE_TOPIC_0, receivedMessageCount);
-        when(clientManager.receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class), any(Duration.class))).thenReturn(future);
+        final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
+            okReceiveMessageResponsesFuture(FAKE_TOPIC_0, receivedMessageCount);
+        when(clientManager.receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class),
+            any(Duration.class))).thenReturn(future);
         final List<MessageView> messageViews = simpleConsumer.receive(1, Duration.ofSeconds(1));
-        verify(clientManager, times(1)).receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class), any(Duration.class));
+        verify(clientManager, times(1)).receiveMessage(any(Endpoints.class),
+            any(Metadata.class), any(ReceiveMessageRequest.class), any(Duration.class));
         assertEquals(receivedMessageCount, messageViews.size());
         shutdown(simpleConsumer);
     }
 
     @Test
     public void testAckMessageSuccess() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         try {
             final MessageViewImpl messageView = fakeMessageViewImpl();
             final ListenableFuture<AckMessageResponse> future = okAckMessageResponseFuture();
-            when(clientManager.ackMessage(any(Endpoints.class), any(Metadata.class), any(AckMessageRequest.class), any(Duration.class))).thenReturn(future);
+            when(clientManager.ackMessage(any(Endpoints.class), any(Metadata.class), any(AckMessageRequest.class),
+                any(Duration.class))).thenReturn(future);
             simpleConsumer.ack(messageView);
         } finally {
             shutdown(simpleConsumer);
@@ -198,12 +208,14 @@ public class SimpleConsumerImplTest extends TestBase {
 
     @Test
     public void testChangeInvisibleDurationSuccess() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         try {
             final MessageViewImpl messageView = fakeMessageViewImpl();
-            final ListenableFuture<ChangeInvisibleDurationResponse> future = okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1);
-            when(clientManager.changeInvisibleDuration(any(Endpoints.class), any(Metadata.class), any(ChangeInvisibleDurationRequest.class), any(Duration.class))).thenReturn(future);
+            final ListenableFuture<ChangeInvisibleDurationResponse> future =
+                okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1);
+            when(clientManager.changeInvisibleDuration(any(Endpoints.class), any(Metadata.class),
+                any(ChangeInvisibleDurationRequest.class), any(Duration.class))).thenReturn(future);
             simpleConsumer.changeInvisibleDuration0(messageView, Duration.ofSeconds(3));
             assertEquals(FAKE_RECEIPT_HANDLE_1, messageView.getReceiptHandle());
         } finally {
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
similarity index 96%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
index a64babf..e1298bd 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
@@ -17,6 +17,12 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
@@ -34,12 +40,6 @@ import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 public class StandardConsumeServiceTest extends TestBase {
 
     @Test
@@ -73,7 +73,8 @@ public class StandardConsumeServiceTest extends TestBase {
                 Duration duration, MessageHookPointsStatus status) {
             }
         };
-        final StandardConsumeService service = new StandardConsumeService(FAKE_CLIENT_ID, processQueueTable, listener, SINGLE_THREAD_POOL_EXECUTOR, interceptor, SCHEDULER);
+        final StandardConsumeService service = new StandardConsumeService(FAKE_CLIENT_ID, processQueueTable, listener,
+            SINGLE_THREAD_POOL_EXECUTOR, interceptor, SCHEDULER);
         service.dispatch0();
         verify(processQueue0, times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
         verify(processQueue0, times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
similarity index 97%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
index 2bf5520..5fab2b2 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
@@ -20,8 +20,8 @@ package org.apache.rocketmq.client.java.impl.producer;
 import java.io.IOException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
-import org.apache.rocketmq.client.apis.ClientServiceProvider;
 import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
 import org.apache.rocketmq.client.apis.producer.Producer;
 import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
 import org.apache.rocketmq.client.java.impl.ClientManager;
@@ -79,7 +79,8 @@ public class ProducerBuilderImplTest {
     @Test
     public void testBuildWithoutTopic() throws ClientException, IOException {
         ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints("127.0.0.1:80").build();
-        Producer producer = ClientServiceProvider.loadService().newProducerBuilder().setClientConfiguration(clientConfiguration).build();
+        Producer producer = ClientServiceProvider.loadService().newProducerBuilder()
+            .setClientConfiguration(clientConfiguration).build();
         producer.close();
     }
 }
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
similarity index 82%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index 1f2d1ec..29f4278 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -17,6 +17,15 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import apache.rocketmq.v2.Broker;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.MessageQueue;
@@ -50,24 +59,15 @@ import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
 import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
 import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 @RunWith(MockitoJUnitRunner.class)
 public class ProducerImplTest extends TestBase {
     @Mock
@@ -78,7 +78,9 @@ public class ProducerImplTest extends TestBase {
     @InjectMocks
     private ClientManagerRegistry clientManagerRegistry = ClientManagerRegistry.getInstance();
 
-    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+        .setEndpoints(FAKE_ACCESS_POINT).build();
+
     private final String[] str = {FAKE_TOPIC_0};
     private final Set<String> set = new HashSet<>(Arrays.asList(str));
 
@@ -88,7 +90,8 @@ public class ProducerImplTest extends TestBase {
     private final ProducerImpl producer = new ProducerImpl(clientConfiguration, set, 1, null);
 
     @InjectMocks
-    private final ProducerImpl producerWithoutTopicBinding = new ProducerImpl(clientConfiguration, new HashSet<>(), 1, null);
+    private final ProducerImpl producerWithoutTopicBinding = new ProducerImpl(clientConfiguration, new HashSet<>(), 1,
+        null);
 
     private void start(ProducerImpl producer) throws ClientException {
         SettableFuture<QueryRouteResponse> future0 = SettableFuture.create();
@@ -96,15 +99,20 @@ public class ProducerImplTest extends TestBase {
         List<MessageQueue> messageQueueList = new ArrayList<>();
         MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
             .setPermission(Permission.READ_WRITE)
-            .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())).setId(0).build();
+            .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0()))
+            .setId(0).build();
         messageQueueList.add(mq);
-        QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status).addAllMessageQueues(messageQueueList).build();
+        QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status)
+            .addAllMessageQueues(messageQueueList).build();
         future0.set(response);
-        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)))
+        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
+            any(Duration.class)))
             .thenReturn(future0);
-        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class)))
+        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+            any(TelemetrySession.class)))
             .thenReturn(telemetryRequestObserver);
-        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("TestScheduler"));
+        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
+            "TestScheduler"));
         when(clientManager.getScheduler()).thenReturn(scheduler);
         doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
 
@@ -131,11 +139,14 @@ public class ProducerImplTest extends TestBase {
     @Test
     public void testSendWithTopicBinding() throws ClientException, ExecutionException, InterruptedException {
         start(producer);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
+            any(Duration.class), any(TelemetrySession.class));
         final Message message = fakeMessage(FAKE_TOPIC_0);
         final ListenableFuture<SendMessageResponse> future = okSendMessageResponseFutureWithSingleEntry();
-        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
+        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
+            any(Duration.class))).thenReturn(future);
         final SendMessageResponse response = future.get();
         assertEquals(1, response.getEntriesCount());
         final apache.rocketmq.v2.SendResultEntry receipt = response.getEntriesList().iterator().next();
@@ -147,16 +158,21 @@ public class ProducerImplTest extends TestBase {
     @Test
     public void testSendWithoutTopicBinding() throws ClientException, ExecutionException, InterruptedException {
         start(producerWithoutTopicBinding);
-        verify(clientManager, never()).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, never()).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+        verify(clientManager, never()).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, never()).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+            any(TelemetrySession.class));
         final Message message = fakeMessage(FAKE_TOPIC_0);
         final ListenableFuture<SendMessageResponse> future = okSendMessageResponseFutureWithSingleEntry();
-        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
+        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
+            any(Duration.class))).thenReturn(future);
         final SendMessageResponse response = future.get();
         assertEquals(1, response.getEntriesCount());
         final SendReceipt sendReceipt = producerWithoutTopicBinding.send(message);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
+            any(Duration.class), any(TelemetrySession.class));
         final apache.rocketmq.v2.SendResultEntry receipt = response.getEntriesList().iterator().next();
         assertEquals(receipt.getMessageId(), sendReceipt.getMessageId().toString());
         shutdown(producerWithoutTopicBinding);
@@ -165,8 +181,10 @@ public class ProducerImplTest extends TestBase {
     @Test
     public void testSendBatchMessage() throws ClientException, ExecutionException, InterruptedException {
         start(producer);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
+            any(Duration.class), any(TelemetrySession.class));
         int batchMessageNum = 2;
         List<Message> messages = new ArrayList<>();
         for (int i = 0; i < batchMessageNum; i++) {
@@ -175,7 +193,8 @@ public class ProducerImplTest extends TestBase {
         }
 
         final ListenableFuture<SendMessageResponse> future = okBatchSendMessageResponseFuture();
-        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
+        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
+            any(Duration.class))).thenReturn(future);
         final SendMessageResponse response = future.get();
         assertEquals(batchMessageNum, response.getEntriesCount());
         final List<apache.rocketmq.v2.SendResultEntry> receipts = response.getEntriesList();
@@ -188,10 +207,13 @@ public class ProducerImplTest extends TestBase {
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void testSendBatchMessageWithDifferentTopic() throws ClientException, ExecutionException, InterruptedException {
+    public void testSendBatchMessageWithDifferentTopic() throws ClientException, ExecutionException,
+        InterruptedException {
         start(producer);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+            any(TelemetrySession.class));
         int batchMessageNum = 2;
         List<Message> messages = new ArrayList<>();
 
@@ -214,10 +236,13 @@ public class ProducerImplTest extends TestBase {
     @Test(expected = ClientException.class)
     public void testSendMessageWithFailure() throws ClientException {
         start(producer);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+            any(TelemetrySession.class));
         final ListenableFuture<SendMessageResponse> future = failureSendMessageResponseFuture();
-        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
+        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
+            any(Duration.class))).thenReturn(future);
         Message message0 = fakeMessage(FAKE_TOPIC_0);
         try {
             producer.send(message0);
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
similarity index 90%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
index a66e84d..3519385 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
@@ -29,21 +34,12 @@ import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
-import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.when;
-
 @RunWith(MockitoJUnitRunner.class)
 public class TransactionImplTest extends TestBase {
     @Mock
@@ -98,7 +94,8 @@ public class TransactionImplTest extends TestBase {
     }
 
     @Test
-    public void testCommit() throws IOException, ClientException, ExecutionException, InterruptedException, NoSuchFieldException, IllegalAccessException {
+    public void testCommit() throws IOException, ClientException, ExecutionException, InterruptedException,
+        NoSuchFieldException, IllegalAccessException {
         final TransactionImpl transaction = new TransactionImpl(producer);
         final Message message0 = fakeMessage(FAKE_TOPIC_0);
 
@@ -113,14 +110,17 @@ public class TransactionImplTest extends TestBase {
         final PublishingMessageImpl publishingMessage = transaction.tryAddMessage(message0);
         final SendReceiptImpl receipt = fakeSendReceiptImpl(fakeMessageQueueImpl0());
         transaction.tryAddReceipt(publishingMessage, receipt);
-        ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor = ArgumentCaptor.forClass(TransactionResolution.class);
-        doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class), any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
+        ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor =
+            ArgumentCaptor.forClass(TransactionResolution.class);
+        doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class),
+            any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
         transaction.commit();
         assertEquals(TransactionResolution.COMMIT, resolutionArgumentCaptor.getValue());
     }
 
     @Test
-    public void testRollback() throws IOException, ClientException, ExecutionException, InterruptedException, NoSuchFieldException, IllegalAccessException {
+    public void testRollback() throws IOException, ClientException, ExecutionException, InterruptedException,
+        NoSuchFieldException, IllegalAccessException {
         final TransactionImpl transaction = new TransactionImpl(producer);
         final Message message0 = fakeMessage(FAKE_TOPIC_0);
 
@@ -135,8 +135,10 @@ public class TransactionImplTest extends TestBase {
         final PublishingMessageImpl publishingMessage = transaction.tryAddMessage(message0);
         final SendReceiptImpl receipt = fakeSendReceiptImpl(fakeMessageQueueImpl0());
         transaction.tryAddReceipt(publishingMessage, receipt);
-        ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor = ArgumentCaptor.forClass(TransactionResolution.class);
-        doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class), any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
+        ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor =
+            ArgumentCaptor.forClass(TransactionResolution.class);
+        doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class),
+            any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
         transaction.rollback();
         assertEquals(TransactionResolution.ROLLBACK, resolutionArgumentCaptor.getValue());
     }
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
similarity index 99%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
index daa70b9..4e3b81d 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
@@ -17,13 +17,12 @@
 
 package org.apache.rocketmq.client.java.message;
 
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.HashSet;
-import java.util.Set;
-
 public class MessageIdCodecTest {
     private final MessageIdCodec codec = MessageIdCodec.getInstance();
 
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
similarity index 96%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
index c43f107..be03d1d 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
@@ -17,21 +17,19 @@
 
 package org.apache.rocketmq.client.java.message;
 
-import java.util.Arrays;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 public class MessageImplTest extends TestBase {
     private final ClientServiceProvider provider = ClientServiceProvider.loadService();
     private final String sampleTopic = "foobar";
@@ -70,7 +68,8 @@ public class MessageImplTest extends TestBase {
 
     @Test
     public void testTagSetter() {
-        final Message message = provider.newMessageBuilder().setTag("tagA").setTopic(FAKE_TOPIC_0).setBody(FAKE_MESSAGE_BODY).build();
+        final Message message = provider.newMessageBuilder().setTag("tagA").setTopic(FAKE_TOPIC_0)
+            .setBody(FAKE_MESSAGE_BODY).build();
         assertTrue(message.getTag().isPresent());
         assertEquals("tagA", message.getTag().get());
     }
@@ -92,7 +91,8 @@ public class MessageImplTest extends TestBase {
 
     @Test
     public void testKeySetter() {
-        final Message message = provider.newMessageBuilder().setKeys("keyA").setTopic(FAKE_TOPIC_0).setBody(FAKE_MESSAGE_BODY).build();
+        final Message message = provider.newMessageBuilder().setKeys("keyA").setTopic(FAKE_TOPIC_0)
+            .setBody(FAKE_MESSAGE_BODY).build();
         assertFalse(message.getKeys().isEmpty());
     }
 
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
similarity index 99%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
index 2ff7238..0faaf23 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.retry;
 
+import static apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
+import static org.junit.Assert.assertEquals;
+
 import apache.rocketmq.v2.CustomizedBackoff;
 import apache.rocketmq.v2.RetryPolicy;
 import com.google.protobuf.util.Durations;
@@ -25,9 +28,6 @@ import java.util.ArrayList;
 import java.util.List;
 import org.junit.Test;
 
-import static apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
-import static org.junit.Assert.assertEquals;
-
 public class CustomizedBackoffRetryPolicyTest {
 
     @Test
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
similarity index 96%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
index d2163ce..c45673d 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
@@ -82,7 +82,8 @@ public class EndpointsTest {
 
     @Test
     public void testEndpointsWithMultipleIpv6() {
-        final Endpoints endpoints = new Endpoints("1050:0000:0000:0000:0005:0600:300c:326b:8080;1050:0000:0000:0000:0005:0600:300c:326c:8081");
+        final Endpoints endpoints = new Endpoints("1050:0000:0000:0000:0005:0600:300c:326b:8080;" +
+            "1050:0000:0000:0000:0005:0600:300c:326c:8081");
         Assert.assertEquals(AddressScheme.IPv6, endpoints.getScheme());
         Assert.assertEquals(2, endpoints.getAddresses().size());
         final Iterator<Address> iterator = endpoints.getAddresses().iterator();
@@ -96,7 +97,8 @@ public class EndpointsTest {
         Assert.assertEquals(8081, address1.getPort());
 
         Assert.assertEquals(AddressScheme.IPv6, endpoints.getScheme());
-        Assert.assertEquals("ipv6:1050:0000:0000:0000:0005:0600:300c:326b:8080,1050:0000:0000:0000:0005:0600:300c:326c:8081", endpoints.getFacade());
+        Assert.assertEquals("ipv6:1050:0000:0000:0000:0005:0600:300c:326b:8080," +
+            "1050:0000:0000:0000:0005:0600:300c:326c:8081", endpoints.getFacade());
     }
 
     @Test
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
similarity index 94%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index cb5d68f..b30bf93 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -42,7 +42,6 @@ import apache.rocketmq.v2.SystemProperties;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -64,15 +63,15 @@ import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
 import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
-import org.apache.rocketmq.client.java.misc.Utilities;
-import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
 import org.apache.rocketmq.client.java.message.MessageIdCodec;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
+import org.apache.rocketmq.client.java.misc.Utilities;
+import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 
 public class TestBase {
     protected static final String FAKE_CLIENT_ID = "mbp@29848@cno0nhxy";
@@ -163,7 +162,8 @@ public class TestBase {
         final byte[] body = RandomUtils.nextBytes(bodySize);
         Map<String, String> properties = new HashMap<>();
         List<String> keys = new ArrayList<>();
-        return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null, null, keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0, null, 1, corrupted, null);
+        return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null, null,
+            keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0, null, 1, corrupted, null);
     }
 
     protected MessageQueueImpl fakeMessageQueueImpl0() {
@@ -179,11 +179,13 @@ public class TestBase {
     }
 
     protected Broker fakePbBroker0() {
-        return Broker.newBuilder().setEndpoints(fakePbEndpoints0()).setName(FAKE_BROKER_NAME_0).setId(Utilities.MASTER_BROKER_ID).build();
+        return Broker.newBuilder().setEndpoints(fakePbEndpoints0()).setName(FAKE_BROKER_NAME_0)
+            .setId(Utilities.MASTER_BROKER_ID).build();
     }
 
     protected Broker fakePbBroker1() {
-        return Broker.newBuilder().setEndpoints(fakePbEndpoints1()).setName(FAKE_BROKER_NAME_1).setId(Utilities.MASTER_BROKER_ID).build();
+        return Broker.newBuilder().setEndpoints(fakePbEndpoints1()).setName(FAKE_BROKER_NAME_1)
+            .setId(Utilities.MASTER_BROKER_ID).build();
 
     }
 
@@ -196,11 +198,13 @@ public class TestBase {
     }
 
     protected MessageQueue fakePbMessageQueue0(Resource topicResource) {
-        return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker0()).setPermission(Permission.READ_WRITE).build();
+        return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker0())
+            .setPermission(Permission.READ_WRITE).build();
     }
 
     protected MessageQueue fakePbMessageQueue1(Resource topicResource) {
-        return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker1()).setPermission(Permission.READ_WRITE).build();
+        return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker1())
+            .setPermission(Permission.READ_WRITE).build();
     }
 
     protected ListenableFuture<QueryRouteResponse> okQueryRouteResponseFuture() {
@@ -264,7 +268,8 @@ public class TestBase {
         return future;
     }
 
-    protected ListenableFuture<ForwardMessageToDeadLetterQueueResponse> okForwardMessageToDeadLetterQueueResponseFuture() {
+    protected ListenableFuture<ForwardMessageToDeadLetterQueueResponse>
+        okForwardMessageToDeadLetterQueueResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         SettableFuture<ForwardMessageToDeadLetterQueueResponse> future0 = SettableFuture.create();
         final ForwardMessageToDeadLetterQueueResponse response =
@@ -277,8 +282,9 @@ public class TestBase {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         SettableFuture<SendMessageResponse> future0 = SettableFuture.create();
         final String messageId = MessageIdCodec.getInstance().nextMessageId().toString();
-        SendResultEntry entry = SendResultEntry.newBuilder().setMessageId(messageId).setTransactionId(FAKE_TRANSACTION_ID)
-            .setOffset(1).build();
+        SendResultEntry entry =
+            SendResultEntry.newBuilder().setMessageId(messageId).setTransactionId(FAKE_TRANSACTION_ID)
+                .setOffset(1).build();
         SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
         future0.set(response);
         return future0;
@@ -300,7 +306,8 @@ public class TestBase {
             .setOffset(1).build();
         SendResultEntry entry1 = SendResultEntry.newBuilder().setMessageId(messageId)
             .setOffset(2).build();
-        SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry0).addEntries(entry1).build();
+        SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry0)
+            .addEntries(entry1).build();
         future0.set(response);
         return future0;
     }
@@ -355,7 +362,8 @@ public class TestBase {
     }
 
     protected ProducerSettings fakeProducerSettings() {
-        return new ProducerSettings(FAKE_CLIENT_ID, fakeEndpoints(), fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new HashSet<>());
+        return new ProducerSettings(FAKE_CLIENT_ID, fakeEndpoints(), fakeExponentialBackoffRetryPolicy(),
+            Duration.ofSeconds(1), new HashSet<>());
     }
 
     protected SendReceiptImpl fakeSendReceiptImpl(
diff --git a/java/client-java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/java/client/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
similarity index 100%
rename from java/client-java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
rename to java/client/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
diff --git a/java/pom.xml b/java/pom.xml
index ef38d67..abd7dc9 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -10,7 +10,8 @@
     <version>5.0.0-SNAPSHOT</version>
     <modules>
         <module>client-apis</module>
-        <module>client-java</module>
+        <module>client</module>
+        <module>client-shade</module>
     </modules>
 
     <properties>
@@ -32,6 +33,7 @@
 
         <!-- plugin -->
         <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
+        <maven-checkstyle-plugin.version>3.1.1</maven-checkstyle-plugin.version>
     </properties>
 
     <repositories>
@@ -83,6 +85,16 @@
                 <artifactId>rocketmq-client-apis</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>rocketmq-client-java-noshade</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>rocketmq-client-java</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-proto</artifactId>
@@ -148,7 +160,26 @@
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>${maven-compiler-plugin.version}</version>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>${maven-checkstyle-plugin.version}</version>
+                <configuration>
+                    <consoleOutput>true</consoleOutput>
+                    <encoding>UTF-8</encoding>
+                    <configLocation>style/checkstyle.xml</configLocation>
+                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>validate</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
-
 </project>
\ No newline at end of file
diff --git a/java/style/checkstyle.xml b/java/style/checkstyle.xml
new file mode 100644
index 0000000..d9bfef4
--- /dev/null
+++ b/java/style/checkstyle.xml
@@ -0,0 +1,385 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--
+    Checkstyle configuration originally derived from the Google coding conventions from Google Java Style that can be
+    found at https://google.github.io/styleguide/javaguide.html. Deviations have been made where desired.
+ -->
+<module name="Checker">
+    <property name="charset" value="UTF-8"/>
+    <property name="severity" value="error"/>
+    <property name="fileExtensions" value="java"/>
+
+    <!-- Files must not contain tabs. -->
+    <module name="FileTabCharacter">
+        <property name="eachLine" value="true"/>
+    </module>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
+        <property name="fileExtensions" value="java"/>
+    </module>
+
+    <!--    <module name="SuppressionFilter">-->
+    <!--        <property name="file"-->
+    <!--                  value="${checkstyle.suppressions.file}"-->
+    <!--                  default="checkstyle-suppressions.xml"/>-->
+    <!--    </module>-->
+
+    <module name="TreeWalker">
+
+        <!-- Allow suppressing rules via comments. -->
+        <module name="SuppressionCommentFilter"/>
+
+        <!-- Class names must match the file name in which they are defined. -->
+        <module name="OuterTypeFilename"/>
+
+        <!-- Only one class may be defined per file. -->
+        <module name="OneTopLevelClass"/>
+
+        <!-- Special escape sequences like \n and \t must be used over the octal or unicode equivalent. -->
+        <module name="IllegalTokenText">
+            <property name="tokens" value="STRING_LITERAL, CHAR_LITERAL"/>
+            <property name="format"
+                      value="\\u00(08|09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/>
+            <property name="message" value="Avoid using corresponding octal or Unicode escape."/>
+        </module>
+
+        <!-- Unicode escapes must not be used for printable characters. -->
+        <module name="AvoidEscapedUnicodeCharacters">
+            <property name="allowEscapesForControlCharacters" value="true"/>
+            <property name="allowByTailComment" value="true"/>
+            <property name="allowNonPrintableEscapes" value="true"/>
+        </module>
+
+        <!-- Stars must not be used in import statements. -->
+        <module name="AvoidStarImport"/>
+
+        <!-- Checks for unused imports. -->
+        <module name="UnusedImports"/>
+
+        <!-- Package name and imports must not be wrapped. -->
+        <module name="NoLineWrap"/>
+
+        <!-- Braces must be used for all blocks. -->
+        <module name="NeedBraces"/>
+
+        <!-- Braces must not be empty for most language constructs. -->
+        <module name="EmptyBlock">
+            <property name="option" value="TEXT"/>
+            <property name="tokens" value="LITERAL_TRY, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_SWITCH"/>
+        </module>
+
+        <!-- For language constructs related to the previous statement (eg. "else" or "catch"), the keywords must
+             be defined on the same line as the right curly brace. -->
+        <module name="RightCurly">
+            <property name="id" value="RightCurlySame"/>
+            <property name="tokens"
+                      value="LITERAL_TRY, LITERAL_CATCH, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_DO"/>
+        </module>
+
+        <!-- For other language constructs, they must be defined on a separate line. -->
+        <module name="RightCurly">
+            <property name="id" value="RightCurlyAlone"/>
+            <property name="option" value="alone"/>
+            <property name="tokens"
+                      value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT, INSTANCE_INIT"/>
+        </module>
+
+        <!-- Language constructs like "if" and "while" must be followed by whitespace. -->
+        <module name="WhitespaceAfter"/>
+
+        <!-- Language constructs must be surrounded by whitespace. -->
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+            <property name="allowEmptyTypes" value="true"/>
+            <property name="allowEmptyLoops" value="true"/>
+            <message key="ws.notFollowed"
+                     value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement."/>
+            <message key="ws.notPreceded"
+                     value="WhitespaceAround: ''{0}'' is not preceded with whitespace."/>
+        </module>
+
+        <!-- Only one statement per line is permitted. -->
+        <module name="OneStatementPerLine"/>
+
+        <!-- Variables must be defined on different lines. -->
+        <module name="MultipleVariableDeclarations"/>
+
+        <!-- No C-style array declarations are permitted (eg. String args[]). -->
+        <module name="ArrayTypeStyle"/>
+
+        <!-- Defaults must always be included for switch statements, even if they are empty. -->
+        <module name="MissingSwitchDefault"/>
+
+        <!-- Case blocks with statements on them must include a break, return, etc. or the comment "fall through". -->
+        <module name="FallThrough"/>
+
+        <!-- When defining long literals, an upper L must be used. -->
+        <module name="UpperEll"/>
+
+        <!-- Modifiers like public, abstract, static, etc. must follow a consistent order. -->
+        <module name="ModifierOrder"/>
+
+        <!-- Empty lines must separate methods and constructors. -->
+        <module name="EmptyLineSeparator">
+            <property name="allowNoEmptyLineBetweenFields" value="true"/>
+        </module>
+
+        <!-- New lines must happen before dots. -->
+        <module name="SeparatorWrap">
+            <property name="id" value="SeparatorWrapDot"/>
+            <property name="tokens" value="DOT"/>
+            <property name="option" value="nl"/>
+        </module>
+
+        <!-- New lines must happen after commas. -->
+        <module name="SeparatorWrap">
+            <property name="id" value="SeparatorWrapComma"/>
+            <property name="tokens" value="COMMA"/>
+            <property name="option" value="EOL"/>
+        </module>
+
+        <!-- Package names must follow a defined format. -->
+        <module name="PackageName">
+            <property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$"/>
+            <message key="name.invalidPattern"
+                     value="Package name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Type names must follow a defined format. -->
+        <module name="TypeName">
+            <message key="name.invalidPattern"
+                     value="Type name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Non-constant fields must follow a defined format. -->
+        <module name="MemberName">
+            <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+            <message key="name.invalidPattern"
+                     value="Member name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Constant fields must follow a defined format. -->
+        <module name="ConstantName">
+            <property name="format" value="^log?|[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$"/>
+        </module>
+
+        <!-- Method and lambda parameters must follow a defined format. -->
+        <module name="ParameterName">
+            <property name="id" value="ParameterNameNonPublic"/>
+            <property name="format" value="^[a-z]([a-zA-Z0-9]*)?$"/>
+            <property name="accessModifiers" value="protected, package, private"/>
+            <message key="name.invalidPattern"
+                     value="Parameter name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+        <module name="ParameterName">
+            <property name="id" value="ParameterNamePublic"/>
+            <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+            <property name="accessModifiers" value="public"/>
+            <message key="name.invalidPattern"
+                     value="Parameter name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Catch parameters must follow a defined format. -->
+        <module name="CatchParameterName">
+            <property name="format" value="^(e|t|[a-z][a-zA-Z0-9]*)$"/>
+            <message key="name.invalidPattern"
+                     value="Catch parameter name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Local variables must follow a defined format. -->
+        <module name="LocalVariableName">
+            <property name="tokens" value="VARIABLE_DEF"/>
+            <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+            <property name="allowOneCharVarInForLoop" value="true"/>
+            <message key="name.invalidPattern"
+                     value="Local variable name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Type parameters must follow a defined format. -->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
+            <message key="name.invalidPattern"
+                     value="Class type name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+        <module name="MethodTypeParameterName">
+            <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
+            <message key="name.invalidPattern"
+                     value="Method type name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+        <module name="InterfaceTypeParameterName">
+            <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
+            <message key="name.invalidPattern"
+                     value="Interface type name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Method names must follow a defined format. -->
+        <module name="MethodName">
+            <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+            <message key="name.invalidPattern"
+                     value="Method name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Finalizers must not be overridden. -->
+        <module name="NoFinalizer"/>
+
+        <!-- Whitespace around generics must follow a defined format. -->
+        <module name="GenericWhitespace">
+            <message key="ws.followed"
+                     value="GenericWhitespace ''{0}'' is followed by whitespace."/>
+            <message key="ws.preceded"
+                     value="GenericWhitespace ''{0}'' is preceded with whitespace."/>
+            <message key="ws.illegalFollow"
+                     value="GenericWhitespace ''{0}'' should be followed by whitespace."/>
+            <message key="ws.notPreceded"
+                     value="GenericWhitespace ''{0}'' is not preceded with whitespace."/>
+        </module>
+
+        <!-- File indentation must follow a convention of 4 spaces (8 for throws statements). -->
+<!--        <module name="Indentation">-->
+<!--            <property name="throwsIndent" value="8"/>-->
+<!--            <property name="arrayInitIndent" value="8"/>-->
+<!--        </module>-->
+
+        <!-- Abbreviations must follow the same conventions as any other word (eg. use Rpc, not RPC). -->
+        <module name="AbbreviationAsWordInName">
+            <property name="ignoreFinal" value="false"/>
+            <!--            <property name="allowedAbbreviationLength" value="3"/>-->
+            <property name="severity" value="warning"/>
+        </module>
+
+        <!-- Class contents must be defined in the order suggested by Sun/Oracle:
+             http://www.oracle.com/technetwork/java/javase/documentation/codeconventions-141855.html#1852 -->
+        <module name="DeclarationOrder"/>
+
+        <!--&lt;!&ndash; Overloaded methods and constructors must be defined together. &ndash;&gt;-->
+        <!--<module name="OverloadMethodsDeclarationOrder">-->
+        <!--<property name="severity" value="warning"/> &lt;!&ndash; TODO: Make error. &ndash;&gt;-->
+        <!--</module>-->
+
+        <!-- Variables must be declared near where they are used. -->
+        <module name="VariableDeclarationUsageDistance">
+            <property name="allowedDistance" value="10"/>
+        </module>
+
+        <!-- Static imports must occur before external package imports. -->
+        <module name="CustomImportOrder">
+            <property name="sortImportsInGroupAlphabetically" value="true"/>
+            <property name="separateLineBetweenGroups" value="true"/>
+            <property name="customImportOrderRules" value="STATIC###THIRD_PARTY_PACKAGE"/>
+        </module>
+
+        <!-- Method names must be specified on the same line as their parameter list. -->
+        <module name="MethodParamPad"/>
+
+        <!-- There must be no space between a method name and its parameter list. -->
+        <module name="ParenPad"/>
+
+        <!-- Non-field annotations must be on separate lines, or in the case of single parameterless annotation can be
+             placed on the same line as the signature. -->
+        <module name="AnnotationLocation">
+            <property name="id" value="AnnotationLocationMostCases"/>
+            <property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF"/>
+        </module>
+
+        <!-- Fields can have multiple annotations applied on the same line. -->
+        <module name="AnnotationLocation">
+            <property name="id" value="AnnotationLocationVariables"/>
+            <property name="tokens" value="VARIABLE_DEF"/>
+            <property name="allowSamelineMultipleAnnotations" value="true"/>
+        </module>
+
+        <!-- Catch blocks must not be empty without a comment. -->
+        <module name="EmptyCatchBlock"/>
+
+        <!-- Comments must be placed at the same indentation level as the surrounding code. -->
+        <module name="CommentsIndentation"/>
+
+        <!-- Checks for imports of certain packages           -->
+        <!-- See http://checkstyle.sf.net/config_imports.html -->
+        <module name="IllegalImport">
+            <property name="illegalPkgs" value="org.apache.http.annotation,javax.annotation.Generated"/>
+        </module>
+
+        <!-- Checks that the override annotation is specified when using @inheritDoc javadoc. -->
+        <module name="MissingOverride"/>
+
+        <!-- Do not allow assignment in subexpressions (except in some cases in loop conditions). -->
+        <module name="InnerAssignment"/>
+
+        <!-- Checks that we don't use System.out.print -->
+        <module name="Regexp">
+            <property name="format" value="System\s*\.\s*(out|err)\s*(\.|::)\s*print"/>
+            <property name="illegalPattern" value="true"/>
+            <property name="message" value="Don't use System console for logging, use a logger instead"/>
+            <property name="ignoreComments" value="true"/>
+        </module>
+
+        <!-- Checks that we don't use Objects.hash. Objects.hashCode is preferred-->
+        <module name="Regexp">
+            <property name="format" value="\bObjects.hash\b"/>
+            <property name="illegalPattern" value="true"/>
+            <property name="message" value="Don't use Objects.hash, use Objects.hashCode instead"/>
+            <property name="ignoreComments" value="true"/>
+        </module>
+
+        <!-- Checks that we don't use sslContext.newHandler directly -->
+        <module name="Regexp">
+            <property name="format" value="\sslContext.newHandler\b"/>
+            <property name="illegalPattern" value="true"/>
+            <property name="message"
+                      value="Don't use sslContext.newHandler directly, use NettyUtils.newSslHandler instead"/>
+            <property name="ignoreComments" value="true"/>
+        </module>
+
+
+        <!-- Checks that we don't use AttributeKey.newInstance directly -->
+        <module name="Regexp">
+            <property name="format" value="AttributeKey\.newInstance"/>
+            <property name="illegalPattern" value="true"/>
+            <property name="message" value="Use NettyUtils.getOrCreateAttributeKey to safely declare AttributeKeys"/>
+            <property name="ignoreComments" value="true"/>
+        </module>
+
+        <!-- Checks that we don't use Thread.currentThread().getContextClassLoader() directly -->
+        <module name="Regexp">
+            <property name="format" value="Thread\s*\.\s*currentThread\s*\(\s*\)\s*(\.|::)\s*getContextClassLoader"/>
+            <property name="illegalPattern" value="true"/>
+            <property name="ignoreComments" value="true"/>
+        </module>
+
+        <!-- Checks for redundant public modifier on interfaces and other redundant modifiers -->
+        <!--        <module name="RedundantModifier"/>-->
+
+        <!-- Checks for utility and constants classes to have private constructor-->
+        <module name="HideUtilityClassConstructor"/>
+    </module>
+
+    <!-- Enforce maximum line lengths. -->
+    <module name="LineLength">
+        <property name="max" value="120"/>
+        <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
+    </module>
+
+</module>
\ No newline at end of file
diff --git a/java/style/intellij-codestyle.xml b/java/style/intellij-codestyle.xml
new file mode 100644
index 0000000..7e479cf
--- /dev/null
+++ b/java/style/intellij-codestyle.xml
@@ -0,0 +1,67 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<code_scheme name="Apache RocketMQ Clients 5.0">
+    <option name="GENERATE_FINAL_LOCALS" value="false"/>
+    <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="9999"/>
+    <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="9999999"/>
+    <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+        <value/>
+    </option>
+    <option name="IMPORT_LAYOUT_TABLE">
+        <value>
+            <package name="" withSubpackages="true" static="true"/>
+            <emptyLine/>
+            <package name="" withSubpackages="true" static="false"/>
+            <emptyLine/>
+        </value>
+    </option>
+    <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/>
+    <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
+    <option name="JD_P_AT_EMPTY_LINES" value="false"/>
+    <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
+    <JavaCodeStyleSettings>
+        <option name="DO_NOT_WRAP_AFTER_SINGLE_ANNOTATION" value="true"/>
+        <option name="CLASS_NAMES_IN_JAVADOC" value="3"/>
+    </JavaCodeStyleSettings>
+    <XML>
+        <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
+    </XML>
+    <codeStyleSettings language="JAVA">
+        <option name="RIGHT_MARGIN" value="120"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="KEEP_FIRST_COLUMN_COMMENT" value="false"/>
+        <option name="DO_NOT_WRAP_AFTER_SINGLE_ANNOTATION" value="true"/>
+        <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+<!--        <option name="SPACE_BEFORE_ANNOTATION_ARRAY_INITIALIZER_LBRACE" value="true"/>-->
+        <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true"/>
+        <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true"/>
+        <option name="PLACE_ASSIGNMENT_SIGN_ON_NEXT_LINE" value="true"/>
+        <option name="IF_BRACE_FORCE" value="3"/>
+        <option name="DOWHILE_BRACE_FORCE" value="3"/>
+        <option name="WHILE_BRACE_FORCE" value="3"/>
+        <option name="FOR_BRACE_FORCE" value="3"/>
+        <option name="WRAP_LONG_LINES" value="true"/>
+        <option name="WRAP_ON_TYPING" value="1"/>
+        <indentOptions>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+</code_scheme>
\ No newline at end of file
diff --git a/java/style/spotbugs-suppressions.xml b/java/style/spotbugs-suppressions.xml
new file mode 100644
index 0000000..ca8620f
--- /dev/null
+++ b/java/style/spotbugs-suppressions.xml
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<FindBugsFilter>
+    <Match>
+        <Bug pattern="SF_SWITCH_FALLTHROUGH,SIC_INNER_SHOULD_BE_STATIC_ANON,URF_UNREAD_FIELD,UUF_UNUSED_FIELD,SS_SHOULD_BE_STATIC,NM_CONFUSING"/>
+    </Match>
+
+    <Match>
+        <Package name="~org\.apache\.rocketmq\.client.*"/>
+        <Bug pattern="NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"/>
+    </Match>
+</FindBugsFilter>