You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/06/18 10:34:09 UTC

[rocketmq-clients] branch master updated (70a66a8 -> a40055e)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


    omit 70a66a8  Java: fix checkstyle
     new a40055e  Java: fix checkstyle

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (70a66a8)
            \
             N -- N -- N   refs/heads/master (a40055e)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 java/style/intellij-codestyle.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[rocketmq-clients] 01/01: Java: fix checkstyle

Posted by aa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit a40055ead61e0ec56ae1d2e0f2e9c56849d06f25
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Sat Jun 18 18:21:17 2022 +0800

    Java: fix checkstyle
---
 java/README.md                                     |  48 ++-
 .../rocketmq/client/apis/ClientConfiguration.java  |  11 +-
 .../client/apis/ClientConfigurationBuilder.java    |   4 +-
 .../rocketmq/client/apis/SessionCredentials.java   |   4 +-
 .../client/apis/consumer/FilterExpression.java     |   7 +-
 .../client/apis/consumer/MessageListener.java      |   3 +-
 .../client/apis/consumer/SimpleConsumer.java       |  25 +-
 .../apis/consumer/SimpleConsumerBuilder.java       |   3 +-
 java/client-shade/pom.xml                          |  18 +
 java/{client-java => client}/pom.xml               |   2 +-
 .../client/java/ClientServiceProviderImpl.java     |   0
 .../org/apache/rocketmq/client/java/UserAgent.java |   5 +-
 .../java/exception/ResourceNotFoundException.java  |   0
 .../client/java/hook/MessageHookPoints.java        |   0
 .../client/java/hook/MessageHookPointsStatus.java  |   0
 .../client/java/hook/MessageInterceptor.java       |   0
 .../apache/rocketmq/client/java/impl/Client.java   |   0
 .../rocketmq/client/java/impl/ClientImpl.java      |  96 ++---
 .../rocketmq/client/java/impl/ClientManager.java   |   0
 .../client/java/impl/ClientManagerImpl.java        |   4 +-
 .../client/java/impl/ClientManagerRegistry.java    |   0
 .../rocketmq/client/java/impl/ClientSettings.java  |   0
 .../rocketmq/client/java/impl/ClientType.java      |   0
 .../client/java/impl/TelemetrySession.java         |  44 ++-
 .../client/java/impl/consumer/Assignment.java      |   0
 .../client/java/impl/consumer/Assignments.java     |   0
 .../client/java/impl/consumer/ConsumeService.java  |   4 +-
 .../client/java/impl/consumer/ConsumeTask.java     |   7 +-
 .../client/java/impl/consumer/ConsumerImpl.java    |  36 +-
 .../java/impl/consumer/FifoConsumeService.java     |   7 +-
 .../client/java/impl/consumer/ProcessQueue.java    |   0
 .../java/impl/consumer/ProcessQueueImpl.java       | 105 ++++--
 .../impl/consumer/PushConsumerBuilderImpl.java     |   9 +-
 .../java/impl/consumer/PushConsumerImpl.java       |  55 +--
 .../java/impl/consumer/PushConsumerSettings.java   |  19 +-
 .../java/impl/consumer/ReceiveMessageResult.java   |   0
 .../impl/consumer/SimpleConsumerBuilderImpl.java   |  13 +-
 .../java/impl/consumer/SimpleConsumerImpl.java     |  50 ++-
 .../java/impl/consumer/SimpleConsumerSettings.java |  16 +-
 .../java/impl/consumer/StandardConsumeService.java |   7 +-
 .../consumer/SubscriptionTopicRouteDataResult.java |   3 +-
 .../java/impl/producer/ProducerBuilderImpl.java    |   9 +-
 .../client/java/impl/producer/ProducerImpl.java    |  99 ++++--
 .../java/impl/producer/ProducerSettings.java       |  14 +-
 .../producer/PublishingTopicRouteDataResult.java   |   0
 .../client/java/impl/producer/SendReceiptImpl.java |   4 +-
 .../client/java/impl/producer/TransactionImpl.java |  24 +-
 .../client/java/logging/CustomConsoleAppender.java |   3 +-
 .../client/java/logging/ProcessIdConverter.java    |   0
 .../client/java/message/MessageBuilderImpl.java    |   6 +-
 .../client/java/message/MessageCommon.java         |   6 +-
 .../client/java/message/MessageIdCodec.java        |   5 +-
 .../client/java/message/MessageIdImpl.java         |   0
 .../rocketmq/client/java/message/MessageImpl.java  |  14 +-
 .../rocketmq/client/java/message/MessageType.java  |   0
 .../client/java/message/MessageViewImpl.java       |  31 +-
 .../client/java/message/PublishingMessageImpl.java |   6 +-
 .../client/java/message/protocol/Encoding.java     |   0
 .../client/java/message/protocol/Resource.java     |   0
 .../client/java/metrics/MessageCacheObserver.java  |   0
 .../rocketmq/client/java/metrics/MessageMeter.java |  55 ++-
 .../rocketmq/client/java/metrics/Metric.java       |   0
 .../java/metrics/MetricMessageInterceptor.java     |  10 +-
 .../rocketmq/client/java/metrics/MetricName.java   |   0
 .../client/java/metrics/RocketmqAttributes.java    |   9 +-
 .../rocketmq/client/java/misc/Dispatcher.java      |   4 +-
 .../client/java/misc/ExecutorServices.java         |   0
 .../rocketmq/client/java/misc/LinkedElement.java   |   0
 .../rocketmq/client/java/misc/LinkedIterator.java  |   0
 .../rocketmq/client/java/misc/MetadataUtils.java   |   0
 .../client/java/misc/RequestIdGenerator.java       |   0
 .../client/java/misc/ThreadFactoryImpl.java        |   0
 .../rocketmq/client/java/misc/Utilities.java       |   6 +-
 .../java/retry/CustomizedBackoffRetryPolicy.java   |  10 +-
 .../java/retry/ExponentialBackoffRetryPolicy.java  |  12 +-
 .../rocketmq/client/java/retry/RetryPolicy.java    |   0
 .../apache/rocketmq/client/java/route/Address.java |   0
 .../rocketmq/client/java/route/AddressScheme.java  |   0
 .../apache/rocketmq/client/java/route/Broker.java  |   3 +-
 .../rocketmq/client/java/route/Endpoints.java      |   7 +-
 .../client/java/route/MessageQueueImpl.java        |   0
 .../rocketmq/client/java/route/Permission.java     |   0
 .../rocketmq/client/java/route/TopicRouteData.java |   3 +-
 .../client/java/route/TopicRouteDataResult.java    |   4 +-
 .../rocketmq/client/java/rpc/AuthInterceptor.java  |   4 +-
 .../client/java/rpc/IpNameResolverFactory.java     |   0
 .../client/java/rpc/LoggingInterceptor.java        |  16 +-
 .../apache/rocketmq/client/java/rpc/RpcClient.java |   0
 .../rocketmq/client/java/rpc/RpcClientImpl.java    |   2 +-
 .../apache/rocketmq/client/java/rpc/Signature.java |   0
 .../apache/rocketmq/client/java/rpc/TLSHelper.java |   0
 .../rocketmq.metadata.properties                   |   0
 ...ache.rocketmq.client.apis.ClientServiceProvider |   0
 .../src/main/resources/logback.xml                 |   0
 .../client/java/ClientServiceProviderImplTest.java |  10 +-
 .../client/java/impl/ClientManagerImplTest.java    |   0
 .../java/impl/consumer/ProcessQueueImplTest.java   |  51 +--
 .../impl/consumer/PushConsumerBuilderImplTest.java |   3 +-
 .../java/impl/consumer/PushConsumerImplTest.java   |  60 ++--
 .../impl/consumer/SimpleConsumerBuilderTest.java   |   3 +-
 .../java/impl/consumer/SimpleConsumerImplTest.java |  76 ++--
 .../impl/consumer/StandardConsumeServiceTest.java  |  15 +-
 .../impl/producer/ProducerBuilderImplTest.java     |   5 +-
 .../java/impl/producer/ProducerImplTest.java       |  93 +++--
 .../java/impl/producer/TransactionImplTest.java    |  32 +-
 .../client/java/message/MessageIdCodecTest.java    |   5 +-
 .../client/java/message/MessageImplTest.java       |  22 +-
 .../retry/CustomizedBackoffRetryPolicyTest.java    |   6 +-
 .../rocketmq/client/java/route/EndpointsTest.java  |   6 +-
 .../apache/rocketmq/client/java/tool/TestBase.java |  36 +-
 .../org.mockito.plugins.MockMaker                  |   0
 java/pom.xml                                       |  35 +-
 java/style/checkstyle.xml                          | 385 +++++++++++++++++++++
 java/style/intellij-codestyle.xml                  |  67 ++++
 java/style/spotbugs-suppressions.xml               |  12 +
 115 files changed, 1363 insertions(+), 530 deletions(-)

diff --git a/java/README.md b/java/README.md
index 2a91aa5..dc36bc9 100644
--- a/java/README.md
+++ b/java/README.md
@@ -1 +1,47 @@
-## RocketMQ Clients for Java
+# RocketMQ Clients for Java
+
+[![Java](https://github.com/apache/rocketmq-clients/actions/workflows/java_build.yml/badge.svg)](https://github.com/apache/rocketmq-clients/actions/workflows/java_build.yml)
+
+The Java implementation of the client for [Apache RocketMQ](https://rocketmq.apache.org/).
+
+## Prerequisites
+
+Java 11 or higher is required to build this project. The built artifacts can be used on Java 8 or
+higher.
+
+<table>
+  <tr>
+    <td><b>Build required:</b></td>
+    <td><b>Java 11 or later</b></td>
+  </tr>
+  <tr>
+    <td><b>Runtime required:</b></td>
+    <td><b>Java 8 or later</b></td>
+  </tr>
+</table>
+
+## Getting Started
+
+Add the dependency to your `pom.xml`, and replace `${rocketmq.version}` with the latest version.
+
+```xml
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-client-java</artifactId>
+    <version>${rocketmq.version}</version>
+</dependency>
+```
+
+It is worth noting that `rocketmq-client-java` is a shaded jar, which means you cannot substitute its dependencies.
+To avoid dependency conflicts, you will want the shaded client in most cases, but we also provide
+a no-shaded client.
+
+```xml
+<dependency>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-client-java-noshade</artifactId>
+    <version>${rocketmq.version}</version>
+</dependency>
+```
+
+
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
index 866b0f4..c7006e4 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
@@ -28,20 +28,21 @@ public class ClientConfiguration {
     private final SessionCredentialsProvider sessionCredentialsProvider;
     private final Duration requestTimeout;
 
-    public static ClientConfigurationBuilder newBuilder() {
-        return new ClientConfigurationBuilder();
-    }
-
     /**
      * The caller is supposed to have validated the arguments and handled throwing exception or
      * logging warnings already, so we avoid repeating args check here.
      */
-    ClientConfiguration(String accessPoint, SessionCredentialsProvider sessionCredentialsProvider, Duration requestTimeout) {
+    ClientConfiguration(String accessPoint, SessionCredentialsProvider sessionCredentialsProvider,
+        Duration requestTimeout) {
         this.accessPoint = accessPoint;
         this.sessionCredentialsProvider = sessionCredentialsProvider;
         this.requestTimeout = requestTimeout;
     }
 
+    public static ClientConfigurationBuilder newBuilder() {
+        return new ClientConfigurationBuilder();
+    }
+
     public String getAccessPoint() {
         return accessPoint;
     }
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index ca67644..672826e 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -17,10 +17,10 @@
 
 package org.apache.rocketmq.client.apis;
 
-import java.time.Duration;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.time.Duration;
+
 /**
  * Builder to set {@link ClientConfiguration}.
  */
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/SessionCredentials.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/SessionCredentials.java
index e16decd..5328eab 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/SessionCredentials.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/SessionCredentials.java
@@ -17,10 +17,10 @@
 
 package org.apache.rocketmq.client.apis;
 
-import java.util.Optional;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Optional;
+
 /**
  * Session credentials used in service authentications.
  */
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java
index 2f21cd2..dcda10e 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/FilterExpression.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.rocketmq.client.apis.consumer;
 
-import java.util.Objects;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.base.Objects;
+
 /**
  * Filter expression is an efficient way to filter message for {@link SimpleConsumer} and {@link PushConsumer}.
  * Consumer who applied the filter expression only can receive the filtered messages.
@@ -68,6 +67,6 @@ public class FilterExpression {
 
     @Override
     public int hashCode() {
-        return Objects.hash(expression, filterExpressionType);
+        return Objects.hashCode(expression, filterExpressionType);
     }
 }
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java
index a26b741..66e4d43 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/MessageListener.java
@@ -23,7 +23,8 @@ import org.apache.rocketmq.client.apis.message.MessageView;
  * MessageListener is used only for push consumer to process message consumption synchronously.
  *
  * <p> Refer to {@link PushConsumer}, push consumer will get message from server
- * and dispatch the message to backend thread pool which control by parameter threadCount to consumer message concurrently.
+ * and dispatch the message to backend thread pool, which is controlled by parameter threadCount, to consume messages
+ * concurrently.
  */
 public interface MessageListener {
     /**
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
index 3773008..90ab988 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumer.java
@@ -41,9 +41,10 @@ import org.apache.rocketmq.client.apis.message.MessageView;
  * should be set in this case.
  *
  * <p> Simple consumer divide message consumption to 3 parts.
- * Firstly, call receive api get messages from server; Then process message by yourself; At last, your must call Ack api to commit this message.
- * If there is error when process message ,your can reconsume the message later which control by the invisibleDuration parameter.
- * Also, you can change the invisibleDuration by call changeInvisibleDuration api.
+ * Firstly, call the receive api to get messages from server; then process the messages by yourself; at last, you must
+ * call the ack api to commit each message.
+ * If an error occurs while processing a message, you can reconsume it later, as controlled by the invisibleDuration
+ * parameter. Also, you can change the invisibleDuration by calling the changeInvisibleDuration api.
  */
 public interface SimpleConsumer extends Closeable {
     /**
@@ -92,7 +93,8 @@ public interface SimpleConsumer extends Closeable {
      * Otherwise, it will await the passed timeout. If the timeout expires, an empty map will be returned.
      *
      * @param maxMessageNum     max message num when server returns.
-     * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be invisible to other consumer unless timout.
+     * @param invisibleDuration set the invisibleDuration of messages returned from server. These messages will be
+     *                          invisible to other consumers until timeout.
      * @return list of messageView
      */
     List<MessageView> receive(int maxMessageNum, Duration invisibleDuration) throws ClientException;
@@ -103,7 +105,8 @@ public interface SimpleConsumer extends Closeable {
      * Otherwise, it will await the passed timeout. If the timeout expires, an empty map will be returned.
      *
      * @param maxMessageNum     max message num when server returns.
-     * @param invisibleDuration set the invisibleDuration of messages return from server. These messages will be invisible to other consumer unless timout.
+     * @param invisibleDuration set the invisibleDuration of messages returned from server. These messages will be
+     *                          invisible to other consumers until timeout.
      * @return list of messageView
      */
     CompletableFuture<List<MessageView>> receiveAsync(int maxMessageNum, Duration invisibleDuration);
@@ -133,8 +136,10 @@ public interface SimpleConsumer extends Closeable {
      * <p> The origin invisible duration for a message decide by ack request.
      *
      * <p>You must call change request before the origin invisible duration timeout.
-     * If called change request later than the origin invisible duration, this request does not take effect and throw exception.
-     * Duplicate change request will refresh the next visible time of this message to other consumers.
+     * If called change request later than the origin invisible duration, this request does not take effect and throw
+     * exception.
+     *
+     * <p>Duplicate change request will refresh the next visible time of this message to other consumers.
      *
      * @param messageView       special messageView with handle want to change.
      * @param invisibleDuration new timestamp the message could be visible and reconsume which start from current time.
@@ -147,8 +152,10 @@ public interface SimpleConsumer extends Closeable {
      * <p> The origin invisible duration for a message decide by ack request.
      *
      * <p> You must call change request before the origin invisible duration timeout.
-     * If called change request later than the origin invisible duration, this request does not take effect and throw exception.
-     * Duplicate change request will refresh the next visible time of this message to other consumers.
+     * If called change request later than the origin invisible duration, this request does not take effect and throw
+     * exception.
+     *
+     * <p>Duplicate change request will refresh the next visible time of this message to other consumers.
      *
      * @param messageView       special messageView with handle want to change.
      * @param invisibleDuration new timestamp the message could be visible and reconsume which start from current time.
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
index d2c6134..49a2804 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/SimpleConsumerBuilder.java
@@ -49,7 +49,8 @@ public interface SimpleConsumerBuilder {
 
     /**
      * Set the max await time when receive message from server.
-     * The simple consumer will hold this long-polling receive requests until  a message is returned or a timeout occurs.
+     * The simple consumer will hold this long-polling receive request until a message is returned or a timeout
+     * occurs.
      *
      * @param awaitDuration The maximum time to block when no message available.
      * @return the consumer builder instance.
diff --git a/java/client-shade/pom.xml b/java/client-shade/pom.xml
new file mode 100644
index 0000000..a5be176
--- /dev/null
+++ b/java/client-shade/pom.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-client-java-parent</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>5.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-client-java</artifactId>
+
+    <properties>
+        <maven.compiler.release>8</maven.compiler.release>
+    </properties>
+
+</project>
\ No newline at end of file
diff --git a/java/client-java/pom.xml b/java/client/pom.xml
similarity index 97%
rename from java/client-java/pom.xml
rename to java/client/pom.xml
index b7b5e4b..3337ced 100644
--- a/java/client-java/pom.xml
+++ b/java/client/pom.xml
@@ -9,7 +9,7 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>rocketmq-client-java</artifactId>
+    <artifactId>rocketmq-client-java-noshade</artifactId>
 
     <properties>
         <maven.compiler.release>8</maven.compiler.release>
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/ClientServiceProviderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/ClientServiceProviderImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/ClientServiceProviderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/ClientServiceProviderImpl.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/UserAgent.java b/java/client/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
similarity index 94%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
index bdd7754..138f49c 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/UserAgent.java
@@ -23,6 +23,9 @@ import org.apache.rocketmq.client.java.misc.MetadataUtils;
 import org.apache.rocketmq.client.java.misc.Utilities;
 
 public class UserAgent {
+    public static final UserAgent INSTANCE = new UserAgent(MetadataUtils.getVersion(), Utilities.getOsDescription(),
+        Utilities.hostName());
+    
     private final String version;
     private final String platform;
     private final String hostName;
@@ -33,8 +36,6 @@ public class UserAgent {
         this.hostName = hostName;
     }
 
-    public static final UserAgent INSTANCE = new UserAgent(MetadataUtils.getVersion(), Utilities.getOsDescription(), Utilities.hostName());
-
     public UA toProtoBuf() {
         return UA.newBuilder()
             .setLanguage(Language.JAVA)
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPoints.java b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPoints.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPoints.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPoints.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPointsStatus.java b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPointsStatus.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPointsStatus.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageHookPointsStatus.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/hook/MessageInterceptor.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/Client.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
similarity index 94%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 8bd2ad0..b7b45fa 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.client.java.impl;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.HeartbeatResponse;
@@ -39,8 +41,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.Metadata;
 import java.io.UnsupportedEncodingException;
 import java.security.InvalidKeyException;
@@ -84,8 +84,8 @@ import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.TopicRouteData;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
 import org.apache.rocketmq.client.java.rpc.Signature;
-
-import static com.google.common.base.Preconditions.checkNotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
 public abstract class ClientImpl extends AbstractIdleService implements Client, MessageInterceptor {
@@ -95,10 +95,18 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     protected volatile ClientManager clientManager;
     protected final ClientConfiguration clientConfiguration;
     protected final Endpoints accessEndpoints;
+    protected final Set<String> topics;
+    // Thread-safe set.
+    protected final Set<Endpoints> isolated;
+    protected final ExecutorService clientCallbackExecutor;
+    protected final MessageMeter messageMeter;
+    /**
+     * Telemetry command executor, which aims to execute commands from remote.
+     */
+    protected final ThreadPoolExecutor telemetryCommandExecutor;
+    protected final String clientId;
 
     private volatile ScheduledFuture<?> updateRouteCacheFuture;
-
-    protected final Set<String> topics;
     private final ConcurrentMap<String, TopicRouteDataResult> topicRouteResultCache;
 
     @GuardedBy("inflightRouteFutureLock")
@@ -109,24 +117,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     private final ConcurrentMap<Endpoints, TelemetrySession> telemetrySessionTable;
     private final ReadWriteLock telemetrySessionsLock;
 
-    // Thread-safe set.
-    protected final Set<Endpoints> isolated;
-
     @GuardedBy("messageInterceptorsLock")
     private final List<MessageInterceptor> messageInterceptors;
     private final ReadWriteLock messageInterceptorsLock;
 
-    protected final ExecutorService clientCallbackExecutor;
-
-    protected final MessageMeter messageMeter;
-
-    /**
-     * Telemetry command executor, which is aims to execute commands from remote.
-     */
-    protected final ThreadPoolExecutor telemetryCommandExecutor;
-
-    protected final String clientId;
-
     public ClientImpl(ClientConfiguration clientConfiguration, Set<String> topics) {
         this.clientConfiguration = checkNotNull(clientConfiguration, "clientConfiguration should not be null");
         final String accessPoint = clientConfiguration.getAccessPoint();
@@ -181,16 +175,19 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         // Register client after client id generation.
         this.clientManager = ClientManagerRegistry.getInstance().registerClient(this);
         // Fetch topic route from remote.
-        LOGGER.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}", clientId, topics);
+        LOGGER.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}",
+            clientId, topics);
         // Aggregate all topic route data futures into a composited future.
         final List<ListenableFuture<TopicRouteDataResult>> futures = topics.stream()
             .map(this::getRouteDataResult)
             .collect(Collectors.toList());
         List<TopicRouteDataResult> results;
         try {
-            results = Futures.allAsList(futures).get(TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP.toNanos(), TimeUnit.NANOSECONDS);
+            results = Futures.allAsList(futures).get(TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP.toNanos(),
+                TimeUnit.NANOSECONDS);
         } catch (Throwable t) {
-            LOGGER.error("Failed to get topic route data result from remote during client startup, clientId={}, topics={}", clientId, topics, t);
+            LOGGER.error("Failed to get topic route data result from remote during client startup, clientId={}, "
+                + "topics={}", clientId, topics, t);
             throw new ResourceNotFoundException(t);
         }
         // Find any topic whose topic route data is failed to fetch from remote.
@@ -203,7 +200,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
             final Status status = result.getStatus();
             throw new ClientException(status.getCode().getNumber(), status.getMessage());
         }
-        LOGGER.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}", clientId, topics);
+        LOGGER.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}",
+            clientId, topics);
         // Update route cache periodically.
         final ScheduledExecutorService scheduler = clientManager.getScheduler();
         this.updateRouteCacheFuture = scheduler.scheduleWithFixedDelay(() -> {
@@ -260,7 +258,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                 try {
                     interceptor.doBefore(hookPoint, messageCommons);
                 } catch (Throwable t) {
-                    LOGGER.warn("Exception raised while intercepting message, hookPoint={}, clientId={}", hookPoint, clientId);
+                    LOGGER.warn("Exception raised while intercepting message, hookPoint={}, clientId={}", hookPoint,
+                        clientId);
                 }
             }
         } finally {
@@ -277,7 +276,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                 try {
                     interceptor.doAfter(hookPoints, messageCommons, duration, status);
                 } catch (Throwable t) {
-                    LOGGER.warn("Exception raised while intercepting message, hookPoint={}, clientId={}", hookPoints, clientId);
+                    LOGGER.warn("Exception raised while intercepting message, hookPoint={}, clientId={}", hookPoints,
+                        clientId);
                 }
             }
         } finally {
@@ -305,13 +305,15 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                     .build();
                 telemeter(endpoints, telemetryCommand);
             } catch (Throwable t) {
-                LOGGER.error("Failed to send thread stack trace to remote, endpoints={}, nonce={}, clientId={}", endpoints, nonce, clientId, t);
+                LOGGER.error("Failed to send thread stack trace to remote, endpoints={}, nonce={}, clientId={}",
+                    endpoints, nonce, clientId, t);
             }
         };
         try {
             telemetryCommandExecutor.submit(task);
         } catch (Throwable t) {
-            LOGGER.error("[Bug] Exception raised while submitting task to print thread stack trace, endpoints={}, nonce={}, clientId={}", endpoints, nonce, clientId, t);
+            LOGGER.error("[Bug] Exception raised while submitting task to print thread stack trace, endpoints={}, "
+                + "nonce={}, clientId={}", endpoints, nonce, clientId, t);
         }
     }
 
@@ -342,7 +344,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
             try {
                 telemeter(endpoints, command);
             } catch (Throwable t) {
-                LOGGER.error("Failed to telemeter settings to remote, clientId={}, endpoints={}", clientId, endpoints, t);
+                LOGGER.error("Failed to telemeter settings, clientId={}, endpoints={}", clientId, endpoints, t);
             }
         }
     }
@@ -432,14 +434,16 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         Futures.addCallback(future, new FutureCallback<List<TelemetrySession>>() {
             @Override
             public void onSuccess(List<TelemetrySession> sessions) {
-                LOGGER.info("Register session successfully, current route will be cached, topic={}, topicRouteDataResult={}", topic, topicRouteDataResult);
+                LOGGER.info("Register session successfully, current route will be cached, topic={}, "
+                    + "topicRouteDataResult={}", topic, topicRouteDataResult);
                 final TopicRouteDataResult old = topicRouteResultCache.put(topic, topicRouteDataResult);
                 if (topicRouteDataResult.equals(old)) {
                     // Log if topic route result remains the same.
                     LOGGER.info("Topic route result remains the same, topic={}, clientId={}", topic, clientId);
                 } else {
                     // Log if topic route result is updated.
-                    LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId, old, topicRouteDataResult);
+                    LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId,
+                        old, topicRouteDataResult);
                 }
                 onTopicRouteDataResultUpdate0(topic, topicRouteDataResult);
                 future0.set(null);
@@ -448,7 +452,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
             @Override
             public void onFailure(Throwable t) {
                 // Note: Topic route would not be updated if failed to register session.
-                LOGGER.error("Failed to register session, current route will NOT be cached, topic={}, topicRouteDataResult={}", topic, topicRouteDataResult);
+                LOGGER.error("Failed to register session, current route will NOT be cached, topic={}, "
+                    + "topicRouteDataResult={}", topic, topicRouteDataResult);
                 future0.setException(t);
             }
         }, MoreExecutors.directExecutor());
@@ -465,7 +470,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      * @param command   request of message consume verification from remote.
      */
     public void onVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command) {
-        LOGGER.warn("Ignore verify message command from remote, which is not expected, clientId={}, command={}", clientId, command);
+        LOGGER.warn("Ignore verify message command from remote, which is not expected, clientId={}, command={}",
+            clientId, command);
         final String nonce = command.getNonce();
         final Status status = Status.newBuilder().setCode(Code.NOT_IMPLEMENTED).build();
         VerifyMessageResult verifyMessageResult = VerifyMessageResult.newBuilder().setNonce(nonce).build();
@@ -487,14 +493,16 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      * @param command   request of orphaned transaction recovery from remote.
      */
     public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command) {
-        LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, client id={}, command={}", clientId, command);
+        LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, client id={}, "
+            + "command={}", clientId, command);
     }
 
     private void updateRouteCache() {
         LOGGER.info("Start to update route cache for a new round, clientId={}", clientId);
         topicRouteResultCache.keySet().forEach(topic -> {
             // Set timeout for future on purpose.
-            final ListenableFuture<TopicRouteDataResult> future = Futures.withTimeout(fetchTopicRoute(topic), TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP, getScheduler());
+            final ListenableFuture<TopicRouteDataResult> future = Futures.withTimeout(fetchTopicRoute(topic),
+                TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP, getScheduler());
             Futures.addCallback(future, new FutureCallback<TopicRouteDataResult>() {
                 @Override
                 public void onSuccess(TopicRouteDataResult topicRouteDataResult) {
@@ -503,7 +511,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
 
                 @Override
                 public void onFailure(Throwable t) {
-                    LOGGER.error("Failed to fetch topic route for update cache, topic={}, clientId={}", topic, clientId, t);
+                    LOGGER.error("Failed to fetch topic route for update cache, topic={}, clientId={}", topic,
+                        clientId, t);
                 }
             }, MoreExecutors.directExecutor());
         });
@@ -583,7 +592,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                     LOGGER.info("Send heartbeat successfully, endpoints={}, clientId={}", endpoints, clientId);
                     final boolean removed = isolated.remove(endpoints);
                     if (removed) {
-                        LOGGER.info("Rejoin endpoints which is isolated before, clientId={}, endpoints={}", clientId, endpoints);
+                        LOGGER.info("Rejoin endpoints which is isolated before, clientId={}, endpoints={}", clientId,
+                            endpoints);
                     }
                 }
 
@@ -593,7 +603,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                 }
             }, MoreExecutors.directExecutor());
         } catch (Throwable e) {
-            LOGGER.error("Exception raised while preparing heartbeat, endpoints={}, clientId={}", endpoints, clientId, e);
+            LOGGER.error("Exception raised while preparing heartbeat, endpoints={}, clientId={}", endpoints, clientId,
+                e);
         }
     }
 
@@ -679,10 +690,12 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                 Futures.whenAllSucceed(updateFuture).run(() -> {
                     inflightRouteFutureLock.lock();
                     try {
-                        final Set<SettableFuture<TopicRouteDataResult>> newFutureSet = inflightRouteFutureTable.remove(topic);
+                        final Set<SettableFuture<TopicRouteDataResult>> newFutureSet =
+                            inflightRouteFutureTable.remove(topic);
                         if (null == newFutureSet) {
                             // Should never reach here.
-                            LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic, clientId);
+                            LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic,
+                                clientId);
                             return;
                         }
                         LOGGER.debug("Fetch topic route successfully, topic={}, in-flight route future "
@@ -692,7 +705,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                         }
                     } catch (Throwable t) {
                         // Should never reach here.
-                        LOGGER.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic, clientId, t);
+                        LOGGER.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic,
+                            clientId, t);
                     } finally {
                         inflightRouteFutureLock.unlock();
                     }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index cfad8ad..ddbbbf9 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -42,8 +42,6 @@ import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
@@ -69,6 +67,8 @@ import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.rpc.RpcClient;
 import org.apache.rocketmq.client.java.rpc.RpcClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @see ClientManager
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSettings.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientType.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
similarity index 88%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
index 90b02f3..bb51870 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
@@ -27,8 +27,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
 import java.io.UnsupportedEncodingException;
@@ -37,6 +35,8 @@ import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Telemetry session is constructed before first communication between client and remote route endpoints.
@@ -50,17 +50,17 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
     private final Endpoints endpoints;
     private volatile StreamObserver<TelemetryCommand> requestObserver;
 
-    public static ListenableFuture<TelemetrySession> register(ClientImpl client, ClientManager clientManager,
-        Endpoints endpoints) {
-        return new TelemetrySession(client, clientManager, endpoints).register();
-    }
-
     private TelemetrySession(ClientImpl client, ClientManager clientManager, Endpoints endpoints) {
         this.client = client;
         this.clientManager = clientManager;
         this.endpoints = endpoints;
     }
 
+    public static ListenableFuture<TelemetrySession> register(ClientImpl client, ClientManager clientManager,
+        Endpoints endpoints) {
+        return new TelemetrySession(client, clientManager, endpoints).register();
+    }
+
     private ListenableFuture<TelemetrySession> register() {
         ListenableFuture<TelemetrySession> future;
         try {
@@ -69,7 +69,8 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
             final Settings settings = clientSettings.toProtobuf();
             final TelemetryCommand settingsCommand = TelemetryCommand.newBuilder().setSettings(settings).build();
             this.telemeter(settingsCommand);
-            future = Futures.transform(clientSettings.getArrivedFuture(), input -> this, MoreExecutors.directExecutor());
+            future = Futures.transform(clientSettings.getArrivedFuture(), input -> this,
+                MoreExecutors.directExecutor());
         } catch (Throwable t) {
             SettableFuture<TelemetrySession> future0 = SettableFuture.create();
             future0.setException(t);
@@ -78,12 +79,14 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
         Futures.addCallback(future, new FutureCallback<TelemetrySession>() {
             @Override
             public void onSuccess(TelemetrySession session) {
-                LOGGER.info("Register telemetry session successfully, endpoints={}, clientId={}", endpoints, client.getClientId());
+                LOGGER.info("Register telemetry session successfully, endpoints={}, clientId={}", endpoints,
+                    client.getClientId());
             }
 
             @Override
             public void onFailure(Throwable t) {
-                LOGGER.error("Failed to register telemetry session, endpoints={}, clientId={}", endpoints, client.getClientId(), t);
+                LOGGER.error("Failed to register telemetry session, endpoints={}, clientId={}", endpoints,
+                    client.getClientId(), t);
                 release();
             }
         }, MoreExecutors.directExecutor());
@@ -106,7 +109,8 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
     /**
      * Initialize telemetry session.
      */
-    private synchronized void init() throws UnsupportedEncodingException, NoSuchAlgorithmException, InvalidKeyException, ClientException {
+    private synchronized void init() throws UnsupportedEncodingException, NoSuchAlgorithmException,
+        InvalidKeyException, ClientException {
         this.release();
         final Metadata metadata = client.sign();
         this.requestObserver = clientManager.telemetry(endpoints, metadata, Duration.ofNanos(Long.MAX_VALUE), this);
@@ -141,35 +145,41 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
             switch (command.getCommandCase()) {
                 case SETTINGS: {
                     final Settings settings = command.getSettings();
-                    LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints, client.getClientId());
+                    LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints,
+                        client.getClientId());
                     client.onSettingsCommand(endpoints, settings);
                     break;
                 }
                 case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
                     final RecoverOrphanedTransactionCommand recoverOrphanedTransactionCommand =
                         command.getRecoverOrphanedTransactionCommand();
-                    LOGGER.info("Receive orphaned transaction recovery command from remote, endpoints={}, clientId={}", endpoints, client.getClientId());
+                    LOGGER.info("Receive orphaned transaction recovery command from remote, endpoints={}, "
+                        + "clientId={}", endpoints, client.getClientId());
                     client.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand);
                     break;
                 }
                 case VERIFY_MESSAGE_COMMAND: {
                     final VerifyMessageCommand verifyMessageCommand = command.getVerifyMessageCommand();
-                    LOGGER.info("Receive message verification command from remote, endpoints={}, clientId={}", client.getClientId());
+                    LOGGER.info("Receive message verification command from remote, endpoints={}, clientId={}",
+                        client.getClientId());
                     client.onVerifyMessageCommand(endpoints, verifyMessageCommand);
                     break;
                 }
                 case PRINT_THREAD_STACK_TRACE_COMMAND: {
                     final PrintThreadStackTraceCommand printThreadStackTraceCommand =
                         command.getPrintThreadStackTraceCommand();
-                    LOGGER.info("Receive thread stack print command from remote, endpoints={}, clientId={}", endpoints, client.getClientId());
+                    LOGGER.info("Receive thread stack print command from remote, endpoints={}, clientId={}",
+                        endpoints, client.getClientId());
                     client.onPrintThreadStackCommand(endpoints, printThreadStackTraceCommand);
                     break;
                 }
                 default:
-                    LOGGER.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}", endpoints, command, client.getClientId());
+                    LOGGER.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}",
+                        endpoints, command, client.getClientId());
             }
         } catch (Throwable t) {
-            LOGGER.error("[Bug] unexpected exception raised while receiving command from remote, command={}, clientId={}", command, client.getClientId());
+            LOGGER.error("[Bug] unexpected exception raised while receiving command from remote, command={}, "
+                + "clientId={}", command, client.getClientId());
         }
     }
 
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignment.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignment.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignment.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignment.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignments.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignments.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignments.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/Assignments.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
index b045548..793b425 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java
@@ -23,8 +23,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -36,6 +34,8 @@ import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.misc.Dispatcher;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("NullableProblems")
 public abstract class ConsumeService extends Dispatcher {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
index b68556b..27fbc36 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java
@@ -18,8 +18,6 @@
 package org.apache.rocketmq.client.java.impl.consumer;
 
 import com.google.common.base.Stopwatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -31,6 +29,8 @@ import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ConsumeTask implements Callable<ConsumeResult> {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeTask.class);
@@ -67,7 +67,8 @@ public class ConsumeTask implements Callable<ConsumeResult> {
             consumeResult = ConsumeResult.FAILURE;
         }
         final Duration duration = stopwatch.elapsed();
-        MessageHookPointsStatus status = ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+        MessageHookPointsStatus status = ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK :
+            MessageHookPointsStatus.ERROR;
         messageInterceptor.doAfter(MessageHookPoints.CONSUME, messageCommons, duration, status);
         // Make sure that the return value is the subset of messageViews.
         return consumeResult;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
similarity index 91%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index b44b112..f674d01 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -39,8 +39,6 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Durations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -59,6 +57,8 @@ import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
 abstract class ConsumerImpl extends ClientImpl {
@@ -79,7 +79,8 @@ abstract class ConsumerImpl extends ClientImpl {
         try {
             Metadata metadata = sign();
             final Endpoints endpoints = mq.getBroker().getEndpoints();
-            final ListenableFuture<Iterator<ReceiveMessageResponse>> future = clientManager.receiveMessage(endpoints, metadata, request, timeout);
+            final ListenableFuture<Iterator<ReceiveMessageResponse>> future = clientManager.receiveMessage(endpoints,
+                metadata, request, timeout);
             return Futures.transform(future, it -> {
                 // Null here means status not set yet.
                 Status status = null;
@@ -96,13 +97,15 @@ abstract class ConsumerImpl extends ClientImpl {
                             break;
                         case DELIVERY_TIMESTAMP:
                             deliveryTimestampFromRemote = response.getDeliveryTimestamp();
+                            break;
                         default:
-                            LOGGER.warn("[Bug] Not recognized content for receive message response, mq={}, clientId={}, resp={}", mq, clientId, response);
+                            LOGGER.warn("[Bug] Not recognized content for receive message response, mq={}," +
+                                " clientId={}, resp={}", mq, clientId, response);
                     }
                 }
                 for (Message message : messageList) {
-                    final MessageViewImpl messageView = MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
-                    messages.add(messageView);
+                    final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
+                    messages.add(view);
                 }
                 return new ReceiveMessageResult(endpoints, status, messages);
             }, MoreExecutors.directExecutor());
@@ -156,9 +159,11 @@ abstract class ConsumerImpl extends ClientImpl {
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 final Duration duration = stopwatch.elapsed();
-                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ?
+                    MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
                 if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to ack message, code={}, status message=[{}], topic={}, messageId={}, clientId={}", code, status.getMessage(), topic, messageId, clientId);
+                    LOGGER.error("Failed to ack message, code={}, status message=[{}], topic={}, messageId={}," +
+                        " clientId={}", code, status.getMessage(), topic, messageId, clientId);
                 }
                 doAfter(MessageHookPoints.ACK, messageCommons, duration, messageHookPointsStatus);
             }
@@ -167,7 +172,8 @@ abstract class ConsumerImpl extends ClientImpl {
             public void onFailure(Throwable t) {
                 final Duration duration = stopwatch.elapsed();
                 doAfter(MessageHookPoints.ACK, messageCommons, duration, MessageHookPointsStatus.ERROR);
-                LOGGER.error("Exception raised during message acknowledgement, topic={}, messageId={}, clientId={}", topic, messageId, clientId, t);
+                LOGGER.error("Exception raised during message acknowledgement, topic={}, messageId={}, clientId={}",
+                    topic, messageId, clientId, t);
             }
         }, MoreExecutors.directExecutor());
         return future;
@@ -184,7 +190,8 @@ abstract class ConsumerImpl extends ClientImpl {
         try {
             final ChangeInvisibleDurationRequest request = wrapChangeInvisibleDuration(messageView, invisibleDuration);
             final Metadata metadata = sign();
-            future = clientManager.changeInvisibleDuration(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
+            future = clientManager.changeInvisibleDuration(endpoints, metadata, request,
+                clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
             final SettableFuture<ChangeInvisibleDurationResponse> future0 = SettableFuture.create();
             future0.setException(t);
@@ -197,9 +204,11 @@ abstract class ConsumerImpl extends ClientImpl {
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 final Duration duration = stopwatch.elapsed();
-                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ?
+                    MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
                 if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to change message invisible duration, messageId={}, endpoints={}, code={}, status message=[{}], clientId={}", messageId, endpoints, code, status.getMessage(), clientId);
+                    LOGGER.error("Failed to change message invisible duration, messageId={}, endpoints={}, code={}," +
+                        " status message=[{}], clientId={}", messageId, endpoints, code, status.getMessage(), clientId);
                 }
                 doAfter(MessageHookPoints.CHANGE_INVISIBLE_DURATION, messageCommons, duration, messageHookPointsStatus);
             }
@@ -208,7 +217,8 @@ abstract class ConsumerImpl extends ClientImpl {
             public void onFailure(Throwable t) {
                 final Duration duration = stopwatch.elapsed();
                 doAfter(MessageHookPoints.ACK, messageCommons, duration, MessageHookPointsStatus.ERROR);
-                LOGGER.error("Exception raised during message acknowledgement, messageId={}, endpoints={}, clientId={}", messageId, endpoints, clientId, t);
+                LOGGER.error("Exception raised during message acknowledgement, messageId={}, endpoints={}, clientId={}",
+                    messageId, endpoints, clientId, t);
 
             }
         }, MoreExecutors.directExecutor());
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
index 0169915..c8beeb2 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
@@ -21,8 +21,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -35,6 +33,8 @@ import org.apache.rocketmq.client.apis.consumer.MessageListener;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("NullableProblems")
 class FifoConsumeService extends ConsumeService {
@@ -53,7 +53,8 @@ class FifoConsumeService extends ConsumeService {
         }
         final MessageViewImpl next = iterator.next();
         final ListenableFuture<ConsumeResult> future0 = consume(next);
-        final ListenableFuture<Void> future = Futures.transformAsync(future0, result -> pq.eraseFifoMessage(next, result), MoreExecutors.directExecutor());
+        final ListenableFuture<Void> future = Futures.transformAsync(future0, result -> pq.eraseFifoMessage(next,
+            result), MoreExecutors.directExecutor());
         Futures.addCallback(future, new FutureCallback<Void>() {
             @Override
             public void onSuccess(Void ignore) {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
similarity index 87%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index bd93e26..3183627 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -29,8 +29,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -53,6 +51,8 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation of {@link ProcessQueue}.
@@ -63,12 +63,12 @@ import org.apache.rocketmq.client.java.route.MessageQueueImpl;
  */
 @SuppressWarnings({"NullableProblems", "UnstableApiUsage"})
 class ProcessQueueImpl implements ProcessQueue {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessQueueImpl.class);
-
     public static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_DELAY = Duration.ofMillis(100);
     public static final Duration ACK_FIFO_MESSAGE_DELAY = Duration.ofMillis(100);
     public static final Duration RECEIVE_LATER_DELAY = Duration.ofSeconds(3);
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessQueueImpl.class);
+
     private final PushConsumerImpl consumer;
 
     /**
@@ -120,12 +120,14 @@ class ProcessQueueImpl implements ProcessQueue {
 
     @Override
     public boolean expired() {
-        Duration maxIdleDuration = Duration.ofNanos(2 * consumer.getPushConsumerSettings().getLongPollingTimeout().toNanos());
+        final PushConsumerSettings settings = consumer.getPushConsumerSettings();
+        Duration maxIdleDuration = Duration.ofNanos(2 * settings.getLongPollingTimeout().toNanos());
         final Duration idleDuration = Duration.ofNanos(System.nanoTime() - activityNanoTime);
         if (idleDuration.compareTo(maxIdleDuration) < 0) {
             return false;
         }
-        LOGGER.warn("Process queue is idle, idle duration={}, max idle duration={}, mq={}, clientId={}", idleDuration, maxIdleDuration, mq, consumer.getClientId());
+        LOGGER.warn("Process queue is idle, idle duration={}, max idle duration={}, mq={}, clientId={}", idleDuration,
+            maxIdleDuration, mq, consumer.getClientId());
         return true;
     }
 
@@ -153,11 +155,13 @@ class ProcessQueueImpl implements ProcessQueue {
             corrupted.forEach(messageView -> {
                 final MessageId messageId = messageView.getMessageId();
                 if (consumer.getPushConsumerSettings().isFifo()) {
-                    LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}, messageId={}, clientId={}", mq, messageId, consumer.getClientId());
+                    LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}," +
+                        " messageId={}, clientId={}", mq, messageId, consumer.getClientId());
                     forwardToDeadLetterQueue(messageView);
                     return;
                 }
-                LOGGER.error("Message is corrupted, nack it in standard mode, mq={}, messageId={}, clientId={}", mq, messageId, consumer.getClientId());
+                LOGGER.error("Message is corrupted, nack it in standard mode, mq={}, messageId={}, clientId={}", mq,
+                    messageId, consumer.getClientId());
                 nackMessage(messageView);
             });
         }
@@ -188,18 +192,21 @@ class ProcessQueueImpl implements ProcessQueue {
                 return;
             }
             // Should never reach here.
-            LOGGER.error("[Bug] Failed to schedule receive message request, mq={}, clientId={}", mq, consumer.getClientId(), t);
+            LOGGER.error("[Bug] Failed to schedule receive message request, mq={}, clientId={}", mq,
+                consumer.getClientId(), t);
             receiveMessageLater();
         }
     }
 
     public void receiveMessage() {
         if (dropped) {
-            LOGGER.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq, consumer.getClientId());
+            LOGGER.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq,
+                consumer.getClientId());
             return;
         }
         if (this.isCacheFull()) {
-            LOGGER.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq, consumer.getClientId());
+            LOGGER.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq,
+                consumer.getClientId());
             receiveMessageLater();
             return;
         }
@@ -208,7 +215,8 @@ class ProcessQueueImpl implements ProcessQueue {
 
     private void receiveMessageImmediately() {
         if (!consumer.isRunning()) {
-            LOGGER.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, consumer.getClientId());
+            LOGGER.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq,
+                consumer.getClientId());
             return;
         }
         try {
@@ -228,18 +236,20 @@ class ProcessQueueImpl implements ProcessQueue {
                 public void onSuccess(ReceiveMessageResult result) {
                     // Intercept after message reception.
                     final Duration duration = stopwatch.elapsed();
-                    final List<MessageCommon> messageCommons = result.getMessages().stream().map(MessageViewImpl::getMessageCommon).collect(Collectors.toList());
+                    final List<MessageCommon> commons = result.getMessages().stream()
+                        .map(MessageViewImpl::getMessageCommon).collect(Collectors.toList());
                     if (result.getStatus().isPresent() && Code.OK.equals(result.getStatus().get().getCode())) {
-                        consumer.doAfter(MessageHookPoints.RECEIVE, messageCommons, duration, MessageHookPointsStatus.OK);
+                        consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.OK);
                     } else {
-                        consumer.doAfter(MessageHookPoints.RECEIVE, messageCommons, duration, MessageHookPointsStatus.ERROR);
+                        consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.ERROR);
                     }
 
                     try {
                         onReceiveMessageResult(result);
                     } catch (Throwable t) {
                         // Should never reach here.
-                        LOGGER.error("[Bug] Exception raised while handling receive result, would receive later, mq={}, endpoints={}, clientId={}",
+                        LOGGER.error("[Bug] Exception raised while handling receive result, would receive later," +
+                                " mq={}, endpoints={}, clientId={}",
                             mq, endpoints, consumer.getClientId(), t);
                         receiveMessageLater();
                     }
@@ -249,16 +259,18 @@ class ProcessQueueImpl implements ProcessQueue {
                 public void onFailure(Throwable t) {
                     // Intercept after message reception.
                     final Duration duration = stopwatch.elapsed();
-                    consumer.doAfter(MessageHookPoints.RECEIVE, Collections.emptyList(), duration, MessageHookPointsStatus.ERROR);
+                    consumer.doAfter(MessageHookPoints.RECEIVE, Collections.emptyList(), duration,
+                        MessageHookPointsStatus.ERROR);
 
-                    LOGGER.error("Exception raised while message reception, would receive later, mq={}, endpoints={}, clientId={}",
-                        mq, endpoints, consumer.getClientId(), t);
+                    LOGGER.error("Exception raised while message reception, would receive later, mq={}, endpoints={}," +
+                        " clientId={}", mq, endpoints, consumer.getClientId(), t);
                     receiveMessageLater();
                 }
             }, MoreExecutors.directExecutor());
             consumer.getReceptionTimes().getAndIncrement();
         } catch (Throwable t) {
-            LOGGER.error("Exception raised while message reception, would receive later, mq={}, clientId={}", mq, consumer.getClientId(), t);
+            LOGGER.error("Exception raised while message reception, would receive later, mq={}, clientId={}", mq,
+                consumer.getClientId(), t);
             receiveMessageLater();
         }
     }
@@ -267,14 +279,16 @@ class ProcessQueueImpl implements ProcessQueue {
         final int cacheMessageCountThresholdPerQueue = consumer.cacheMessageCountThresholdPerQueue();
         final long actualMessagesQuantity = this.cachedMessagesCount();
         if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) {
-            LOGGER.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}, mq={}, clientId={}",
+            LOGGER.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}," +
+                    " mq={}, clientId={}",
                 cacheMessageCountThresholdPerQueue, actualMessagesQuantity, mq, consumer.getClientId());
             return true;
         }
         final int cacheMessageBytesThresholdPerQueue = consumer.cacheMessageBytesThresholdPerQueue();
         final long actualCachedMessagesBytes = this.cachedMessageBytes();
         if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) {
-            LOGGER.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes, actual={} bytes, mq={}, clientId={}",
+            LOGGER.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes," +
+                    " actual={} bytes, mq={}, clientId={}",
                 cacheMessageBytesThresholdPerQueue, actualCachedMessagesBytes, mq, consumer.getClientId());
             return true;
         }
@@ -316,7 +330,8 @@ class ProcessQueueImpl implements ProcessQueue {
                 receiveMessage();
             }
             optionalStatus = Optional.of(Status.newBuilder().setCode(Code.OK).build());
-            LOGGER.error("[Bug] Status not set but message(s) found in the receive result, fix the status to OK, mq={}, endpoints={}", mq, endpoints);
+            LOGGER.error("[Bug] Status not set but message(s) found in the receive result, fix the status to OK," +
+                " mq={}, endpoints={}", mq, endpoints);
         }
         final Status status = optionalStatus.get();
         final Code code = status.getCode();
@@ -327,17 +342,20 @@ class ProcessQueueImpl implements ProcessQueue {
                     consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
                     consumer.getConsumeService().signal();
                 }
-                LOGGER.debug("Receive message with OK, mq={}, endpoints={}, messages found count={}, clientId={}", mq, endpoints, messages.size(), consumer.getClientId());
+                LOGGER.debug("Receive message with OK, mq={}, endpoints={}, messages found count={}, clientId={}",
+                    mq, endpoints, messages.size(), consumer.getClientId());
                 receiveMessage();
                 break;
             case MESSAGE_NOT_FOUND:
-                LOGGER.info("Message not found while receiving message, mq={}, endpoints={}, clientId={}, code={}, status message=[{}]",
+                LOGGER.info("Message not found while receiving message, mq={}, endpoints={}, clientId={}, code={}," +
+                        " status message=[{}]",
                     mq, endpoints, consumer.getClientId(), code.getNumber(), status.getMessage());
                 receiveMessage();
                 break;
             // Fall through on purpose.
             default:
-                LOGGER.error("Failed to receive message from remote, try it later, mq={}, endpoints={}, clientId={}, code={}, status message=[{}]",
+                LOGGER.error("Failed to receive message from remote, try it later, mq={}, endpoints={}, clientId={}," +
+                        " code={}, status message=[{}]",
                     mq, endpoints, consumer.getClientId(), code.getNumber(), status.getMessage());
                 receiveMessageLater();
         }
@@ -431,14 +449,18 @@ class ProcessQueueImpl implements ProcessQueue {
         if (ConsumeResult.FAILURE.equals(consumeResult) && attempt < maxAttempts) {
             final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(attempt);
             attempt = messageView.incrementAndGetDeliveryAttempt();
-            LOGGER.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}, attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}",
+            LOGGER.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}," +
+                    " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}",
                 maxAttempts, attempt, mq, messageId, nextAttemptDelay, consumer.getClientId());
             final ListenableFuture<ConsumeResult> future = service.consume(messageView, nextAttemptDelay);
-            return Futures.transformAsync(future, result -> eraseFifoMessage(messageView, result), MoreExecutors.directExecutor());
+            return Futures.transformAsync(future, result -> eraseFifoMessage(messageView, result),
+                MoreExecutors.directExecutor());
         }
         boolean ok = ConsumeResult.SUCCESS.equals(consumeResult);
         if (!ok) {
-            LOGGER.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, attempt={}, mq={}, messageId={}, clientId={}",
+            LOGGER.info("Failed to consume fifo message finally, run out of attempt times, maxAttempts={}, "
+                    + "attempt={}," +
+                    " mq={}, messageId={}, clientId={}",
                 maxAttempts, attempt, mq, messageId, consumer.getClientId());
         }
         // Ack message or forward it to DLQ depends on consumption result.
@@ -465,7 +487,8 @@ class ProcessQueueImpl implements ProcessQueue {
                 final Code code = status.getCode();
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later, clientId={}, messageId={}, attempt={}, mq={}, code={}, status message={}",
+                    LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later," +
+                            " clientId={}, messageId={}, attempt={}, mq={}, code={}, status message={}",
                         clientId, messageView.getMessageId(), attempt, mq, code, status.getMessage());
                     forwardToDeadLetterQueue(messageView, 1 + attempt, future0);
                     return;
@@ -496,7 +519,8 @@ class ProcessQueueImpl implements ProcessQueue {
         final String clientId = consumer.getClientId();
         // Process queue is dropped, no need to proceed.
         if (dropped) {
-            LOGGER.info("Process queue was dropped, give up to forward message to dead letter queue, mq={}, messageId={}, clientId={}", mq, messageId, clientId);
+            LOGGER.info("Process queue was dropped, give up to forward message to dead letter queue, mq={}," +
+                " messageId={}, clientId={}", mq, messageId, clientId);
             return;
         }
         final ScheduledExecutorService scheduler = consumer.getScheduler();
@@ -508,7 +532,8 @@ class ProcessQueueImpl implements ProcessQueue {
                 return;
             }
             // Should never reach here.
-            LOGGER.error("[Bug] Failed to schedule DLQ message request, mq={}, messageId={}, clientId={}", mq, messageView.getMessageId(), clientId);
+            LOGGER.error("[Bug] Failed to schedule DLQ message request, mq={}, messageId={}, clientId={}", mq,
+                messageView.getMessageId(), clientId);
             forwardToDeadLetterQueueLater(messageView, 1 + attempt, future0);
         }
     }
@@ -531,15 +556,16 @@ class ProcessQueueImpl implements ProcessQueue {
                 final Code code = status.getCode();
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, attempt={}, messageId={}, mq={}, code={}, endpoints={}, status message=[{}]",
+                    LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, attempt={}," +
+                            " messageId={}, mq={}, code={}, endpoints={}, status message=[{}]",
                         clientId, attempt, messageView.getMessageId(), mq, code, endpoints, status.getMessage());
                     ackFifoMessageLater(messageView, 1 + attempt, future0);
                     return;
                 }
                 // Log retries.
                 if (1 < attempt) {
-                    LOGGER.info("Re-ack fifo message successfully, clientId={}, attempt={}, messageId={}, mq={}, endpoints={}",
-                        clientId, attempt, messageView.getMessageId(), mq, endpoints);
+                    LOGGER.info("Re-ack fifo message successfully, clientId={}, attempt={}, messageId={}, mq={}," +
+                        " endpoints={}", clientId, attempt, messageView.getMessageId(), mq, endpoints);
                 }
                 // Set result if FIFO message is acknowledged successfully.
                 future0.set(null);
@@ -548,7 +574,8 @@ class ProcessQueueImpl implements ProcessQueue {
             @Override
             public void onFailure(Throwable t) {
                 // Log failure and retry later.
-                LOGGER.error("Exception raised while ack fifo message, clientId={}, would attempt to re-ack later, attempt={}, messageId={}, mq={}, endpoints={}",
+                LOGGER.error("Exception raised while ack fifo message, clientId={}, would attempt to re-ack later," +
+                        " attempt={}, messageId={}, mq={}, endpoints={}",
                     clientId, attempt, messageView.getMessageId(), mq, endpoints, t);
                 ackFifoMessageLater(messageView, 1 + attempt, future0);
             }
@@ -561,7 +588,8 @@ class ProcessQueueImpl implements ProcessQueue {
         final String clientId = consumer.getClientId();
         // Process queue is dropped, no need to proceed.
         if (dropped) {
-            LOGGER.info("Process queue was dropped, give up to ack message, mq={}, messageId={}, clientId={}", mq, messageId, clientId);
+            LOGGER.info("Process queue was dropped, give up to ack message, mq={}, messageId={}, clientId={}",
+                mq, messageId, clientId);
             return;
         }
         final ScheduledExecutorService scheduler = consumer.getScheduler();
@@ -573,7 +601,8 @@ class ProcessQueueImpl implements ProcessQueue {
                 return;
             }
             // Should never reach here.
-            LOGGER.error("[Bug] Failed to schedule ack fifo message request, mq={}, messageId={}, clientId={}", mq, messageId, clientId);
+            LOGGER.error("[Bug] Failed to schedule ack fifo message request, mq={}, messageId={}, clientId={}",
+                mq, messageId, clientId);
             ackFifoMessageLater(messageView, 1 + attempt, future0);
         }
     }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
index 13cae19..de3c6ef 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
@@ -27,9 +30,6 @@ import org.apache.rocketmq.client.apis.consumer.MessageListener;
 import org.apache.rocketmq.client.apis.consumer.PushConsumer;
 import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Implementation of {@link PushConsumerBuilder}
  */
@@ -59,7 +59,8 @@ public class PushConsumerBuilderImpl implements PushConsumerBuilder {
     @Override
     public PushConsumerBuilder setConsumerGroup(String consumerGroup) {
         checkNotNull(consumerGroup, "consumerGroup should not be null");
-        checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(), "consumerGroup does not match the regex [regex=%s]", CONSUMER_GROUP_PATTERN.pattern());
+        checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(), "consumerGroup does not match the "
+            + "regex [regex=%s]", CONSUMER_GROUP_PATTERN.pattern());
         this.consumerGroup = consumerGroup;
         return this;
     }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
similarity index 93%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 59784a2..b14c047 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -33,8 +33,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.Collections;
@@ -73,6 +71,8 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation of {@link PushConsumer}
@@ -86,6 +86,9 @@ import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
 class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCacheObserver {
     private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerImpl.class);
 
+    final AtomicLong consumptionOkQuantity;
+    final AtomicLong consumptionErrorQuantity;
+
     private final ClientConfiguration clientConfiguration;
     private final PushConsumerSettings pushConsumerSettings;
     private final String consumerGroup;
@@ -104,9 +107,6 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
      */
     private final AtomicLong receivedMessagesQuantity;
 
-    final AtomicLong consumptionOkQuantity;
-    final AtomicLong consumptionErrorQuantity;
-
     private final ThreadPoolExecutor consumptionExecutor;
     private final ConcurrentMap<MessageQueueImpl, ProcessQueue> processQueueTable;
     private ConsumeService consumeService;
@@ -215,7 +215,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
     public PushConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException {
         // Check consumer status.
         if (!this.isRunning()) {
-            LOGGER.error("Unable to add subscription because push consumer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to add subscription because push consumer is not running, state={}, clientId={}",
+                this.state(), clientId);
             throw new IllegalStateException("Push consumer is not running now");
         }
 
@@ -237,7 +238,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
     public PushConsumer unsubscribe(String topic) {
         // Check consumer status.
         if (!this.isRunning()) {
-            LOGGER.error("Unable to remove subscription because push consumer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to remove subscription because push consumer is not running, state={}, clientId={}",
+                this.state(), clientId);
             throw new IllegalStateException("Push consumer is not running now");
         }
         subscriptionExpressions.remove(topic);
@@ -278,7 +280,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
             final Status status = response.getStatus();
             final Code code = status.getCode();
             if (!Code.OK.equals(code)) {
-                final String message = String.format("Failed to query assignment, code=%d, status message=[{%s}]", code.getNumber(), status.getMessage());
+                final String message = String.format("Failed to query assignment, code=%d, status message=[{%s}]",
+                    code.getNumber(), status.getMessage());
                 throw new RuntimeException(message);
             }
             SettableFuture<Assignments> future0 = SettableFuture.create();
@@ -385,14 +388,17 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
                     public void onSuccess(Assignments latest) {
                         if (latest.getAssignmentList().isEmpty()) {
                             if (null == existed || existed.getAssignmentList().isEmpty()) {
-                                LOGGER.info("Acquired empty assignments from remote, would scan later, topic={}, clientId={}", topic, clientId);
+                                LOGGER.info("Acquired empty assignments from remote, would scan later, topic={}, "
+                                    + "clientId={}", topic, clientId);
                                 return;
                             }
-                            LOGGER.info("Attention!!! acquired empty assignments from remote, but existed assignments is not empty, topic={}, clientId={}", topic, clientId);
+                            LOGGER.info("Attention!!! acquired empty assignments from remote, but existed assignments"
+                                + " is not empty, topic={}, clientId={}", topic, clientId);
                         }
 
                         if (!latest.equals(existed)) {
-                            LOGGER.info("Assignments of topic={} has changed, {} => {}, clientId={}", topic, existed, latest, clientId);
+                            LOGGER.info("Assignments of topic={} has changed, {} => {}, clientId={}", topic, existed,
+                                latest, clientId);
                             syncProcessQueue(topic, latest, filterExpression);
                             cacheAssignments.put(topic, latest);
                             return;
@@ -404,7 +410,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
                     @SuppressWarnings("NullableProblems")
                     @Override
                     public void onFailure(Throwable t) {
-                        LOGGER.error("Exception raised while scanning the assignments, topic={}, clientId={}", topic, clientId, t);
+                        LOGGER.error("Exception raised while scanning the assignments, topic={}, clientId={}", topic,
+                            clientId, t);
                     }
                 }, MoreExecutors.directExecutor());
             }
@@ -472,7 +479,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
             public void onSuccess(ConsumeResult consumeResult) {
                 Code code = ConsumeResult.SUCCESS.equals(consumeResult) ? Code.OK : Code.FAILED_TO_CONSUME_MESSAGE;
                 Status status = Status.newBuilder().setCode(code).build();
-                final VerifyMessageResult verifyMessageResult = VerifyMessageResult.newBuilder().setNonce(nonce).build();
+                final VerifyMessageResult verifyMessageResult =
+                    VerifyMessageResult.newBuilder().setNonce(nonce).build();
                 TelemetryCommand command = TelemetryCommand.newBuilder()
                     .setVerifyMessageResult(verifyMessageResult)
                     .setStatus(status)
@@ -480,14 +488,16 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
                 try {
                     telemeter(endpoints, command);
                 } catch (Throwable t) {
-                    LOGGER.error("Failed to send message verification result command, endpoints={}, command={}, messageId={}, clientId={}", endpoints, command, messageId, clientId, t);
+                    LOGGER.error("Failed to send message verification result command, endpoints={}, command={}, "
+                        + "messageId={}, clientId={}", endpoints, command, messageId, clientId, t);
                 }
             }
 
             @Override
             public void onFailure(Throwable t) {
                 // Should never reach here.
-                LOGGER.error("[Bug] Failed to get message verification result, endpoints={}, messageId={}, clientId={}", endpoints, messageId, clientId, t);
+                LOGGER.error("[Bug] Failed to get message verification result, endpoints={}, messageId={}, "
+                    + "clientId={}", endpoints, messageId, clientId, t);
             }
         }, MoreExecutors.directExecutor());
     }
@@ -527,7 +537,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
             @Override
             public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) {
                 final Duration duration = stopwatch.elapsed();
-                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(response.getStatus().getCode()) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(response.getStatus().getCode()) ?
+                    MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
                 // Intercept after forwarding message to DLQ.
                 doAfter(MessageHookPoints.FORWARD_TO_DLQ, messageCommons, duration, messageHookPointsStatus);
             }
@@ -550,11 +561,15 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
         final long consumptionOkQuantity = this.consumptionOkQuantity.getAndSet(0);
         final long consumptionErrorQuantity = this.consumptionErrorQuantity.getAndSet(0);
 
-        LOGGER.info("clientId={}, consumerGroup={}, receptionTimes={}, receivedMessagesQuantity={}, consumptionOkQuantity={}, consumptionErrorQuantity={}",
-            clientId, consumerGroup, receptionTimes, receivedMessagesQuantity, consumptionOkQuantity, consumptionErrorQuantity);
+        LOGGER.info("clientId={}, consumerGroup={}, receptionTimes={}, receivedMessagesQuantity={}, "
+                + "consumptionOkQuantity={}, consumptionErrorQuantity={}",
+            clientId, consumerGroup, receptionTimes, receivedMessagesQuantity, consumptionOkQuantity,
+            consumptionErrorQuantity);
         for (ProcessQueue pq : processQueueTable.values()) {
-            LOGGER.info("Process queue stats: clientId={}, mq={}, pendingMessageCount={}, inflightMessageCount={}, cachedMessageBytes={}",
-                clientId, pq.getMessageQueue(), pq.getPendingMessageCount(), pq.getInflightMessageCount(), pq.getCachedMessageBytes());
+            LOGGER.info("Process queue stats: clientId={}, mq={}, pendingMessageCount={}, inflightMessageCount={}, "
+                    + "cachedMessageBytes={}",
+                clientId, pq.getMessageQueue(), pq.getPendingMessageCount(), pq.getInflightMessageCount(),
+                pq.getCachedMessageBytes());
         }
     }
 
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
index 90e96a2..b9e97a5 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerSettings.java
@@ -24,8 +24,6 @@ import apache.rocketmq.v2.Subscription;
 import apache.rocketmq.v2.SubscriptionEntry;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.util.Durations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -39,6 +37,8 @@ import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PushConsumerSettings extends ClientSettings {
     private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerSettings.class);
@@ -73,8 +73,10 @@ public class PushConsumerSettings extends ClientSettings {
         List<SubscriptionEntry> subscriptionEntries = new ArrayList<>();
         for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
             final FilterExpression filterExpression = entry.getValue();
-            apache.rocketmq.v2.Resource topic = apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
-            final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder = apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
+            apache.rocketmq.v2.Resource topic =
+                apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
+            final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
+                apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
             final FilterExpressionType type = filterExpression.getFilterExpressionType();
             switch (type) {
                 case TAG:
@@ -86,10 +88,12 @@ public class PushConsumerSettings extends ClientSettings {
                 default:
                     LOGGER.warn("[Bug] Unrecognized filter type, type={}", type);
             }
-            SubscriptionEntry subscriptionEntry = SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
+            SubscriptionEntry subscriptionEntry =
+                SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
             subscriptionEntries.add(subscriptionEntry);
         }
-        Subscription subscription = Subscription.newBuilder().setGroup(group.toProtobuf()).addAllSubscriptions(subscriptionEntries).build();
+        Subscription subscription =
+            Subscription.newBuilder().setGroup(group.toProtobuf()).addAllSubscriptions(subscriptionEntries).build();
         return Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
             .setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setSubscription(subscription)
             .setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
@@ -99,7 +103,8 @@ public class PushConsumerSettings extends ClientSettings {
     public void applySettingsCommand(Settings settings) {
         final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
         if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
-            LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, client type={}", clientId, pubSubCase, clientType);
+            LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, "
+                + "client type={}", clientId, pubSubCase, clientType);
             return;
         }
         final Subscription subscription = settings.getSubscription();
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
similarity index 94%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
index fe19f4b..297845d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,9 +30,6 @@ import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 public class SimpleConsumerBuilderImpl implements SimpleConsumerBuilder {
     private static final Pattern CONSUMER_GROUP_PATTERN = Pattern.compile("^[%|a-zA-Z0-9._-]{1,255}$");
 
@@ -53,8 +53,8 @@ public class SimpleConsumerBuilderImpl implements SimpleConsumerBuilder {
     @Override
     public SimpleConsumerBuilder setConsumerGroup(String consumerGroup) {
         checkNotNull(consumerGroup, "consumerGroup should not be null");
-        checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(), "consumerGroup does not match the regex [regex=%s]",
-            CONSUMER_GROUP_PATTERN.pattern());
+        checkArgument(CONSUMER_GROUP_PATTERN.matcher(consumerGroup).matches(),
+            "consumerGroup does not match the regex [regex=%s]", CONSUMER_GROUP_PATTERN.pattern());
         this.consumerGroup = consumerGroup;
         return this;
     }
@@ -79,7 +79,8 @@ public class SimpleConsumerBuilderImpl implements SimpleConsumerBuilder {
         checkNotNull(consumerGroup, "consumerGroup has not been set yet");
         checkArgument(!subscriptionExpressions.isEmpty(), "subscriptionExpressions have not been set yet");
         checkNotNull(awaitDuration, "awaitDuration has not been set yet");
-        final SimpleConsumerImpl consumer = new SimpleConsumerImpl(clientConfiguration, consumerGroup, awaitDuration, subscriptionExpressions);
+        final SimpleConsumerImpl consumer = new SimpleConsumerImpl(clientConfiguration, consumerGroup, awaitDuration,
+            subscriptionExpressions);
         consumer.startAsync().awaitRunning();
         return consumer;
     }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index f0605b6..db48b87 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -27,8 +27,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -51,6 +49,8 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of {@link SimpleConsumer}
@@ -66,20 +66,21 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     private final AtomicInteger topicIndex;
 
     private final Map<String /* topic */, FilterExpression> subscriptionExpressions;
-    private final ConcurrentMap<String /* topic */, SubscriptionTopicRouteDataResult> subscriptionTopicRouteDataResultCache;
+    private final ConcurrentMap<String /* topic */, SubscriptionTopicRouteDataResult> subTopicRouteDataResultCache;
 
     public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
         Map<String, FilterExpression> subscriptionExpressions) {
         super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
         Resource groupResource = new Resource(consumerGroup);
-        this.simpleConsumerSettings = new SimpleConsumerSettings(clientId, accessEndpoints, groupResource, clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
+        this.simpleConsumerSettings = new SimpleConsumerSettings(clientId, accessEndpoints, groupResource,
+            clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
         this.consumerGroup = consumerGroup;
         this.awaitDuration = awaitDuration;
 
         this.topicIndex = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
 
         this.subscriptionExpressions = subscriptionExpressions;
-        this.subscriptionTopicRouteDataResultCache = new ConcurrentHashMap<>();
+        this.subTopicRouteDataResultCache = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -111,7 +112,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     public SimpleConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException {
         // Check consumer status.
         if (!this.isRunning()) {
-            LOGGER.error("Unable to add subscription because simple consumer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to add subscription because simple consumer is not running, state={}, clientId={}",
+                this.state(), clientId);
             throw new IllegalStateException("Simple consumer is not running now");
         }
         final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
@@ -131,7 +133,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     @Override
     public SimpleConsumer unsubscribe(String topic) {
         if (!this.isRunning()) {
-            LOGGER.error("Unable to remove subscription because simple consumer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to remove subscription because simple consumer is not running, state={}, "
+                + "clientId={}", this.state(), clientId);
             throw new IllegalStateException("Simple consumer is not running now");
         }
         subscriptionExpressions.remove(topic);
@@ -167,7 +170,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration invisibleDuration) {
         SettableFuture<List<MessageView>> future = SettableFuture.create();
         if (!this.isRunning()) {
-            LOGGER.error("Unable to receive message because simple consumer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to receive message because simple consumer is not running, state={}, clientId={}",
+                this.state(), clientId);
             future.setException(new IllegalStateException("Simple consumer is not running now"));
             return future;
         }
@@ -175,7 +179,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         final ArrayList<String> topics = new ArrayList<>(copy.keySet());
         // All topic is subscribed.
         if (topics.isEmpty()) {
-            final IllegalArgumentException exception = new IllegalArgumentException("There is no topic to receive message");
+            final IllegalArgumentException exception = new IllegalArgumentException("There is no topic to receive "
+                + "message");
             future.setException(exception);
             return future;
         }
@@ -184,7 +189,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         final ListenableFuture<SubscriptionTopicRouteDataResult> routeFuture = getSubscriptionTopicRouteResult(topic);
         final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
             final MessageQueueImpl mq = result.takeMessageQueue();
-            final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression, invisibleDuration);
+            final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
+                invisibleDuration);
             return receiveMessage(request, mq, awaitDuration);
         }, MoreExecutors.directExecutor());
         return Futures.transformAsync(future0, result -> {
@@ -225,12 +231,14 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         SettableFuture<Void> future0 = SettableFuture.create();
         // Check consumer status.
         if (!this.isRunning()) {
-            LOGGER.error("Unable to ack message because simple consumer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to ack message because simple consumer is not running, state={}, clientId={}",
+                this.state(), clientId);
             future0.setException(new IllegalStateException("Simple consumer is not running now"));
             return future0;
         }
         if (!(messageView instanceof MessageViewImpl)) {
-            final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for messageView");
+            final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for "
+                + "messageView");
             future0.setException(exception);
             return future0;
         }
@@ -268,12 +276,14 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     public ListenableFuture<Void> changeInvisibleDuration0(MessageView messageView, Duration invisibleDuration) {
         SettableFuture<Void> future0 = SettableFuture.create();
         if (!(messageView instanceof MessageViewImpl)) {
-            final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for messageView");
+            final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for "
+                + "messageView");
             future0.setException(exception);
             return future0;
         }
         MessageViewImpl impl = (MessageViewImpl) messageView;
-        final ListenableFuture<ChangeInvisibleDurationResponse> future = changInvisibleDuration(impl, invisibleDuration);
+        final ListenableFuture<ChangeInvisibleDurationResponse> future = changInvisibleDuration(impl,
+            invisibleDuration);
         return Futures.transformAsync(future, response -> {
             // Refresh receipt handle manually.
             impl.setReceiptHandle(response.getReceiptHandle());
@@ -302,21 +312,23 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     }
 
     public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
-        final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult = new SubscriptionTopicRouteDataResult(topicRouteDataResult);
-        subscriptionTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
+        final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult =
+            new SubscriptionTopicRouteDataResult(topicRouteDataResult);
+        subTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
     }
 
     private ListenableFuture<SubscriptionTopicRouteDataResult> getSubscriptionTopicRouteResult(final String topic) {
         SettableFuture<SubscriptionTopicRouteDataResult> future0 = SettableFuture.create();
-        final SubscriptionTopicRouteDataResult result = subscriptionTopicRouteDataResultCache.get(topic);
+        final SubscriptionTopicRouteDataResult result = subTopicRouteDataResultCache.get(topic);
         if (null != result) {
             future0.set(result);
             return future0;
         }
         final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
         return Futures.transform(future, topicRouteDataResult -> {
-            final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult = new SubscriptionTopicRouteDataResult(topicRouteDataResult);
-            subscriptionTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
+            final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult =
+                new SubscriptionTopicRouteDataResult(topicRouteDataResult);
+            subTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
             return subscriptionTopicRouteDataResult;
         }, MoreExecutors.directExecutor());
     }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
index 70c981a..0117237 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerSettings.java
@@ -23,8 +23,6 @@ import apache.rocketmq.v2.Subscription;
 import apache.rocketmq.v2.SubscriptionEntry;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.util.Durations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -36,6 +34,8 @@ import org.apache.rocketmq.client.java.impl.ClientSettings;
 import org.apache.rocketmq.client.java.impl.ClientType;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SimpleConsumerSettings extends ClientSettings {
     private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerSettings.class);
@@ -57,8 +57,10 @@ public class SimpleConsumerSettings extends ClientSettings {
         List<SubscriptionEntry> subscriptionEntries = new ArrayList<>();
         for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
             final FilterExpression filterExpression = entry.getValue();
-            apache.rocketmq.v2.Resource topic = apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
-            final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder = apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
+            apache.rocketmq.v2.Resource topic =
+                apache.rocketmq.v2.Resource.newBuilder().setName(entry.getKey()).build();
+            final apache.rocketmq.v2.FilterExpression.Builder expressionBuilder =
+                apache.rocketmq.v2.FilterExpression.newBuilder().setExpression(filterExpression.getExpression());
             final FilterExpressionType type = filterExpression.getFilterExpressionType();
             switch (type) {
                 case TAG:
@@ -70,7 +72,8 @@ public class SimpleConsumerSettings extends ClientSettings {
                 default:
                     LOGGER.warn("[Bug] Unrecognized filter type for simple consumer, type={}", type);
             }
-            SubscriptionEntry subscriptionEntry = SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
+            SubscriptionEntry subscriptionEntry =
+                SubscriptionEntry.newBuilder().setTopic(topic).setExpression(expressionBuilder.build()).build();
             subscriptionEntries.add(subscriptionEntry);
         }
         Subscription subscription = Subscription.newBuilder().setGroup(group.toProtobuf())
@@ -85,7 +88,8 @@ public class SimpleConsumerSettings extends ClientSettings {
     public void applySettingsCommand(Settings settings) {
         final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
         if (!Settings.PubSubCase.SUBSCRIPTION.equals(pubSubCase)) {
-            LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, client type={}", clientId, pubSubCase, clientType);
+            LOGGER.error("[Bug] Issued settings not match with the client type, client id ={}, pub-sub case={}, "
+                + "client type={}", clientId, pubSubCase, clientType);
             return;
         }
         this.arrivedFuture.set(null);
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
similarity index 99%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
index bb0681d..52d0c21 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java
@@ -21,8 +21,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -35,6 +33,8 @@ import org.apache.rocketmq.client.apis.consumer.MessageListener;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("NullableProblems")
 public class StandardConsumeService extends ConsumeService {
@@ -89,7 +89,6 @@ public class StandardConsumeService extends ConsumeService {
         boolean dispatched;
         do {
             dispatched = dispatch0();
-        }
-        while (dispatched);
+        } while (dispatched);
     }
 }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
index 522934d..804cf3f 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
@@ -63,7 +63,8 @@ public class SubscriptionTopicRouteDataResult {
         }
         if (messageQueues.isEmpty()) {
             // Should never reach here.
-            throw new ResourceNotFoundException("Failed to take message queue due to readable message queue doesn't exist");
+            throw new ResourceNotFoundException("Failed to take message queue due to readable message queue doesn't "
+                + "exist");
         }
         final int next = messageQueueIndex.getAndIncrement();
         return messageQueues.get(IntMath.mod(next, messageQueues.size()));
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
similarity index 95%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
index 3ab768a..30d4f9d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
@@ -27,9 +30,6 @@ import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
 import org.apache.rocketmq.client.apis.producer.TransactionChecker;
 import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Implementation of {@link ProducerBuilder}
  */
@@ -57,7 +57,8 @@ public class ProducerBuilderImpl implements ProducerBuilder {
     @Override
     public ProducerBuilder setTopics(String... topics) {
         final Set<String> set = Arrays.stream(topics).peek(topic -> checkNotNull(topic, "topic should not be null"))
-            .peek(topic -> checkArgument(MessageBuilderImpl.TOPIC_PATTERN.matcher(topic).matches(), "topic does not match the regex [regex=%s]", MessageBuilderImpl.TOPIC_PATTERN.pattern()))
+            .peek(topic -> checkArgument(MessageBuilderImpl.TOPIC_PATTERN.matcher(topic).matches(), "topic does not "
+                + "match the regex [regex=%s]", MessageBuilderImpl.TOPIC_PATTERN.pattern()))
             .collect(Collectors.toSet());
         this.topics.addAll(set);
         return this;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
similarity index 87%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 0877499..7c3d658 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -34,8 +34,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -73,6 +71,8 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation of {@link Producer}
@@ -92,10 +92,12 @@ class ProducerImpl extends ClientImpl implements Producer {
      * The caller is supposed to have validated the arguments and handled throwing exception or
      * logging warnings already, so we avoid repeating args check here.
      */
-    ProducerImpl(ClientConfiguration clientConfiguration, Set<String> topics, int maxAttempts, TransactionChecker checker) {
+    ProducerImpl(ClientConfiguration clientConfiguration, Set<String> topics, int maxAttempts,
+        TransactionChecker checker) {
         super(clientConfiguration, topics);
         ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
-        this.producerSettings = new ProducerSettings(clientId, accessEndpoints, retryPolicy, clientConfiguration.getRequestTimeout(), topics.stream().map(Resource::new).collect(Collectors.toSet()));
+        this.producerSettings = new ProducerSettings(clientId, accessEndpoints, retryPolicy,
+            clientConfiguration.getRequestTimeout(), topics.stream().map(Resource::new).collect(Collectors.toSet()));
         this.checker = checker;
 
         this.publishingRouteDataResultCache = new ConcurrentHashMap<>();
@@ -121,14 +123,17 @@ class ProducerImpl extends ClientImpl implements Producer {
         final String transactionId = command.getTransactionId();
         final String messageId = command.getOrphanedTransactionalMessage().getSystemProperties().getMessageId();
         if (null == checker) {
-            LOGGER.error("No transaction checker registered, ignore it, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId);
+            LOGGER.error("No transaction checker registered, ignore it, messageId={}, transactionId={}, endpoints={},"
+                + " clientId={}", messageId, transactionId, endpoints, clientId);
             return;
         }
         MessageViewImpl messageView;
         try {
             messageView = MessageViewImpl.fromProtobuf(command.getOrphanedTransactionalMessage(), mq);
         } catch (Throwable t) {
-            LOGGER.error("[Bug] Failed to decode message during orphaned transaction message recovery, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, endpoints, clientId, t);
+            LOGGER.error("[Bug] Failed to decode message during orphaned transaction message recovery, messageId={}, "
+                    + "transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, endpoints,
+                clientId, t);
             return;
         }
         ListenableFuture<TransactionResolution> future;
@@ -148,15 +153,18 @@ class ProducerImpl extends ClientImpl implements Producer {
                     if (null == resolution || TransactionResolution.UNKNOWN.equals(resolution)) {
                         return;
                     }
-                    endTransaction(endpoints, messageView.getMessageCommon(), messageView.getMessageId(), transactionId, resolution);
+                    endTransaction(endpoints, messageView.getMessageCommon(), messageView.getMessageId(),
+                        transactionId, resolution);
                 } catch (Throwable t) {
-                    LOGGER.error("Exception raised while ending the transaction, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
+                    LOGGER.error("Exception raised while ending the transaction, messageId={}, transactionId={}, "
+                        + "endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
                 }
             }
 
             @Override
             public void onFailure(Throwable t) {
-                LOGGER.error("Exception raised while checking the transaction, messageId={}, transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
+                LOGGER.error("Exception raised while checking the transaction, messageId={}, transactionId={}, "
+                    + "endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t);
 
             }
         }, MoreExecutors.directExecutor());
@@ -202,7 +210,8 @@ class ProducerImpl extends ClientImpl implements Producer {
         } catch (Throwable t) {
             throw new ClientException(t);
         }
-        final ListenableFuture<List<SendReceiptImpl>> future = send0(Collections.singletonList(publishingMessage), true);
+        final ListenableFuture<List<SendReceiptImpl>> future = send0(Collections.singletonList(publishingMessage),
+            true);
         final List<SendReceiptImpl> receipts = handleClientFuture(future);
         final SendReceiptImpl sendReceipt = receipts.iterator().next();
         ((TransactionImpl) transaction).tryAddReceipt(publishingMessage, sendReceipt);
@@ -235,7 +244,8 @@ class ProducerImpl extends ClientImpl implements Producer {
     @Override
     public Transaction beginTransaction() {
         if (!this.isRunning()) {
-            LOGGER.error("Unable to begin a transaction because producer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to begin a transaction because producer is not running, state={}, clientId={}",
+                this.state(), clientId);
             throw new IllegalStateException("Producer is not running now");
         }
         return new TransactionImpl(this);
@@ -270,7 +280,8 @@ class ProducerImpl extends ClientImpl implements Producer {
 
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final List<MessageCommon> messageCommons = Collections.singletonList(messageCommon);
-        MessageHookPoints messageHookPoints = TransactionResolution.COMMIT.equals(resolution) ? MessageHookPoints.COMMIT_TRANSACTION : MessageHookPoints.ROLLBACK_TRANSACTION;
+        MessageHookPoints messageHookPoints = TransactionResolution.COMMIT.equals(resolution) ?
+            MessageHookPoints.COMMIT_TRANSACTION : MessageHookPoints.ROLLBACK_TRANSACTION;
         doBefore(messageHookPoints, messageCommons);
 
         final ListenableFuture<EndTransactionResponse> future =
@@ -281,7 +292,8 @@ class ProducerImpl extends ClientImpl implements Producer {
                 final Duration duration = stopwatch.elapsed();
                 final Status status = result.getStatus();
                 final Code code = status.getCode();
-                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
+                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK :
+                    MessageHookPointsStatus.ERROR;
                 doAfter(messageHookPoints, messageCommons, duration, messageHookPointsStatus);
             }
 
@@ -324,14 +336,16 @@ class ProducerImpl extends ClientImpl implements Producer {
         if (!this.isRunning()) {
             final IllegalStateException e = new IllegalStateException("Producer is not running now");
             future.setException(e);
-            LOGGER.error("Unable to send message because producer is not running, state={}, clientId={}", this.state(), clientId);
+            LOGGER.error("Unable to send message because producer is not running, state={}, clientId={}",
+                this.state(), clientId);
             return future;
         }
 
         List<PublishingMessageImpl> pubMessages = new ArrayList<>();
         for (Message message : messages) {
             try {
-                final PublishingMessageImpl pubMessage = new PublishingMessageImpl(message, producerSettings, txEnabled);
+                final PublishingMessageImpl pubMessage = new PublishingMessageImpl(message, producerSettings,
+                    txEnabled);
                 pubMessages.add(pubMessage);
             } catch (Throwable t) {
                 // Failed to refine message, no need to proceed.
@@ -347,7 +361,8 @@ class ProducerImpl extends ClientImpl implements Producer {
             // Messages have different topics, no need to proceed.
             final IllegalArgumentException e = new IllegalArgumentException("Messages to send have different topics");
             future.setException(e);
-            LOGGER.error("Messages to be sent have different topics, no need to proceed, topic(s)={}, clientId={}", topics, clientId);
+            LOGGER.error("Messages to be sent have different topics, no need to proceed, topic(s)={}, clientId={}",
+                topics, clientId);
             return future;
         }
 
@@ -358,9 +373,11 @@ class ProducerImpl extends ClientImpl implements Producer {
             .collect(Collectors.toSet());
         if (1 < messageTypes.size()) {
             // Messages have different message type, no need to proceed.
-            final IllegalArgumentException e = new IllegalArgumentException("Messages to send have different types, please check");
+            final IllegalArgumentException e = new IllegalArgumentException("Messages to send have different types, "
+                + "please check");
             future.setException(e);
-            LOGGER.error("Messages to be sent have different message types, no need to proceed, topic={}, messageType(s)={}, clientId={}", topic, messageTypes, clientId, e);
+            LOGGER.error("Messages to be sent have different message types, no need to proceed, topic={}, messageType"
+                + "(s)={}, clientId={}", topic, messageTypes, clientId, e);
             return future;
         }
 
@@ -374,9 +391,11 @@ class ProducerImpl extends ClientImpl implements Producer {
                 .map(Optional::get).collect(Collectors.toSet());
 
             if (1 < messageGroups.size()) {
-                final IllegalArgumentException e = new IllegalArgumentException("FIFO messages to send have different message groups, messageGroups=" + messageGroups);
+                final IllegalArgumentException e = new IllegalArgumentException("FIFO messages to send have different"
+                    + " message groups, messageGroups=" + messageGroups);
                 future.setException(e);
-                LOGGER.error("FIFO messages to be sent have different message groups, no need to proceed, topic={}, messageGroups={}, clientId={}", topic, messageGroups, clientId, e);
+                LOGGER.error("FIFO messages to be sent have different message groups, no need to proceed, topic={}, "
+                    + "messageGroups={}, clientId={}", topic, messageGroups, clientId, e);
                 return future;
             }
             messageGroup = messageGroups.iterator().next();
@@ -418,28 +437,31 @@ class ProducerImpl extends ClientImpl implements Producer {
         }
         // Calculate the current message queue.
         final MessageQueueImpl messageQueue = candidates.get(IntMath.mod(attempt - 1, candidates.size()));
-//        if (!messageQueue.matchMessageType(messageType)) {
-//            final IllegalArgumentException e = new IllegalArgumentException("Current message type not match with topic accept message types");
-//            future.setException(e);
-//            return;
-//        }
+        //        if (!messageQueue.matchMessageType(messageType)) {
+        //            final IllegalArgumentException e = new IllegalArgumentException("Current message type not match
+        //            with topic accept message types");
+        //            future.setException(e);
+        //            return;
+        //        }
         final Endpoints endpoints = messageQueue.getBroker().getEndpoints();
         final SendMessageRequest request = wrapSendMessageRequest(messages);
 
         final ListenableFuture<SendMessageResponse> responseFuture = clientManager.sendMessage(endpoints, metadata,
             request, clientConfiguration.getRequestTimeout());
 
-        final ListenableFuture<List<SendReceiptImpl>> attemptFuture = Futures.transformAsync(responseFuture, response -> {
-            final SettableFuture<List<SendReceiptImpl>> future0 = SettableFuture.create();
-            future0.set(SendReceiptImpl.processSendResponse(messageQueue, response));
-            return future0;
-        }, MoreExecutors.directExecutor());
+        final ListenableFuture<List<SendReceiptImpl>> attemptFuture = Futures.transformAsync(responseFuture,
+            response -> {
+                final SettableFuture<List<SendReceiptImpl>> future0 = SettableFuture.create();
+                future0.set(SendReceiptImpl.processSendResponse(messageQueue, response));
+                return future0;
+            }, MoreExecutors.directExecutor());
 
         final int maxAttempts = this.getRetryPolicy().getMaxAttempts();
 
         // Intercept before message publishing.
         final Stopwatch stopwatch = Stopwatch.createStarted();
-        final List<MessageCommon> messageCommons = messages.stream().map(PublishingMessageImpl::getMessageCommon).collect(Collectors.toList());
+        final List<MessageCommon> messageCommons =
+            messages.stream().map(PublishingMessageImpl::getMessageCommon).collect(Collectors.toList());
         doBefore(MessageHookPoints.SEND, messageCommons);
 
         Futures.addCallback(attemptFuture, new FutureCallback<List<SendReceiptImpl>>() {
@@ -494,16 +516,19 @@ class ProducerImpl extends ClientImpl implements Producer {
                 if (MessageType.TRANSACTION.equals(messageType)) {
                     future.setException(t);
                     LOGGER.error("Failed to send transactional message finally, maxAttempts=1, attempt={}, " +
-                        "topic={}, messageId(s), endpoints={}, clientId={}", attempt, topic, messageIds, endpoints, clientId, t);
+                            "topic={}, messageId(s), endpoints={}, clientId={}", attempt, topic, messageIds, endpoints,
+                        clientId, t);
                     return;
                 }
                 // Try to do more attempts.
                 int nextAttempt = 1 + attempt;
                 final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
                 LOGGER.warn("Failed to send message, would attempt to resend after {}, maxAttempts={}," +
-                        " attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay, maxAttempts, attempt,
+                        " attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay, maxAttempts,
+                    attempt,
                     topic, messageIds, endpoints, clientId, t);
-                clientManager.getScheduler().schedule(() -> send0(future, topic, messageType, candidates, messages, nextAttempt),
+                clientManager.getScheduler().schedule(() -> send0(future, topic, messageType, candidates, messages,
+                        nextAttempt),
                     delay.toNanos(), TimeUnit.NANOSECONDS);
             }
         }, clientCallbackExecutor);
@@ -511,7 +536,8 @@ class ProducerImpl extends ClientImpl implements Producer {
 
     @Override
     public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
-        final PublishingTopicRouteDataResult publishingTopicRouteDataResult = new PublishingTopicRouteDataResult(topicRouteDataResult);
+        final PublishingTopicRouteDataResult publishingTopicRouteDataResult =
+            new PublishingTopicRouteDataResult(topicRouteDataResult);
         publishingRouteDataResultCache.put(topic, publishingTopicRouteDataResult);
     }
 
@@ -524,7 +550,8 @@ class ProducerImpl extends ClientImpl implements Producer {
         }
         return Futures.transformAsync(getRouteDataResult(topic), topicRouteDataResult -> {
             SettableFuture<PublishingTopicRouteDataResult> future = SettableFuture.create();
-            final PublishingTopicRouteDataResult publishingTopicRouteDataResult = new PublishingTopicRouteDataResult(topicRouteDataResult);
+            final PublishingTopicRouteDataResult publishingTopicRouteDataResult =
+                new PublishingTopicRouteDataResult(topicRouteDataResult);
             publishingRouteDataResultCache.put(topic, publishingTopicRouteDataResult);
             future.set(publishingTopicRouteDataResult);
             return future;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index 8e6e90e..9445b30 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -21,8 +21,6 @@ import apache.rocketmq.v2.Publishing;
 import apache.rocketmq.v2.Settings;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.util.Durations;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -32,6 +30,8 @@ import org.apache.rocketmq.client.java.impl.ClientType;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ProducerSettings extends ClientSettings {
     private static final Logger LOGGER = LoggerFactory.getLogger(ProducerSettings.class);
@@ -68,8 +68,11 @@ public class ProducerSettings extends ClientSettings {
 
     @Override
     public Settings toProtobuf() {
-        final Publishing publishing = Publishing.newBuilder().addAllTopics(topics.stream().map(Resource::toProtobuf).collect(Collectors.toList())).build();
-        final Settings.Builder builder = Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
+        final Publishing publishing =
+            Publishing.newBuilder().addAllTopics(topics.stream().map(Resource::toProtobuf)
+                .collect(Collectors.toList())).build();
+        final Settings.Builder builder =
+            Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
             .setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
         return builder.setBackoffPolicy(retryPolicy.toProtobuf()).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
     }
@@ -78,7 +81,8 @@ public class ProducerSettings extends ClientSettings {
     public void applySettingsCommand(Settings settings) {
         final Settings.PubSubCase pubSubCase = settings.getPubSubCase();
         if (!Settings.PubSubCase.PUBLISHING.equals(pubSubCase)) {
-            LOGGER.error("[Bug] Issued settings not match with the client type, clientId={}, pubSubCase={}, clientType={}", clientId, pubSubCase, clientType);
+            LOGGER.error("[Bug] Issued settings not match with the client type, clientId={}, pubSubCase={}, "
+                + "clientType={}", clientId, pubSubCase, clientType);
             return;
         }
         final Publishing publishing = settings.getPublishing();
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
similarity index 94%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index 7f1ac10..e718228 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -84,7 +84,9 @@ public class SendReceiptImpl implements SendReceipt {
                 throwableList.add(e);
                 continue;
             }
-            final SendReceiptImpl impl = new SendReceiptImpl(MessageIdCodec.getInstance().decode(entry.getMessageId()), entry.getTransactionId(), mq, entry.getOffset());
+            final SendReceiptImpl impl =
+                new SendReceiptImpl(MessageIdCodec.getInstance().decode(entry.getMessageId()),
+                    entry.getTransactionId(), mq, entry.getOffset());
             sendReceipts.add(impl);
         }
         if (!throwableList.isEmpty()) {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
similarity index 89%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
index e3f8dd0..aef1098 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/TransactionImpl.java
@@ -18,8 +18,6 @@
 package org.apache.rocketmq.client.java.impl.producer;
 
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Map;
@@ -33,18 +31,17 @@ import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.Transaction;
 import org.apache.rocketmq.client.apis.producer.TransactionResolution;
 import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class TransactionImpl implements Transaction {
     private static final Logger LOGGER = LoggerFactory.getLogger(ProducerImpl.class);
-
-    private final ProducerImpl producerImpl;
-
     private static final int MAX_MESSAGE_NUM = 1;
 
+    private final ProducerImpl producerImpl;
     @GuardedBy("messagesLock")
     private final Set<PublishingMessageImpl> messages;
     private final ReadWriteLock messagesLock;
-
     private final ConcurrentMap<PublishingMessageImpl, SendReceiptImpl> messageSendReceiptMap;
 
     public TransactionImpl(ProducerImpl producerImpl) {
@@ -58,7 +55,8 @@ class TransactionImpl implements Transaction {
         messagesLock.readLock().lock();
         try {
             if (messages.size() > MAX_MESSAGE_NUM) {
-                throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " + MAX_MESSAGE_NUM);
+                throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " +
+                    MAX_MESSAGE_NUM);
             }
         } finally {
             messagesLock.readLock().unlock();
@@ -66,9 +64,11 @@ class TransactionImpl implements Transaction {
         messagesLock.writeLock().lock();
         try {
             if (messages.size() >= MAX_MESSAGE_NUM) {
-                throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " + MAX_MESSAGE_NUM);
+                throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " +
+                    MAX_MESSAGE_NUM);
             }
-            final PublishingMessageImpl publishingMessage = new PublishingMessageImpl(message, producerImpl.producerSettings, true);
+            final PublishingMessageImpl publishingMessage = new PublishingMessageImpl(message,
+                producerImpl.producerSettings, true);
             messages.add(publishingMessage);
             return publishingMessage;
         } finally {
@@ -97,7 +97,8 @@ class TransactionImpl implements Transaction {
         for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) {
             final PublishingMessageImpl publishingMessage = entry.getKey();
             final SendReceiptImpl sendReceipt = entry.getValue();
-            producerImpl.endTransaction(sendReceipt.getEndpoints(), publishingMessage.getMessageCommon(), sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.COMMIT);
+            producerImpl.endTransaction(sendReceipt.getEndpoints(), publishingMessage.getMessageCommon(),
+                sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.COMMIT);
         }
     }
 
@@ -109,7 +110,8 @@ class TransactionImpl implements Transaction {
         for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) {
             final PublishingMessageImpl publishingMessage = entry.getKey();
             final SendReceiptImpl sendReceipt = entry.getValue();
-            producerImpl.endTransaction(sendReceipt.getEndpoints(), publishingMessage.getMessageCommon(), sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.ROLLBACK);
+            producerImpl.endTransaction(sendReceipt.getEndpoints(), publishingMessage.getMessageCommon(),
+                sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.ROLLBACK);
         }
     }
 }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
similarity index 91%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
index 5904626..e131471 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/CustomConsoleAppender.java
@@ -21,7 +21,8 @@ import ch.qos.logback.core.ConsoleAppender;
 
 public class CustomConsoleAppender<E> extends ConsoleAppender<E> {
     public static final String ENABLE_CONSOLE_APPENDER_KEY = "mq.consoleAppender.enabled";
-    private final boolean enabled = Boolean.parseBoolean(System.getenv(ENABLE_CONSOLE_APPENDER_KEY)) || Boolean.parseBoolean(System.getProperty(ENABLE_CONSOLE_APPENDER_KEY));
+    private final boolean enabled = Boolean.parseBoolean(System.getenv(ENABLE_CONSOLE_APPENDER_KEY)) ||
+        Boolean.parseBoolean(System.getProperty(ENABLE_CONSOLE_APPENDER_KEY));
 
     public CustomConsoleAppender() {
     }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/ProcessIdConverter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/logging/ProcessIdConverter.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/logging/ProcessIdConverter.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/logging/ProcessIdConverter.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
index 8343e18..112fbb7 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageBuilderImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.message;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -28,9 +31,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.message.MessageBuilder;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 public class MessageBuilderImpl implements MessageBuilder {
     public static final Pattern TOPIC_PATTERN = Pattern.compile("^[%|a-zA-Z0-9._-]{1,127}$");
     private static final int MESSAGE_BODY_LENGTH_THRESHOLD = 1024 * 1024 * 4;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
similarity index 95%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
index 33d075f..737e878 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageCommon.java
@@ -58,14 +58,16 @@ public class MessageCommon {
 
     public MessageCommon(String topic, byte[] body, String tag, String messageGroup, Long deliveryTimestamp,
         String parentTraceContext, Collection<String> keys, Map<String, String> properties) {
-        this(null, topic, body, properties, tag, keys, messageGroup, deliveryTimestamp, null, parentTraceContext, null, null, null, null, null);
+        this(null, topic, body, properties, tag, keys, messageGroup, deliveryTimestamp, null, parentTraceContext,
+            null, null, null, null, null);
     }
 
     public MessageCommon(MessageId messageId, String topic, byte[] body, String tag, String messageGroup,
         Long deliveryTimestamp, Collection<String> keys, Map<String, String> properties, String bornHost,
         String traceContext, long bornTimestamp, int deliveryAttempt, Stopwatch decodeStopwatch,
         Timestamp deliveryTimestampFromRemote) {
-        this(messageId, topic, body, properties, tag, keys, messageGroup, deliveryTimestamp, bornHost, null, traceContext, bornTimestamp, deliveryAttempt, decodeStopwatch, deliveryTimestampFromRemote);
+        this(messageId, topic, body, properties, tag, keys, messageGroup, deliveryTimestamp, bornHost, null,
+            traceContext, bornTimestamp, deliveryAttempt, decodeStopwatch, deliveryTimestampFromRemote);
     }
 
     private MessageCommon(@Nullable MessageId messageId, String topic, byte[] body,
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
similarity index 99%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
index 9369e74..59d1578 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdCodec.java
@@ -53,13 +53,12 @@ import org.apache.rocketmq.client.java.misc.Utilities;
  * </pre>
  */
 public class MessageIdCodec {
-    private static final MessageIdCodec INSTANCE = new MessageIdCodec();
-
     public static final int MESSAGE_ID_LENGTH_FOR_V1_OR_LATER = 34;
-
     public static final String MESSAGE_ID_VERSION_V0 = "00";
     public static final String MESSAGE_ID_VERSION_V1 = "01";
 
+    private static final MessageIdCodec INSTANCE = new MessageIdCodec();
+
     private final String processFixedStringV1;
     private final long secondsSinceCustomEpoch;
     private final long secondsStartTimestamp;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageIdImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageIdImpl.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
similarity index 98%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
index 1b03ca6..2293c23 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageImpl.java
@@ -33,8 +33,10 @@ import org.apache.rocketmq.client.apis.message.Message;
  * @see Message
  */
 public class MessageImpl implements Message {
-    private final String topic;
+    protected final Collection<String> keys;
+
     final byte[] body;
+    private final String topic;
 
     @Nullable
     private final String tag;
@@ -45,7 +47,6 @@ public class MessageImpl implements Message {
     @Nullable
     private final String parentTraceContext;
 
-    protected final Collection<String> keys;
     private final Map<String, String> properties;
 
     /**
@@ -65,10 +66,6 @@ public class MessageImpl implements Message {
         this.properties = properties;
     }
 
-    public MessageCommon getMessageCommon() {
-        return new MessageCommon(topic, body, tag, messageGroup, deliveryTimestamp, parentTraceContext, keys, properties);
-    }
-
     MessageImpl(Message message) {
         this.topic = message.getTopic();
         if (message instanceof MessageImpl) {
@@ -89,6 +86,11 @@ public class MessageImpl implements Message {
         this.properties = message.getProperties();
     }
 
+    public MessageCommon getMessageCommon() {
+        return new MessageCommon(topic, body, tag, messageGroup, deliveryTimestamp, parentTraceContext, keys,
+            properties);
+    }
+
     /**
      * @see Message#getTopic()
      */
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageType.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
similarity index 93%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
index 9e0d2b7..5b3525d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/MessageViewImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.client.java.message;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import apache.rocketmq.v2.Digest;
 import apache.rocketmq.v2.DigestType;
 import apache.rocketmq.v2.Encoding;
@@ -27,8 +29,6 @@ import com.google.common.base.Stopwatch;
 import com.google.protobuf.ProtocolStringList;
 import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Timestamps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.NoSuchAlgorithmException;
@@ -44,8 +44,8 @@ import org.apache.rocketmq.client.java.misc.LinkedIterator;
 import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-
-import static com.google.common.base.Preconditions.checkNotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageView {
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageViewImpl.class);
@@ -73,8 +73,9 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
 
     public MessageViewImpl(MessageId messageId, String topic, byte[] body, String tag, String messageGroup,
         Long deliveryTimestamp, Collection<String> keys, Map<String, String> properties,
-        String bornHost, long bornTimestamp, int deliveryAttempt, MessageQueueImpl messageQueue, String receiptHandle,
-        String traceContext, long offset, boolean corrupted, Timestamp deliveryTimestampFromRemote) {
+        String bornHost, long bornTimestamp, int deliveryAttempt, MessageQueueImpl messageQueue,
+        String receiptHandle, String traceContext, long offset, boolean corrupted,
+        Timestamp deliveryTimestampFromRemote) {
         this.messageId = checkNotNull(messageId, "messageId should not be null");
         this.topic = checkNotNull(topic, "topic should not be null");
         this.body = checkNotNull(body, "body should not be null");
@@ -98,7 +99,8 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
     }
 
     public MessageCommon getMessageCommon() {
-        return new MessageCommon(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties, bornHost, traceContext, bornTimestamp, deliveryAttempt, decodeStopwatch, null);
+        return new MessageCommon(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties,
+            bornHost, traceContext, bornTimestamp, deliveryAttempt, decodeStopwatch, null);
     }
 
     /**
@@ -264,7 +266,8 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
                     }
                 } catch (NoSuchAlgorithmException e) {
                     corrupted = true;
-                    LOGGER.error("MD5 is not supported unexpectedly, skip it, topic={}, messageId={}", topic, messageId);
+                    LOGGER.error("MD5 is not supported unexpectedly, skip it, topic={}, messageId={}", topic,
+                        messageId);
                 }
                 break;
             case SHA1:
@@ -275,11 +278,13 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
                     }
                 } catch (NoSuchAlgorithmException e) {
                     corrupted = true;
-                    LOGGER.error("SHA-1 is not supported unexpectedly, skip it, topic={}, messageId={}", topic, messageId);
+                    LOGGER.error("SHA-1 is not supported unexpectedly, skip it, topic={}, messageId={}", topic,
+                        messageId);
                 }
                 break;
             default:
-                LOGGER.error("Unsupported message body digest algorithm, digestType={}, topic={}, messageId={}", digestType, topic, messageId);
+                LOGGER.error("Unsupported message body digest algorithm, digestType={}, topic={}, messageId={}",
+                    digestType, topic, messageId);
         }
         final Encoding bodyEncoding = systemProperties.getBodyEncoding();
         switch (bodyEncoding) {
@@ -294,7 +299,8 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
             case IDENTITY:
                 break;
             default:
-                LOGGER.error("Unsupported message encoding algorithm, topic={}, messageId={}, bodyEncoding={}", topic, messageId, bodyEncoding);
+                LOGGER.error("Unsupported message encoding algorithm, topic={}, messageId={}, bodyEncoding={}", topic,
+                    messageId, bodyEncoding);
         }
 
         String tag = systemProperties.hasTag() ? systemProperties.getTag() : null;
@@ -309,7 +315,8 @@ public class MessageViewImpl implements LinkedElement<MessageViewImpl>, MessageV
         final Map<String, String> properties = message.getUserPropertiesMap();
         final String receiptHandle = systemProperties.getReceiptHandle();
         String traceContext = systemProperties.hasTraceContext() ? systemProperties.getTraceContext() : null;
-        return new MessageViewImpl(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties, bornHost, bornTimestamp, deliveryAttempt, mq, receiptHandle, traceContext, offset, corrupted, timestamp);
+        return new MessageViewImpl(messageId, topic, body, tag, messageGroup, deliveryTimestamp, keys, properties,
+            bornHost, bornTimestamp, deliveryAttempt, mq, receiptHandle, traceContext, offset, corrupted, timestamp);
     }
 
     @Override
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 97ea1ea..7c6ea9c 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -45,7 +45,8 @@ public class PublishingMessageImpl extends MessageImpl {
     private final MessageType messageType;
     private volatile String traceContext;
 
-    public PublishingMessageImpl(Message message, ProducerSettings producerSettings, boolean txEnabled) throws IOException {
+    public PublishingMessageImpl(Message message, ProducerSettings producerSettings, boolean txEnabled)
+        throws IOException {
         super(message);
         this.traceContext = null;
         final int length = message.getBody().remaining();
@@ -66,7 +67,8 @@ public class PublishingMessageImpl extends MessageImpl {
                 body = new byte[length];
                 message.getBody().get(body);
             }
-            final byte[] compressed = Utilities.compressBytesGzip(body, producerSettings.getMessageGzipCompressionLevel());
+            final byte[] compressed = Utilities.compressBytesGzip(body,
+                producerSettings.getMessageGzipCompressionLevel());
             this.compressedBody = ByteBuffer.wrap(compressed).asReadOnlyBuffer();
             this.encoding = Encoding.GZIP;
         } else {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/protocol/Encoding.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Encoding.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/protocol/Encoding.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Encoding.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java b/java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/message/protocol/Resource.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageCacheObserver.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
similarity index 88%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
index 3a9b4bd..edec539 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.client.java.metrics;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.ManagedChannel;
 import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
@@ -51,6 +49,8 @@ import org.apache.rocketmq.client.java.impl.ClientImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.rpc.AuthInterceptor;
 import org.apache.rocketmq.client.java.rpc.IpNameResolverFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MessageMeter {
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageMeter.class);
@@ -83,51 +83,63 @@ public class MessageMeter {
     }
 
     LongCounter getSendSuccessTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.SEND_SUCCESS_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.SEND_SUCCESS_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     LongCounter getSendFailureTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.SEND_FAILURE_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.SEND_FAILURE_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     DoubleHistogram getSendSuccessCostTimeHistogram() {
-        return histogramMap.computeIfAbsent(MetricName.SEND_SUCCESS_COST_TIME, name -> meter.histogramBuilder(name.getName()).build());
+        return histogramMap.computeIfAbsent(MetricName.SEND_SUCCESS_COST_TIME,
+            name -> meter.histogramBuilder(name.getName()).build());
     }
 
     LongCounter getProcessSuccessTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.PROCESS_SUCCESS_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.PROCESS_SUCCESS_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     LongCounter getProcessFailureTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.PROCESS_FAILURE_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.PROCESS_FAILURE_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     DoubleHistogram getProcessCostTimeHistogram() {
-        return histogramMap.computeIfAbsent(MetricName.PROCESS_TIME, name -> meter.histogramBuilder(name.getName()).build());
+        return histogramMap.computeIfAbsent(MetricName.PROCESS_TIME,
+            name -> meter.histogramBuilder(name.getName()).build());
     }
 
     LongCounter getAckSuccessTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.ACK_SUCCESS_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.ACK_SUCCESS_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     LongCounter getAckFailureTotalCounter() {
-        return counterMap.computeIfAbsent(MetricName.ACK_FAILURE_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.ACK_FAILURE_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     LongCounter getChangeInvisibleDurationSuccessCounter() {
-        return counterMap.computeIfAbsent(MetricName.CHANGE_INVISIBLE_DURATION_SUCCESS_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.CHANGE_INVISIBLE_DURATION_SUCCESS_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     LongCounter getChangeInvisibleDurationFailureCounter() {
-        return counterMap.computeIfAbsent(MetricName.CHANGE_INVISIBLE_DURATION_FAILURE_TOTAL, name -> meter.counterBuilder(name.getName()).build());
+        return counterMap.computeIfAbsent(MetricName.CHANGE_INVISIBLE_DURATION_FAILURE_TOTAL,
+            name -> meter.counterBuilder(name.getName()).build());
     }
 
     DoubleHistogram getMessageDeliveryLatencyHistogram() {
-        return histogramMap.computeIfAbsent(MetricName.DELIVERY_LATENCY, name -> meter.histogramBuilder(name.getName()).build());
+        return histogramMap.computeIfAbsent(MetricName.DELIVERY_LATENCY,
+            name -> meter.histogramBuilder(name.getName()).build());
     }
 
     DoubleHistogram getMessageAwaitTimeHistogram() {
-        return histogramMap.computeIfAbsent(MetricName.AWAIT_TIME, name -> meter.histogramBuilder(name.getName()).build());
+        return histogramMap.computeIfAbsent(MetricName.AWAIT_TIME,
+            name -> meter.histogramBuilder(name.getName()).build());
     }
 
     @SuppressWarnings("deprecation")
@@ -138,12 +150,14 @@ public class MessageMeter {
             }
             final Optional<Endpoints> optionalEndpoints = metric.tryGetMetricEndpoints();
             if (!optionalEndpoints.isPresent()) {
-                LOGGER.error("[Bug] Metric switch is on but endpoints is not filled, clientId={}", client.getClientId());
+                LOGGER.error("[Bug] Metric switch is on but endpoints is not filled, clientId={}",
+                    client.getClientId());
                 return;
             }
             final Endpoints newMetricEndpoints = optionalEndpoints.get();
             if (newMetricEndpoints.equals(metricEndpoints)) {
-                LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}", client.getClientId(), newMetricEndpoints);
+                LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}",
+                    client.getClientId(), newMetricEndpoints);
                 return;
             }
             final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)
@@ -172,13 +186,15 @@ public class MessageMeter {
                 .setAggregation(Aggregation.explicitBucketHistogram(Arrays.asList(0.1d, 1d, 10d, 30d, 60d)))
                 .build();
 
-            PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter).setInterval(Duration.ofSeconds(1)).build();
+            PeriodicMetricReader reader =
+                PeriodicMetricReader.builder(exporter).setInterval(Duration.ofSeconds(1)).build();
             provider = SdkMeterProvider.builder().registerMetricReader(reader)
                 .registerView(instrumentSelector, view)
                 .build();
             final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build();
             meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME);
-            LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", client.getClientId(), metricEndpoints, newMetricEndpoints);
+            LOGGER.info("Message meter exporter is updated, clientId={}, {} => {}", client.getClientId(),
+                metricEndpoints, newMetricEndpoints);
             this.reset();
             metricEndpoints = newMetricEndpoints;
         } catch (Throwable t) {
@@ -205,7 +221,8 @@ public class MessageMeter {
             try {
                 latch.await();
             } catch (Throwable t) {
-                LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}", client.getClientId());
+                LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}",
+                    client.getClientId());
             }
         }
         LOGGER.info("Shutdown the message meter, clientId={}", client.getClientId());
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
similarity index 98%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
index 97a6657..440f89d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java
@@ -19,8 +19,6 @@ package org.apache.rocketmq.client.java.metrics;
 
 import com.google.protobuf.Timestamp;
 import com.google.protobuf.util.Timestamps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.metrics.DoubleHistogram;
 import io.opentelemetry.api.metrics.LongCounter;
@@ -34,6 +32,8 @@ import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
 import org.apache.rocketmq.client.java.impl.ClientImpl;
 import org.apache.rocketmq.client.java.message.MessageCommon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MetricMessageInterceptor implements MessageInterceptor {
     private static final Logger LOGGER = LoggerFactory.getLogger(MetricMessageInterceptor.class);
@@ -159,8 +159,10 @@ public class MetricMessageInterceptor implements MessageInterceptor {
     }
 
     private void doAfterChangInvisibleDuration(List<MessageCommon> messageCommons, MessageHookPointsStatus status) {
-        final LongCounter changeInvisibleDurationSuccessCounter = messageMeter.getChangeInvisibleDurationSuccessCounter();
-        final LongCounter changeInvisibleDurationFailureCounter = messageMeter.getChangeInvisibleDurationFailureCounter();
+        final LongCounter changeInvisibleDurationSuccessCounter =
+            messageMeter.getChangeInvisibleDurationSuccessCounter();
+        final LongCounter changeInvisibleDurationFailureCounter =
+            messageMeter.getChangeInvisibleDurationFailureCounter();
         final Optional<String> optionalConsumerGroup = messageMeter.tryGetConsumerGroup();
         if (!optionalConsumerGroup.isPresent()) {
             LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", messageMeter.getClient());
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
similarity index 96%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
index 5cc8465..d7a2d28 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/RocketmqAttributes.java
@@ -17,14 +17,15 @@
 
 package org.apache.rocketmq.client.java.metrics;
 
-import io.opentelemetry.api.common.AttributeKey;
-
 import static io.opentelemetry.api.common.AttributeKey.stringKey;
 
+import io.opentelemetry.api.common.AttributeKey;
+
 public class RocketmqAttributes {
     public static final AttributeKey<String> TOPIC = stringKey("topic");
-
     public static final AttributeKey<String> CLIENT_ID = stringKey("client_id");
-
     public static final AttributeKey<String> CONSUMER_GROUP = stringKey("consumer_group");
+
+    private RocketmqAttributes() {
+    }
 }
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
index ff0e8af..c4e1c71 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Dispatcher.java
@@ -18,12 +18,12 @@
 package org.apache.rocketmq.client.java.misc;
 
 import com.google.common.util.concurrent.AbstractIdleService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A driver to notify downstream to dispatch task.
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/ExecutorServices.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/ExecutorServices.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/ExecutorServices.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/ExecutorServices.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedIterator.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/MetadataUtils.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/MetadataUtils.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/MetadataUtils.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/MetadataUtils.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/RequestIdGenerator.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/RequestIdGenerator.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/RequestIdGenerator.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/RequestIdGenerator.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/ThreadFactoryImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/ThreadFactoryImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/ThreadFactoryImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/ThreadFactoryImpl.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
similarity index 98%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
index 639365f..2705cd7 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/misc/Utilities.java
@@ -60,12 +60,14 @@ public class Utilities {
     /**
      * Used to build output as Hex
      */
-    private static final char[] DIGITS_LOWER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+    private static final char[] DIGITS_LOWER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd',
+        'e', 'f'};
 
     /**
      * Used to build output as Hex
      */
-    private static final char[] DIGITS_UPPER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+    private static final char[] DIGITS_UPPER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D',
+        'E', 'F'};
 
     private static final String CLIENT_ID_SEPARATOR = "@";
 
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
similarity index 95%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
index 5c177bf..2157613 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.retry;
 
+import static apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
+import static com.google.common.base.Preconditions.checkArgument;
+
 import apache.rocketmq.v2.CustomizedBackoff;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.util.Durations;
@@ -24,9 +27,6 @@ import java.time.Duration;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
-import static com.google.common.base.Preconditions.checkArgument;
-
 public class CustomizedBackoffRetryPolicy implements RetryPolicy {
     private final List<Duration> durations;
     private final int maxAttempts;
@@ -70,7 +70,9 @@ public class CustomizedBackoffRetryPolicy implements RetryPolicy {
     @Override
     public apache.rocketmq.v2.RetryPolicy toProtobuf() {
         CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder()
-            .addAllNext(durations.stream().map(duration -> Durations.fromNanos(duration.toNanos())).collect(Collectors.toList()))
+            .addAllNext(durations.stream()
+                .map(duration -> Durations.fromNanos(duration.toNanos()))
+                .collect(Collectors.toList()))
             .build();
         return apache.rocketmq.v2.RetryPolicy.newBuilder()
             .setMaxAttempts(maxAttempts)
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
similarity index 96%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
index b216f06..79f033a 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
@@ -17,17 +17,18 @@
 
 package org.apache.rocketmq.client.java.retry;
 
+import static apache.rocketmq.v2.RetryPolicy.StrategyCase.EXPONENTIAL_BACKOFF;
+import static com.google.common.base.Preconditions.checkArgument;
+
 import apache.rocketmq.v2.ExponentialBackoff;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
 import java.util.Random;
 
-import static apache.rocketmq.v2.RetryPolicy.StrategyCase.EXPONENTIAL_BACKOFF;
-import static com.google.common.base.Preconditions.checkArgument;
-
 /**
- * The {@link ExponentialBackoffRetryPolicy} defines a policy to do more attempts when failure is encountered, mainly refer to
+ * The {@link ExponentialBackoffRetryPolicy} defines a policy to do more attempts when failure is encountered, mainly
+ * refer to
  * <a href="https://github.com/grpc/proposal/blob/master/A6-client-retries.md">gRPC Retry Design</a>.
  */
 public class ExponentialBackoffRetryPolicy implements RetryPolicy {
@@ -62,7 +63,8 @@ public class ExponentialBackoffRetryPolicy implements RetryPolicy {
     @Override
     public Duration getNextAttemptDelay(int attempt) {
         checkArgument(attempt > 0, "attempt must be positive");
-        int randomNumberBound = (int) Math.min(initialBackoff.toNanos() * Math.pow(backoffMultiplier, 1.0 * (attempt - 1)),
+        int randomNumberBound = (int) Math.min(initialBackoff.toNanos() * Math.pow(backoffMultiplier,
+                1.0 * (attempt - 1)),
             maxBackoff.toNanos());
         if (randomNumberBound <= 0) {
             return Duration.ZERO;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Address.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Address.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Address.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/Address.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/AddressScheme.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/AddressScheme.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/AddressScheme.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/AddressScheme.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Broker.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
similarity index 96%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
index 590149c..ab77a3d 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Broker.java
@@ -34,7 +34,8 @@ public class Broker {
     }
 
     public apache.rocketmq.v2.Broker toProtobuf() {
-        return apache.rocketmq.v2.Broker.newBuilder().setName(name).setId(id).setEndpoints(endpoints.toProtobuf()).build();
+        return apache.rocketmq.v2.Broker.newBuilder().setName(name).setId(id).setEndpoints(endpoints.toProtobuf())
+            .build();
     }
 
     public String getName() {
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
similarity index 98%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
index 8e50e76..4a4f562 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Endpoints.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.client.java.route;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.google.common.base.Objects;
 import com.google.common.net.InternetDomainName;
 import java.net.InetSocketAddress;
@@ -24,12 +26,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Pattern;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 public class Endpoints {
     public static final int DEFAULT_PORT = 80;
 
-    private static final Pattern IPV4_HOST_PATTERN = Pattern.compile("^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])*$");
+    private static final Pattern IPV4_HOST_PATTERN = Pattern.compile("^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0"
+        + "-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])*$");
     private static final String ENDPOINT_SEPARATOR = ";";
     private static final String ADDRESS_SEPARATOR = ",";
     private static final String COLON = ":";
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/MessageQueueImpl.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Permission.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/Permission.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/Permission.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/Permission.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
similarity index 97%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
index dbed25c..a0725a4 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
@@ -65,7 +65,8 @@ public class TopicRouteData {
     public Endpoints pickEndpointsToQueryAssignments() throws ResourceNotFoundException {
         int nextIndex = index.getAndIncrement();
         for (int i = 0; i < messageQueueImpls.size(); i++) {
-            final MessageQueueImpl messageQueueImpl = messageQueueImpls.get(IntMath.mod(nextIndex++, messageQueueImpls.size()));
+            final MessageQueueImpl messageQueueImpl = messageQueueImpls.get(IntMath.mod(nextIndex++,
+                messageQueueImpls.size()));
             final Broker broker = messageQueueImpl.getBroker();
             if (Utilities.MASTER_BROKER_ID != broker.getId()) {
                 continue;
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
index dd0f553..1efb034 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
@@ -17,13 +17,13 @@
 
 package org.apache.rocketmq.client.java.route;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import apache.rocketmq.v2.Status;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import javax.annotation.concurrent.Immutable;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Result topic route data fetched from remote.
  */
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
index 11ca662..66820e6 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/AuthInterceptor.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.client.java.rpc;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.CallOptions;
 import io.grpc.Channel;
 import io.grpc.ClientCall;
@@ -27,6 +25,8 @@ import io.grpc.ForwardingClientCall;
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AuthInterceptor implements ClientInterceptor {
     private static final Logger LOGGER = LoggerFactory.getLogger(AuthInterceptor.class);
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/IpNameResolverFactory.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/IpNameResolverFactory.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/IpNameResolverFactory.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/IpNameResolverFactory.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
similarity index 86%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
index b6f7ba9..bff53d7 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/LoggingInterceptor.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.client.java.rpc;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import io.grpc.CallOptions;
 import io.grpc.Channel;
 import io.grpc.ClientCall;
@@ -28,6 +26,8 @@ import io.grpc.ForwardingClientCallListener;
 import io.grpc.Metadata;
 import io.grpc.MethodDescriptor;
 import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The client log interceptor based on grpc can track any remote procedure call that interacts with the client locally.
@@ -53,18 +53,21 @@ public class LoggingInterceptor implements ClientInterceptor {
         return new ForwardingClientCall.SimpleForwardingClientCall<T, E>(next.newCall(method, callOptions)) {
             @Override
             public void start(Listener<E> responseListener, final Metadata headers) {
-                LOGGER.trace("gRPC request header, rpcId={}, serviceName={}, methodName={}, authority={}, headers={}", rpcId, serviceName, methodName, authority, headers);
+                LOGGER.trace("gRPC request header, rpcId={}, serviceName={}, methodName={}, authority={}, headers={}",
+                    rpcId, serviceName, methodName, authority, headers);
                 Listener<E> observabilityListener =
                     new ForwardingClientCallListener.SimpleForwardingClientCallListener<E>(responseListener) {
                         @Override
                         public void onMessage(E response) {
-                            LOGGER.trace("gRPC response, rpcId={}, serviceName={}, methodName={}, content:\n{}", rpcId, serviceName, methodName, response);
+                            LOGGER.trace("gRPC response, rpcId={}, serviceName={}, methodName={}, content:\n{}",
+                                rpcId, serviceName, methodName, response);
                             super.onMessage(response);
                         }
 
                         @Override
                         public void onHeaders(Metadata headers) {
-                            LOGGER.trace("gRPC response header, rpcId={}, serviceName={}, methodName={}, authority={}, headers={}", rpcId, serviceName, methodName, authority, headers);
+                            LOGGER.trace("gRPC response header, rpcId={}, serviceName={}, methodName={}, "
+                                + "authority={}, headers={}", rpcId, serviceName, methodName, authority, headers);
                             super.onHeaders(headers);
                         }
                     };
@@ -73,7 +76,8 @@ public class LoggingInterceptor implements ClientInterceptor {
 
             @Override
             public void sendMessage(T request) {
-                LOGGER.trace("gRPC request, rpcId={}, serviceName={}, methodName={}, content:\n{}", rpcId, serviceName, methodName, request);
+                LOGGER.trace("gRPC request, rpcId={}, serviceName={}, methodName={}, content:\n{}", rpcId,
+                    serviceName, methodName, request);
                 super.sendMessage(request);
             }
         };
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
similarity index 99%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index d31a64e..15b94aa 100644
--- a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -81,7 +81,7 @@ public class RpcClientImpl implements RpcClient {
 
         final NettyChannelBuilder channelBuilder =
             NettyChannelBuilder.forTarget(endpoints.getGrpcTarget())
-//                .withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
+                // .withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                 .keepAliveTime(KEEP_ALIVE_DURATION.toNanos(), TimeUnit.NANOSECONDS)
                 .maxInboundMessageSize(GRPC_MAX_MESSAGE_SIZE)
                 .intercept(LoggingInterceptor.getInstance())
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
diff --git a/java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
similarity index 100%
rename from java/client-java/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
diff --git a/java/client-java/src/main/resources-filtered/rocketmq.metadata.properties b/java/client/src/main/resources-filtered/rocketmq.metadata.properties
similarity index 100%
rename from java/client-java/src/main/resources-filtered/rocketmq.metadata.properties
rename to java/client/src/main/resources-filtered/rocketmq.metadata.properties
diff --git a/java/client-java/src/main/resources/META-INF/services/org.apache.rocketmq.client.apis.ClientServiceProvider b/java/client/src/main/resources/META-INF/services/org.apache.rocketmq.client.apis.ClientServiceProvider
similarity index 100%
rename from java/client-java/src/main/resources/META-INF/services/org.apache.rocketmq.client.apis.ClientServiceProvider
rename to java/client/src/main/resources/META-INF/services/org.apache.rocketmq.client.apis.ClientServiceProvider
diff --git a/java/client-java/src/main/resources/logback.xml b/java/client/src/main/resources/logback.xml
similarity index 100%
rename from java/client-java/src/main/resources/logback.xml
rename to java/client/src/main/resources/logback.xml
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
similarity index 86%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
index 23992be..ca198d9 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/ClientServiceProviderImplTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.client.java;
 
+import static org.junit.Assert.assertEquals;
+
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
 import org.apache.rocketmq.client.java.impl.consumer.PushConsumerBuilderImpl;
 import org.apache.rocketmq.client.java.impl.consumer.SimpleConsumerBuilderImpl;
@@ -24,8 +26,6 @@ import org.apache.rocketmq.client.java.impl.producer.ProducerBuilderImpl;
 import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
 public class ClientServiceProviderImplTest {
 
     @Test
@@ -35,12 +35,14 @@ public class ClientServiceProviderImplTest {
 
     @Test
     public void testNewPushConsumerBuilder() {
-        assertEquals(PushConsumerBuilderImpl.class, ClientServiceProvider.loadService().newPushConsumerBuilder().getClass());
+        assertEquals(PushConsumerBuilderImpl.class,
+            ClientServiceProvider.loadService().newPushConsumerBuilder().getClass());
     }
 
     @Test
     public void testNewSimpleConsumerBuilder() {
-        assertEquals(SimpleConsumerBuilderImpl.class, ClientServiceProvider.loadService().newSimpleConsumerBuilder().getClass());
+        assertEquals(SimpleConsumerBuilderImpl.class,
+            ClientServiceProvider.loadService().newSimpleConsumerBuilder().getClass());
     }
 
     @Test
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
similarity index 100%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
similarity index 90%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 7312c52..7c9baeb 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -17,6 +17,15 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import apache.rocketmq.v2.AckMessageResponse;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
@@ -44,15 +53,6 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 @RunWith(MockitoJUnitRunner.class)
 public class ProcessQueueImplTest extends TestBase {
     @Mock
@@ -116,9 +116,11 @@ public class ProcessQueueImplTest extends TestBase {
         when(retryPolicy.getNextAttemptDelay(anyInt())).thenReturn(Duration.ofSeconds(1));
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(pushConsumerSettings.isFifo()).thenReturn(false);
-        when(pushConsumer.changInvisibleDuration(any(MessageViewImpl.class), any(Duration.class))).thenReturn(okChangeInvisibleDurationFuture());
+        when(pushConsumer.changInvisibleDuration(any(MessageViewImpl.class), any(Duration.class)))
+            .thenReturn(okChangeInvisibleDurationFuture());
         processQueue.cacheMessages(messageViewList);
-        verify(pushConsumer, times(1)).changInvisibleDuration(any(MessageViewImpl.class), any(Duration.class));
+        verify(pushConsumer, times(1))
+            .changInvisibleDuration(any(MessageViewImpl.class), any(Duration.class));
     }
 
     @Test
@@ -127,9 +129,11 @@ public class ProcessQueueImplTest extends TestBase {
         List<MessageViewImpl> messageViewList = new ArrayList<>();
         messageViewList.add(corruptedMessageView0);
         when(pushConsumerSettings.isFifo()).thenReturn(true);
-        when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class))).thenReturn(okForwardMessageToDeadLetterQueueResponseFuture());
+        when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class)))
+            .thenReturn(okForwardMessageToDeadLetterQueueResponseFuture());
         processQueue.cacheMessages(messageViewList);
-        verify(pushConsumer, times(1)).forwardMessageToDeadLetterQueue(any(MessageViewImpl.class));
+        verify(pushConsumer, times(1))
+            .forwardMessageToDeadLetterQueue(any(MessageViewImpl.class));
         final Iterator<MessageViewImpl> iterator = processQueue.tryTakeFifoMessages();
         assertFalse(iterator.hasNext());
     }
@@ -147,10 +151,12 @@ public class ProcessQueueImplTest extends TestBase {
         ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(fakeEndpoints(), status, messageViewList);
         SettableFuture<ReceiveMessageResult> future0 = SettableFuture.create();
         future0.set(receiveMessageResult);
-        when(pushConsumer.receiveMessage(any(ReceiveMessageRequest.class), any(MessageQueueImpl.class), any(Duration.class))).thenReturn(future0);
+        when(pushConsumer.receiveMessage(any(ReceiveMessageRequest.class), any(MessageQueueImpl.class),
+            any(Duration.class))).thenReturn(future0);
         when(pushConsumerSettings.getReceiveBatchSize()).thenReturn(32);
         ReceiveMessageRequest request = ReceiveMessageRequest.newBuilder().build();
-        when(pushConsumer.wrapReceiveMessageRequest(anyInt(), any(MessageQueueImpl.class), any(FilterExpression.class))).thenReturn(request);
+        when(pushConsumer.wrapReceiveMessageRequest(anyInt(), any(MessageQueueImpl.class),
+            any(FilterExpression.class))).thenReturn(request);
         processQueue.fetchMessageImmediately();
         Thread.sleep(ProcessQueueImpl.RECEIVE_LATER_DELAY.toMillis() / 2);
         verify(pushConsumer, times(cachedMessagesCountThresholdPerQueue))
@@ -175,7 +181,8 @@ public class ProcessQueueImplTest extends TestBase {
         final ListenableFuture<AckMessageResponse> future = okAckMessageResponseFuture();
         when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future);
         processQueue.eraseMessage(optionalMessageView.get(), ConsumeResult.SUCCESS);
-        future.addListener(() -> verify(pushConsumer, times(1)).ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
+        future.addListener(() -> verify(pushConsumer, times(1))
+            .ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
         assertEquals(processQueue.cachedMessagesCount(), cachedMessageCount - 1);
         assertEquals(processQueue.inflightMessagesCount(), 0);
         assertEquals(consumptionOkQuantity.get(), 1);
@@ -220,7 +227,8 @@ public class ProcessQueueImplTest extends TestBase {
         when(retryPolicy.getMaxAttempts()).thenReturn(1);
         when(pushConsumer.getConsumptionExecutor()).thenReturn(SINGLE_THREAD_POOL_EXECUTOR);
         final ListenableFuture<Void> future = processQueue.eraseFifoMessage(messageView, ConsumeResult.SUCCESS);
-        future.addListener(() -> verify(pushConsumer, times(1)).ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
+        future.addListener(() -> verify(pushConsumer, times(1))
+            .ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
     }
 
     @Test
@@ -229,13 +237,15 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future0 = okForwardMessageToDeadLetterQueueResponseFuture();
+        ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future0 =
+            okForwardMessageToDeadLetterQueueResponseFuture();
         when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(retryPolicy.getMaxAttempts()).thenReturn(1);
         when(pushConsumer.getConsumptionExecutor()).thenReturn(SINGLE_THREAD_POOL_EXECUTOR);
         final ListenableFuture<Void> future = processQueue.eraseFifoMessage(messageView, ConsumeResult.FAILURE);
-        future.addListener(() -> verify(pushConsumer, times(1)).forwardMessageToDeadLetterQueue(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
+        future.addListener(() -> verify(pushConsumer, times(1))
+            .forwardMessageToDeadLetterQueue(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
     }
 
     @Test
@@ -254,6 +264,7 @@ public class ProcessQueueImplTest extends TestBase {
         when(consumeService.consume(any(MessageViewImpl.class), any(Duration.class))).thenReturn(consumeFuture);
         when(pushConsumer.getConsumeService()).thenReturn(consumeService);
         final ListenableFuture<Void> future = processQueue.eraseFifoMessage(messageView, ConsumeResult.FAILURE);
-        future.addListener(() -> verify(pushConsumer, times(1)).ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
+        future.addListener(() -> verify(pushConsumer, times(1))
+            .ackMessage(any(MessageViewImpl.class)), MoreExecutors.directExecutor());
     }
 }
\ No newline at end of file
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
similarity index 96%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
index e1c6673..cb37b6b 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerBuilderImplTest.java
@@ -72,7 +72,8 @@ public class PushConsumerBuilderImplTest extends TestBase {
     @Test(expected = IllegalArgumentException.class)
     public void testBuildWithoutExpressions() throws ClientException {
         final PushConsumerBuilderImpl builder = new PushConsumerBuilderImpl();
-        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+        ClientConfiguration clientConfiguration =
+            ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
         builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_GROUP_0)
             .setMessageListener(messageView -> ConsumeResult.SUCCESS)
             .build();
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
similarity index 80%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index deff901..54e507c 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -17,6 +17,13 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import apache.rocketmq.v2.CustomizedBackoff;
 import apache.rocketmq.v2.QueryAssignmentRequest;
 import apache.rocketmq.v2.QueryRouteRequest;
@@ -33,16 +40,16 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
 import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
 import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,13 +57,6 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 @RunWith(MockitoJUnitRunner.class)
 public class PushConsumerImplTest extends TestBase {
     @Mock
@@ -77,21 +77,27 @@ public class PushConsumerImplTest extends TestBase {
     private final int maxCacheMessageSizeInBytes = 1024;
     private final int consumptionThreadCount = 4;
 
-    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+        .setEndpoints(FAKE_ACCESS_POINT).build();
 
     private PushConsumerImpl pushConsumer;
 
     private void start(PushConsumerImpl pushConsumer) throws ClientException {
-        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)))
+        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
+            any(Duration.class)))
             .thenReturn(okQueryRouteResponseFuture());
-        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class)))
+        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+            any(TelemetrySession.class)))
             .thenReturn(telemetryRequestObserver);
-        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("TestScheduler"));
+        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
+            "TestScheduler"));
         when(clientManager.getScheduler()).thenReturn(scheduler);
         doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
 
-        CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder().addNext(Durations.fromNanos(Duration.ofSeconds(3).toNanos())).build();
-        RetryPolicy retryPolicy = RetryPolicy.newBuilder().setMaxAttempts(17).setCustomizedBackoff(customizedBackoff).build();
+        CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder()
+            .addNext(Durations.fromNanos(Duration.ofSeconds(3).toNanos())).build();
+        RetryPolicy retryPolicy = RetryPolicy.newBuilder().setMaxAttempts(17).setCustomizedBackoff(customizedBackoff)
+            .build();
         Subscription subscription = Subscription.newBuilder().build();
         Settings settings = Settings.newBuilder().setSubscription(subscription).setBackoffPolicy(retryPolicy).build();
         final Service service = pushConsumer.startAsync();
@@ -108,14 +114,19 @@ public class PushConsumerImplTest extends TestBase {
 
     @Test
     public void testScanAssignment() throws ExecutionException, InterruptedException, ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+            maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
         start(pushConsumer);
-        when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class))).thenReturn(okQueryAssignmentResponseFuture());
+        when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class),
+            any(Duration.class))).thenReturn(okQueryAssignmentResponseFuture());
         pushConsumer.scanAssignments();
-        verify(clientManager, atLeast(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class));
+        verify(clientManager, atLeast(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class),
+            any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class));
         Assert.assertEquals(okQueryAssignmentResponseFuture().get().getAssignmentsCount(), pushConsumer.getQueueSize());
-        when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture());
+        when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class),
+            any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture());
         pushConsumer.scanAssignments();
         Assert.assertEquals(0, pushConsumer.getQueueSize());
         shutdown(pushConsumer);
@@ -123,19 +134,22 @@ public class PushConsumerImplTest extends TestBase {
 
     @Test(expected = IllegalStateException.class)
     public void testSubscribeWithoutStart() throws ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+            maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
         pushConsumer.subscribe(FAKE_TOPIC_0, new FilterExpression(FAKE_TOPIC_0));
     }
 
     @Test(expected = IllegalStateException.class)
     public void testUnsubscribeWithoutStart() {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+            maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
         pushConsumer.unsubscribe(FAKE_TOPIC_0);
     }
 
     @Test
     public void testSubscribeWithSubscriptionOverwriting() throws ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
+        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+            maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
         start(pushConsumer);
         final FilterExpression filterExpression = new FilterExpression(FAKE_TAG_0);
         pushConsumer.subscribe(FAKE_TOPIC_0, filterExpression);
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
similarity index 94%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
index 315af9b..31609c4 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerBuilderTest.java
@@ -47,7 +47,8 @@ public class SimpleConsumerBuilderTest extends TestBase {
     @Test(expected = IllegalArgumentException.class)
     public void testBuildWithoutExpressions() throws ClientException {
         final SimpleConsumerBuilderImpl builder = new SimpleConsumerBuilderImpl();
-        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+        ClientConfiguration clientConfiguration =
+            ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
         builder.setClientConfiguration(clientConfiguration).setConsumerGroup(FAKE_GROUP_0)
             .build();
     }
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
similarity index 84%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index 59a2530..9568a81 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -17,6 +17,14 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import apache.rocketmq.v2.AckMessageRequest;
 import apache.rocketmq.v2.AckMessageResponse;
 import apache.rocketmq.v2.Broker;
@@ -53,23 +61,15 @@ import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
 import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
 import org.apache.rocketmq.client.java.impl.TelemetrySession;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 @RunWith(MockitoJUnitRunner.class)
 public class SimpleConsumerImplTest extends TestBase {
     @Mock
@@ -78,10 +78,11 @@ public class SimpleConsumerImplTest extends TestBase {
     private StreamObserver<TelemetryCommand> telemetryRequestObserver;
     @InjectMocks
     private ClientManagerRegistry clientManagerRegistry = ClientManagerRegistry.getInstance();
-    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+        .setEndpoints(FAKE_ACCESS_POINT).build();
 
     private final Duration awaitDuration = Duration.ofSeconds(3);
-    private final Map<String, FilterExpression> subscriptionExpressions = createSubscriptionExpressions(FAKE_TOPIC_0);
+    private final Map<String, FilterExpression> subExpressions = createSubscriptionExpressions(FAKE_TOPIC_0);
 
     private SimpleConsumerImpl simpleConsumer;
 
@@ -91,15 +92,20 @@ public class SimpleConsumerImplTest extends TestBase {
         List<MessageQueue> messageQueueList = new ArrayList<>();
         MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
             .setPermission(Permission.READ_WRITE)
-            .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())).setId(0).build();
+            .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())).setId(0)
+            .build();
         messageQueueList.add(mq);
-        QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status).addAllMessageQueues(messageQueueList).build();
+        QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status)
+            .addAllMessageQueues(messageQueueList).build();
         future0.set(response);
-        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)))
+        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
+            any(Duration.class)))
             .thenReturn(future0);
-        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class)))
+        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+            any(TelemetrySession.class)))
             .thenReturn(telemetryRequestObserver);
-        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("TestScheduler"));
+        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
+            new ThreadFactoryImpl("TestScheduler"));
         when(clientManager.getScheduler()).thenReturn(scheduler);
         doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
 
@@ -119,38 +125,38 @@ public class SimpleConsumerImplTest extends TestBase {
 
     @Test(expected = IllegalStateException.class)
     public void testReceiveWithoutStart() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         simpleConsumer.receive(1, Duration.ofSeconds(1));
     }
 
     @Test(expected = IllegalStateException.class)
     public void testAckWithoutStart() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         simpleConsumer.ack(fakeMessageViewImpl());
     }
 
     @Test(expected = IllegalStateException.class)
     public void testSubscribeWithoutStart() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         simpleConsumer.subscribe(FAKE_TOPIC_1, FilterExpression.SUB_ALL);
     }
 
     @Test(expected = IllegalStateException.class)
     public void testUnsubscribeWithoutStart() {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         simpleConsumer.unsubscribe(FAKE_TOPIC_0);
     }
 
     @Test
     public void testStartAndShutdown() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         shutdown(simpleConsumer);
     }
 
     @Test
     public void testSubscribeWithSubscriptionOverwriting() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         final FilterExpression filterExpression = new FilterExpression(FAKE_TAG_0);
         simpleConsumer.subscribe(FAKE_TOPIC_0, filterExpression);
@@ -159,7 +165,7 @@ public class SimpleConsumerImplTest extends TestBase {
 
     @Test(expected = IllegalArgumentException.class)
     public void testReceiveWithAllTopicsAreUnsubscribed() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         simpleConsumer.unsubscribe(FAKE_TOPIC_0);
         try {
@@ -171,25 +177,29 @@ public class SimpleConsumerImplTest extends TestBase {
 
     @Test
     public void testReceiveMessageSuccess() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         int receivedMessageCount = 16;
-        final ListenableFuture<Iterator<ReceiveMessageResponse>> future = okReceiveMessageResponsesFuture(FAKE_TOPIC_0, receivedMessageCount);
-        when(clientManager.receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class), any(Duration.class))).thenReturn(future);
+        final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
+            okReceiveMessageResponsesFuture(FAKE_TOPIC_0, receivedMessageCount);
+        when(clientManager.receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class),
+            any(Duration.class))).thenReturn(future);
         final List<MessageView> messageViews = simpleConsumer.receive(1, Duration.ofSeconds(1));
-        verify(clientManager, times(1)).receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class), any(Duration.class));
+        verify(clientManager, times(1)).receiveMessage(any(Endpoints.class),
+            any(Metadata.class), any(ReceiveMessageRequest.class), any(Duration.class));
         assertEquals(receivedMessageCount, messageViews.size());
         shutdown(simpleConsumer);
     }
 
     @Test
     public void testAckMessageSuccess() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         try {
             final MessageViewImpl messageView = fakeMessageViewImpl();
             final ListenableFuture<AckMessageResponse> future = okAckMessageResponseFuture();
-            when(clientManager.ackMessage(any(Endpoints.class), any(Metadata.class), any(AckMessageRequest.class), any(Duration.class))).thenReturn(future);
+            when(clientManager.ackMessage(any(Endpoints.class), any(Metadata.class), any(AckMessageRequest.class),
+                any(Duration.class))).thenReturn(future);
             simpleConsumer.ack(messageView);
         } finally {
             shutdown(simpleConsumer);
@@ -198,12 +208,14 @@ public class SimpleConsumerImplTest extends TestBase {
 
     @Test
     public void testChangeInvisibleDurationSuccess() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subscriptionExpressions);
+        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         try {
             final MessageViewImpl messageView = fakeMessageViewImpl();
-            final ListenableFuture<ChangeInvisibleDurationResponse> future = okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1);
-            when(clientManager.changeInvisibleDuration(any(Endpoints.class), any(Metadata.class), any(ChangeInvisibleDurationRequest.class), any(Duration.class))).thenReturn(future);
+            final ListenableFuture<ChangeInvisibleDurationResponse> future =
+                okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1);
+            when(clientManager.changeInvisibleDuration(any(Endpoints.class), any(Metadata.class),
+                any(ChangeInvisibleDurationRequest.class), any(Duration.class))).thenReturn(future);
             simpleConsumer.changeInvisibleDuration0(messageView, Duration.ofSeconds(3));
             assertEquals(FAKE_RECEIPT_HANDLE_1, messageView.getReceiptHandle());
         } finally {
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
similarity index 96%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
index a64babf..e1298bd 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/StandardConsumeServiceTest.java
@@ -17,6 +17,12 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
@@ -34,12 +40,6 @@ import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 public class StandardConsumeServiceTest extends TestBase {
 
     @Test
@@ -73,7 +73,8 @@ public class StandardConsumeServiceTest extends TestBase {
                 Duration duration, MessageHookPointsStatus status) {
             }
         };
-        final StandardConsumeService service = new StandardConsumeService(FAKE_CLIENT_ID, processQueueTable, listener, SINGLE_THREAD_POOL_EXECUTOR, interceptor, SCHEDULER);
+        final StandardConsumeService service = new StandardConsumeService(FAKE_CLIENT_ID, processQueueTable, listener,
+            SINGLE_THREAD_POOL_EXECUTOR, interceptor, SCHEDULER);
         service.dispatch0();
         verify(processQueue0, times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
         verify(processQueue0, times(1)).eraseMessage(any(MessageViewImpl.class), any(ConsumeResult.class));
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
similarity index 97%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
index 2bf5520..5fab2b2 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerBuilderImplTest.java
@@ -20,8 +20,8 @@ package org.apache.rocketmq.client.java.impl.producer;
 import java.io.IOException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
-import org.apache.rocketmq.client.apis.ClientServiceProvider;
 import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
 import org.apache.rocketmq.client.apis.producer.Producer;
 import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
 import org.apache.rocketmq.client.java.impl.ClientManager;
@@ -79,7 +79,8 @@ public class ProducerBuilderImplTest {
     @Test
     public void testBuildWithoutTopic() throws ClientException, IOException {
         ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints("127.0.0.1:80").build();
-        Producer producer = ClientServiceProvider.loadService().newProducerBuilder().setClientConfiguration(clientConfiguration).build();
+        Producer producer = ClientServiceProvider.loadService().newProducerBuilder()
+            .setClientConfiguration(clientConfiguration).build();
         producer.close();
     }
 }
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
similarity index 82%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index 1f2d1ec..29f4278 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -17,6 +17,15 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import apache.rocketmq.v2.Broker;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.MessageQueue;
@@ -50,24 +59,15 @@ import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
 import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
 import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
 @RunWith(MockitoJUnitRunner.class)
 public class ProducerImplTest extends TestBase {
     @Mock
@@ -78,7 +78,9 @@ public class ProducerImplTest extends TestBase {
     @InjectMocks
     private ClientManagerRegistry clientManagerRegistry = ClientManagerRegistry.getInstance();
 
-    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(FAKE_ACCESS_POINT).build();
+    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+        .setEndpoints(FAKE_ACCESS_POINT).build();
+
     private final String[] str = {FAKE_TOPIC_0};
     private final Set<String> set = new HashSet<>(Arrays.asList(str));
 
@@ -88,7 +90,8 @@ public class ProducerImplTest extends TestBase {
     private final ProducerImpl producer = new ProducerImpl(clientConfiguration, set, 1, null);
 
     @InjectMocks
-    private final ProducerImpl producerWithoutTopicBinding = new ProducerImpl(clientConfiguration, new HashSet<>(), 1, null);
+    private final ProducerImpl producerWithoutTopicBinding = new ProducerImpl(clientConfiguration, new HashSet<>(), 1,
+        null);
 
     private void start(ProducerImpl producer) throws ClientException {
         SettableFuture<QueryRouteResponse> future0 = SettableFuture.create();
@@ -96,15 +99,20 @@ public class ProducerImplTest extends TestBase {
         List<MessageQueue> messageQueueList = new ArrayList<>();
         MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
             .setPermission(Permission.READ_WRITE)
-            .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())).setId(0).build();
+            .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0()))
+            .setId(0).build();
         messageQueueList.add(mq);
-        QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status).addAllMessageQueues(messageQueueList).build();
+        QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status)
+            .addAllMessageQueues(messageQueueList).build();
         future0.set(response);
-        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class)))
+        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
+            any(Duration.class)))
             .thenReturn(future0);
-        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class)))
+        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+            any(TelemetrySession.class)))
             .thenReturn(telemetryRequestObserver);
-        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("TestScheduler"));
+        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
+            "TestScheduler"));
         when(clientManager.getScheduler()).thenReturn(scheduler);
         doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
 
@@ -131,11 +139,14 @@ public class ProducerImplTest extends TestBase {
     @Test
     public void testSendWithTopicBinding() throws ClientException, ExecutionException, InterruptedException {
         start(producer);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
+            any(Duration.class), any(TelemetrySession.class));
         final Message message = fakeMessage(FAKE_TOPIC_0);
         final ListenableFuture<SendMessageResponse> future = okSendMessageResponseFutureWithSingleEntry();
-        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
+        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
+            any(Duration.class))).thenReturn(future);
         final SendMessageResponse response = future.get();
         assertEquals(1, response.getEntriesCount());
         final apache.rocketmq.v2.SendResultEntry receipt = response.getEntriesList().iterator().next();
@@ -147,16 +158,21 @@ public class ProducerImplTest extends TestBase {
     @Test
     public void testSendWithoutTopicBinding() throws ClientException, ExecutionException, InterruptedException {
         start(producerWithoutTopicBinding);
-        verify(clientManager, never()).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, never()).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+        verify(clientManager, never()).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, never()).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+            any(TelemetrySession.class));
         final Message message = fakeMessage(FAKE_TOPIC_0);
         final ListenableFuture<SendMessageResponse> future = okSendMessageResponseFutureWithSingleEntry();
-        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
+        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
+            any(Duration.class))).thenReturn(future);
         final SendMessageResponse response = future.get();
         assertEquals(1, response.getEntriesCount());
         final SendReceipt sendReceipt = producerWithoutTopicBinding.send(message);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
+            any(Duration.class), any(TelemetrySession.class));
         final apache.rocketmq.v2.SendResultEntry receipt = response.getEntriesList().iterator().next();
         assertEquals(receipt.getMessageId(), sendReceipt.getMessageId().toString());
         shutdown(producerWithoutTopicBinding);
@@ -165,8 +181,10 @@ public class ProducerImplTest extends TestBase {
     @Test
     public void testSendBatchMessage() throws ClientException, ExecutionException, InterruptedException {
         start(producer);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
+            any(Duration.class), any(TelemetrySession.class));
         int batchMessageNum = 2;
         List<Message> messages = new ArrayList<>();
         for (int i = 0; i < batchMessageNum; i++) {
@@ -175,7 +193,8 @@ public class ProducerImplTest extends TestBase {
         }
 
         final ListenableFuture<SendMessageResponse> future = okBatchSendMessageResponseFuture();
-        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
+        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
+            any(Duration.class))).thenReturn(future);
         final SendMessageResponse response = future.get();
         assertEquals(batchMessageNum, response.getEntriesCount());
         final List<apache.rocketmq.v2.SendResultEntry> receipts = response.getEntriesList();
@@ -188,10 +207,13 @@ public class ProducerImplTest extends TestBase {
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void testSendBatchMessageWithDifferentTopic() throws ClientException, ExecutionException, InterruptedException {
+    public void testSendBatchMessageWithDifferentTopic() throws ClientException, ExecutionException,
+        InterruptedException {
         start(producer);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+            any(TelemetrySession.class));
         int batchMessageNum = 2;
         List<Message> messages = new ArrayList<>();
 
@@ -214,10 +236,13 @@ public class ProducerImplTest extends TestBase {
     @Test(expected = ClientException.class)
     public void testSendMessageWithFailure() throws ClientException {
         start(producer);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), any(TelemetrySession.class));
+        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
+            any(QueryRouteRequest.class), any(Duration.class));
+        verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
+            any(TelemetrySession.class));
         final ListenableFuture<SendMessageResponse> future = failureSendMessageResponseFuture();
-        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class), any(Duration.class))).thenReturn(future);
+        when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
+            any(Duration.class))).thenReturn(future);
         Message message0 = fakeMessage(FAKE_TOPIC_0);
         try {
             producer.send(message0);
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
similarity index 90%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
index a66e84d..3519385 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/TransactionImplTest.java
@@ -17,6 +17,11 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doNothing;
+
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
@@ -29,21 +34,12 @@ import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
-import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.when;
-
 @RunWith(MockitoJUnitRunner.class)
 public class TransactionImplTest extends TestBase {
     @Mock
@@ -98,7 +94,8 @@ public class TransactionImplTest extends TestBase {
     }
 
     @Test
-    public void testCommit() throws IOException, ClientException, ExecutionException, InterruptedException, NoSuchFieldException, IllegalAccessException {
+    public void testCommit() throws IOException, ClientException, ExecutionException, InterruptedException,
+        NoSuchFieldException, IllegalAccessException {
         final TransactionImpl transaction = new TransactionImpl(producer);
         final Message message0 = fakeMessage(FAKE_TOPIC_0);
 
@@ -113,14 +110,17 @@ public class TransactionImplTest extends TestBase {
         final PublishingMessageImpl publishingMessage = transaction.tryAddMessage(message0);
         final SendReceiptImpl receipt = fakeSendReceiptImpl(fakeMessageQueueImpl0());
         transaction.tryAddReceipt(publishingMessage, receipt);
-        ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor = ArgumentCaptor.forClass(TransactionResolution.class);
-        doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class), any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
+        ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor =
+            ArgumentCaptor.forClass(TransactionResolution.class);
+        doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class),
+            any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
         transaction.commit();
         assertEquals(TransactionResolution.COMMIT, resolutionArgumentCaptor.getValue());
     }
 
     @Test
-    public void testRollback() throws IOException, ClientException, ExecutionException, InterruptedException, NoSuchFieldException, IllegalAccessException {
+    public void testRollback() throws IOException, ClientException, ExecutionException, InterruptedException,
+        NoSuchFieldException, IllegalAccessException {
         final TransactionImpl transaction = new TransactionImpl(producer);
         final Message message0 = fakeMessage(FAKE_TOPIC_0);
 
@@ -135,8 +135,10 @@ public class TransactionImplTest extends TestBase {
         final PublishingMessageImpl publishingMessage = transaction.tryAddMessage(message0);
         final SendReceiptImpl receipt = fakeSendReceiptImpl(fakeMessageQueueImpl0());
         transaction.tryAddReceipt(publishingMessage, receipt);
-        ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor = ArgumentCaptor.forClass(TransactionResolution.class);
-        doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class), any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
+        ArgumentCaptor<TransactionResolution> resolutionArgumentCaptor =
+            ArgumentCaptor.forClass(TransactionResolution.class);
+        doNothing().when(producer).endTransaction(any(Endpoints.class), any(MessageCommon.class),
+            any(MessageId.class), anyString(), resolutionArgumentCaptor.capture());
         transaction.rollback();
         assertEquals(TransactionResolution.ROLLBACK, resolutionArgumentCaptor.getValue());
     }
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
similarity index 99%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
index daa70b9..4e3b81d 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageIdCodecTest.java
@@ -17,13 +17,12 @@
 
 package org.apache.rocketmq.client.java.message;
 
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.HashSet;
-import java.util.Set;
-
 public class MessageIdCodecTest {
     private final MessageIdCodec codec = MessageIdCodec.getInstance();
 
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
similarity index 96%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
index c43f107..be03d1d 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/message/MessageImplTest.java
@@ -17,21 +17,19 @@
 
 package org.apache.rocketmq.client.java.message;
 
-import java.util.Arrays;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
 public class MessageImplTest extends TestBase {
     private final ClientServiceProvider provider = ClientServiceProvider.loadService();
     private final String sampleTopic = "foobar";
@@ -70,7 +68,8 @@ public class MessageImplTest extends TestBase {
 
     @Test
     public void testTagSetter() {
-        final Message message = provider.newMessageBuilder().setTag("tagA").setTopic(FAKE_TOPIC_0).setBody(FAKE_MESSAGE_BODY).build();
+        final Message message = provider.newMessageBuilder().setTag("tagA").setTopic(FAKE_TOPIC_0)
+            .setBody(FAKE_MESSAGE_BODY).build();
         assertTrue(message.getTag().isPresent());
         assertEquals("tagA", message.getTag().get());
     }
@@ -92,7 +91,8 @@ public class MessageImplTest extends TestBase {
 
     @Test
     public void testKeySetter() {
-        final Message message = provider.newMessageBuilder().setKeys("keyA").setTopic(FAKE_TOPIC_0).setBody(FAKE_MESSAGE_BODY).build();
+        final Message message = provider.newMessageBuilder().setKeys("keyA").setTopic(FAKE_TOPIC_0)
+            .setBody(FAKE_MESSAGE_BODY).build();
         assertFalse(message.getKeys().isEmpty());
     }
 
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
similarity index 99%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
index 2ff7238..0faaf23 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.rocketmq.client.java.retry;
 
+import static apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
+import static org.junit.Assert.assertEquals;
+
 import apache.rocketmq.v2.CustomizedBackoff;
 import apache.rocketmq.v2.RetryPolicy;
 import com.google.protobuf.util.Durations;
@@ -25,9 +28,6 @@ import java.util.ArrayList;
 import java.util.List;
 import org.junit.Test;
 
-import static apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
-import static org.junit.Assert.assertEquals;
-
 public class CustomizedBackoffRetryPolicyTest {
 
     @Test
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
similarity index 96%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
index d2163ce..c45673d 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/route/EndpointsTest.java
@@ -82,7 +82,8 @@ public class EndpointsTest {
 
     @Test
     public void testEndpointsWithMultipleIpv6() {
-        final Endpoints endpoints = new Endpoints("1050:0000:0000:0000:0005:0600:300c:326b:8080;1050:0000:0000:0000:0005:0600:300c:326c:8081");
+        final Endpoints endpoints = new Endpoints("1050:0000:0000:0000:0005:0600:300c:326b:8080;" +
+            "1050:0000:0000:0000:0005:0600:300c:326c:8081");
         Assert.assertEquals(AddressScheme.IPv6, endpoints.getScheme());
         Assert.assertEquals(2, endpoints.getAddresses().size());
         final Iterator<Address> iterator = endpoints.getAddresses().iterator();
@@ -96,7 +97,8 @@ public class EndpointsTest {
         Assert.assertEquals(8081, address1.getPort());
 
         Assert.assertEquals(AddressScheme.IPv6, endpoints.getScheme());
-        Assert.assertEquals("ipv6:1050:0000:0000:0000:0005:0600:300c:326b:8080,1050:0000:0000:0000:0005:0600:300c:326c:8081", endpoints.getFacade());
+        Assert.assertEquals("ipv6:1050:0000:0000:0000:0005:0600:300c:326b:8080," +
+            "1050:0000:0000:0000:0005:0600:300c:326c:8081", endpoints.getFacade());
     }
 
     @Test
diff --git a/java/client-java/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
similarity index 94%
rename from java/client-java/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
rename to java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index cb5d68f..b30bf93 100644
--- a/java/client-java/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -42,7 +42,6 @@ import apache.rocketmq.v2.SystemProperties;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -64,15 +63,15 @@ import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
 import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
-import org.apache.rocketmq.client.java.misc.Utilities;
-import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
 import org.apache.rocketmq.client.java.message.MessageIdCodec;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
+import org.apache.rocketmq.client.java.misc.Utilities;
+import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 
 public class TestBase {
     protected static final String FAKE_CLIENT_ID = "mbp@29848@cno0nhxy";
@@ -163,7 +162,8 @@ public class TestBase {
         final byte[] body = RandomUtils.nextBytes(bodySize);
         Map<String, String> properties = new HashMap<>();
         List<String> keys = new ArrayList<>();
-        return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null, null, keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0, null, 1, corrupted, null);
+        return new MessageViewImpl(messageId, FAKE_TOPIC_0, body, null, null, null,
+            keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0, null, 1, corrupted, null);
     }
 
     protected MessageQueueImpl fakeMessageQueueImpl0() {
@@ -179,11 +179,13 @@ public class TestBase {
     }
 
     protected Broker fakePbBroker0() {
-        return Broker.newBuilder().setEndpoints(fakePbEndpoints0()).setName(FAKE_BROKER_NAME_0).setId(Utilities.MASTER_BROKER_ID).build();
+        return Broker.newBuilder().setEndpoints(fakePbEndpoints0()).setName(FAKE_BROKER_NAME_0)
+            .setId(Utilities.MASTER_BROKER_ID).build();
     }
 
     protected Broker fakePbBroker1() {
-        return Broker.newBuilder().setEndpoints(fakePbEndpoints1()).setName(FAKE_BROKER_NAME_1).setId(Utilities.MASTER_BROKER_ID).build();
+        return Broker.newBuilder().setEndpoints(fakePbEndpoints1()).setName(FAKE_BROKER_NAME_1)
+            .setId(Utilities.MASTER_BROKER_ID).build();
 
     }
 
@@ -196,11 +198,13 @@ public class TestBase {
     }
 
     protected MessageQueue fakePbMessageQueue0(Resource topicResource) {
-        return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker0()).setPermission(Permission.READ_WRITE).build();
+        return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker0())
+            .setPermission(Permission.READ_WRITE).build();
     }
 
     protected MessageQueue fakePbMessageQueue1(Resource topicResource) {
-        return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker1()).setPermission(Permission.READ_WRITE).build();
+        return MessageQueue.newBuilder().setTopic(topicResource).setBroker(fakePbBroker1())
+            .setPermission(Permission.READ_WRITE).build();
     }
 
     protected ListenableFuture<QueryRouteResponse> okQueryRouteResponseFuture() {
@@ -264,7 +268,8 @@ public class TestBase {
         return future;
     }
 
-    protected ListenableFuture<ForwardMessageToDeadLetterQueueResponse> okForwardMessageToDeadLetterQueueResponseFuture() {
+    protected ListenableFuture<ForwardMessageToDeadLetterQueueResponse>
+        okForwardMessageToDeadLetterQueueResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         SettableFuture<ForwardMessageToDeadLetterQueueResponse> future0 = SettableFuture.create();
         final ForwardMessageToDeadLetterQueueResponse response =
@@ -277,8 +282,9 @@ public class TestBase {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         SettableFuture<SendMessageResponse> future0 = SettableFuture.create();
         final String messageId = MessageIdCodec.getInstance().nextMessageId().toString();
-        SendResultEntry entry = SendResultEntry.newBuilder().setMessageId(messageId).setTransactionId(FAKE_TRANSACTION_ID)
-            .setOffset(1).build();
+        SendResultEntry entry =
+            SendResultEntry.newBuilder().setMessageId(messageId).setTransactionId(FAKE_TRANSACTION_ID)
+                .setOffset(1).build();
         SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
         future0.set(response);
         return future0;
@@ -300,7 +306,8 @@ public class TestBase {
             .setOffset(1).build();
         SendResultEntry entry1 = SendResultEntry.newBuilder().setMessageId(messageId)
             .setOffset(2).build();
-        SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry0).addEntries(entry1).build();
+        SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry0)
+            .addEntries(entry1).build();
         future0.set(response);
         return future0;
     }
@@ -355,7 +362,8 @@ public class TestBase {
     }
 
     protected ProducerSettings fakeProducerSettings() {
-        return new ProducerSettings(FAKE_CLIENT_ID, fakeEndpoints(), fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new HashSet<>());
+        return new ProducerSettings(FAKE_CLIENT_ID, fakeEndpoints(), fakeExponentialBackoffRetryPolicy(),
+            Duration.ofSeconds(1), new HashSet<>());
     }
 
     protected SendReceiptImpl fakeSendReceiptImpl(
diff --git a/java/client-java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/java/client/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
similarity index 100%
rename from java/client-java/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
rename to java/client/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
diff --git a/java/pom.xml b/java/pom.xml
index ef38d67..abd7dc9 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -10,7 +10,8 @@
     <version>5.0.0-SNAPSHOT</version>
     <modules>
         <module>client-apis</module>
-        <module>client-java</module>
+        <module>client</module>
+        <module>client-shade</module>
     </modules>
 
     <properties>
@@ -32,6 +33,7 @@
 
         <!-- plugin -->
         <maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
+        <maven-checkstyle-plugin.version>3.1.1</maven-checkstyle-plugin.version>
     </properties>
 
     <repositories>
@@ -83,6 +85,16 @@
                 <artifactId>rocketmq-client-apis</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>rocketmq-client-java-noshade</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${project.groupId}</groupId>
+                <artifactId>rocketmq-client-java</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-proto</artifactId>
@@ -148,7 +160,26 @@
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>${maven-compiler-plugin.version}</version>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>${maven-checkstyle-plugin.version}</version>
+                <configuration>
+                    <consoleOutput>true</consoleOutput>
+                    <encoding>UTF-8</encoding>
+                    <configLocation>style/checkstyle.xml</configLocation>
+                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>validate</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
-
 </project>
\ No newline at end of file
diff --git a/java/style/checkstyle.xml b/java/style/checkstyle.xml
new file mode 100644
index 0000000..d9bfef4
--- /dev/null
+++ b/java/style/checkstyle.xml
@@ -0,0 +1,385 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--
+    Checkstyle configuration originally derived from the Google Java Style coding conventions, which can be
+    found at https://google.github.io/styleguide/javaguide.html. Deviations have been made where desired.
+ -->
+<module name="Checker">
+    <property name="charset" value="UTF-8"/>
+    <property name="severity" value="error"/>
+    <property name="fileExtensions" value="java"/>
+
+    <!-- Files must not contain tabs. -->
+    <module name="FileTabCharacter">
+        <property name="eachLine" value="true"/>
+    </module>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
+        <property name="fileExtensions" value="java"/>
+    </module>
+
+    <!--    <module name="SuppressionFilter">-->
+    <!--        <property name="file"-->
+    <!--                  value="${checkstyle.suppressions.file}"-->
+    <!--                  default="checkstyle-suppressions.xml"/>-->
+    <!--    </module>-->
+
+    <module name="TreeWalker">
+
+        <!-- Allow suppressing rules via comments. -->
+        <module name="SuppressionCommentFilter"/>
+
+        <!-- Class names must match the file name in which they are defined. -->
+        <module name="OuterTypeFilename"/>
+
+        <!-- Only one class may be defined per file. -->
+        <module name="OneTopLevelClass"/>
+
+        <!-- Special escape sequences like \n and \t must be used over the octal or unicode equivalent. -->
+        <module name="IllegalTokenText">
+            <property name="tokens" value="STRING_LITERAL, CHAR_LITERAL"/>
+            <property name="format"
+                      value="\\u00(08|09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/>
+            <property name="message" value="Avoid using corresponding octal or Unicode escape."/>
+        </module>
+
+        <!-- Unicode escapes must not be used for printable characters. -->
+        <module name="AvoidEscapedUnicodeCharacters">
+            <property name="allowEscapesForControlCharacters" value="true"/>
+            <property name="allowByTailComment" value="true"/>
+            <property name="allowNonPrintableEscapes" value="true"/>
+        </module>
+
+        <!-- Stars must not be used in import statements. -->
+        <module name="AvoidStarImport"/>
+
+        <!-- Checks for unused imports. -->
+        <module name="UnusedImports"/>
+
+        <!-- Package name and imports must not be wrapped. -->
+        <module name="NoLineWrap"/>
+
+        <!-- Braces must be used for all blocks. -->
+        <module name="NeedBraces"/>
+
+        <!-- Braces must not be empty for most language constructs. -->
+        <module name="EmptyBlock">
+            <property name="option" value="TEXT"/>
+            <property name="tokens" value="LITERAL_TRY, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_SWITCH"/>
+        </module>
+
+        <!-- For language constructs related to the previous statement (eg. "else" or "catch"), the keywords must
+             be defined on the same line as the right curly brace. -->
+        <module name="RightCurly">
+            <property name="id" value="RightCurlySame"/>
+            <property name="tokens"
+                      value="LITERAL_TRY, LITERAL_CATCH, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_DO"/>
+        </module>
+
+        <!-- For other language constructs, they must be defined on a separate line. -->
+        <module name="RightCurly">
+            <property name="id" value="RightCurlyAlone"/>
+            <property name="option" value="alone"/>
+            <property name="tokens"
+                      value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT, INSTANCE_INIT"/>
+        </module>
+
+        <!-- Language constructs like "if" and "while" must be followed by whitespace. -->
+        <module name="WhitespaceAfter"/>
+
+        <!-- Language constructs must be surrounded by whitespace. -->
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+            <property name="allowEmptyTypes" value="true"/>
+            <property name="allowEmptyLoops" value="true"/>
+            <message key="ws.notFollowed"
+                     value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement."/>
+            <message key="ws.notPreceded"
+                     value="WhitespaceAround: ''{0}'' is not preceded with whitespace."/>
+        </module>
+
+        <!-- Only one statement per line is permitted. -->
+        <module name="OneStatementPerLine"/>
+
+        <!-- Variables must be defined on different lines. -->
+        <module name="MultipleVariableDeclarations"/>
+
+        <!-- No C-style array declarations are permitted (eg. String args[]). -->
+        <module name="ArrayTypeStyle"/>
+
+        <!-- Defaults must always be included for switch statements, even if they are empty. -->
+        <module name="MissingSwitchDefault"/>
+
+        <!-- Case blocks with statements on them must include a break, return, etc. or the comment "fall through". -->
+        <module name="FallThrough"/>
+
+        <!-- When defining long literals, an upper L must be used. -->
+        <module name="UpperEll"/>
+
+        <!-- Modifiers like public, abstract, static, etc. must follow a consistent order. -->
+        <module name="ModifierOrder"/>
+
+        <!-- Empty lines must separate methods and constructors. -->
+        <module name="EmptyLineSeparator">
+            <property name="allowNoEmptyLineBetweenFields" value="true"/>
+        </module>
+
+        <!-- New lines must happen before dots. -->
+        <module name="SeparatorWrap">
+            <property name="id" value="SeparatorWrapDot"/>
+            <property name="tokens" value="DOT"/>
+            <property name="option" value="nl"/>
+        </module>
+
+        <!-- New lines must happen after commas. -->
+        <module name="SeparatorWrap">
+            <property name="id" value="SeparatorWrapComma"/>
+            <property name="tokens" value="COMMA"/>
+            <property name="option" value="EOL"/>
+        </module>
+
+        <!-- Package names must follow a defined format. -->
+        <module name="PackageName">
+            <property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$"/>
+            <message key="name.invalidPattern"
+                     value="Package name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Type names must follow a defined format. -->
+        <module name="TypeName">
+            <message key="name.invalidPattern"
+                     value="Type name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Non-constant fields must follow a defined format. -->
+        <module name="MemberName">
+            <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+            <message key="name.invalidPattern"
+                     value="Member name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Constant fields must follow a defined format. -->
+        <module name="ConstantName">
+            <property name="format" value="^log?|[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$"/>
+        </module>
+
+        <!-- Method and lambda parameters must follow a defined format. -->
+        <module name="ParameterName">
+            <property name="id" value="ParameterNameNonPublic"/>
+            <property name="format" value="^[a-z]([a-zA-Z0-9]*)?$"/>
+            <property name="accessModifiers" value="protected, package, private"/>
+            <message key="name.invalidPattern"
+                     value="Parameter name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+        <module name="ParameterName">
+            <property name="id" value="ParameterNamePublic"/>
+            <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+            <property name="accessModifiers" value="public"/>
+            <message key="name.invalidPattern"
+                     value="Parameter name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Catch parameters must follow a defined format. -->
+        <module name="CatchParameterName">
+            <property name="format" value="^(e|t|[a-z][a-zA-Z0-9]*)$"/>
+            <message key="name.invalidPattern"
+                     value="Catch parameter name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Local variables must follow a defined format. -->
+        <module name="LocalVariableName">
+            <property name="tokens" value="VARIABLE_DEF"/>
+            <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+            <property name="allowOneCharVarInForLoop" value="true"/>
+            <message key="name.invalidPattern"
+                     value="Local variable name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Type parameters must follow a defined format. -->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
+            <message key="name.invalidPattern"
+                     value="Class type name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+        <module name="MethodTypeParameterName">
+            <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
+            <message key="name.invalidPattern"
+                     value="Method type name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+        <module name="InterfaceTypeParameterName">
+            <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
+            <message key="name.invalidPattern"
+                     value="Interface type name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Method names must follow a defined format. -->
+        <module name="MethodName">
+            <property name="format" value="^[a-z][a-zA-Z0-9]*$"/>
+            <message key="name.invalidPattern"
+                     value="Method name ''{0}'' must match pattern ''{1}''."/>
+        </module>
+
+        <!-- Finalizers must not be overridden. -->
+        <module name="NoFinalizer"/>
+
+        <!-- Whitespace around generics must follow a defined format. -->
+        <module name="GenericWhitespace">
+            <message key="ws.followed"
+                     value="GenericWhitespace ''{0}'' is followed by whitespace."/>
+            <message key="ws.preceded"
+                     value="GenericWhitespace ''{0}'' is preceded with whitespace."/>
+            <message key="ws.illegalFollow"
+                     value="GenericWhitespace ''{0}'' should be followed by whitespace."/>
+            <message key="ws.notPreceded"
+                     value="GenericWhitespace ''{0}'' is not preceded with whitespace."/>
+        </module>
+
+        <!-- (Currently disabled) File indentation should follow a convention of 4 spaces (8 for throws statements). -->
+<!--        <module name="Indentation">-->
+<!--            <property name="throwsIndent" value="8"/>-->
+<!--            <property name="arrayInitIndent" value="8"/>-->
+<!--        </module>-->
+
+        <!-- Abbreviations must follow the same conventions as any other word (eg. use Rpc, not RPC). -->
+        <module name="AbbreviationAsWordInName">
+            <property name="ignoreFinal" value="false"/>
+            <!--            <property name="allowedAbbreviationLength" value="3"/>-->
+            <property name="severity" value="warning"/>
+        </module>
+
+        <!-- Class contents must be defined in the order suggested by Sun/Oracle:
+             http://www.oracle.com/technetwork/java/javase/documentation/codeconventions-141855.html#1852 -->
+        <module name="DeclarationOrder"/>
+
+        <!--&lt;!&ndash; Overloaded methods and constructors must be defined together. &ndash;&gt;-->
+        <!--<module name="OverloadMethodsDeclarationOrder">-->
+        <!--<property name="severity" value="warning"/> &lt;!&ndash; TODO: Make error. &ndash;&gt;-->
+        <!--</module>-->
+
+        <!-- Variables must be declared near where they are used. -->
+        <module name="VariableDeclarationUsageDistance">
+            <property name="allowedDistance" value="10"/>
+        </module>
+
+        <!-- Static imports must occur before external package imports. -->
+        <module name="CustomImportOrder">
+            <property name="sortImportsInGroupAlphabetically" value="true"/>
+            <property name="separateLineBetweenGroups" value="true"/>
+            <property name="customImportOrderRules" value="STATIC###THIRD_PARTY_PACKAGE"/>
+        </module>
+
+        <!-- Method names must be specified on the same line as their parameter list. -->
+        <module name="MethodParamPad"/>
+
+        <!-- There must be no whitespace directly after a left parenthesis or directly before a right parenthesis. -->
+        <module name="ParenPad"/>
+
+        <!-- Non-field annotations must be on separate lines, or in the case of single parameterless annotation can be
+             placed on the same line as the signature. -->
+        <module name="AnnotationLocation">
+            <property name="id" value="AnnotationLocationMostCases"/>
+            <property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF"/>
+        </module>
+
+        <!-- Fields can have multiple annotations applied on the same line. -->
+        <module name="AnnotationLocation">
+            <property name="id" value="AnnotationLocationVariables"/>
+            <property name="tokens" value="VARIABLE_DEF"/>
+            <property name="allowSamelineMultipleAnnotations" value="true"/>
+        </module>
+
+        <!-- Catch blocks must not be empty without a comment. -->
+        <module name="EmptyCatchBlock"/>
+
+        <!-- Comments must be placed at the same indentation level as the surrounding code. -->
+        <module name="CommentsIndentation"/>
+
+        <!-- Checks for imports of certain packages           -->
+        <!-- See http://checkstyle.sf.net/config_imports.html -->
+        <module name="IllegalImport">
+            <property name="illegalPkgs" value="org.apache.http.annotation,javax.annotation.Generated"/>
+        </module>
+
+        <!-- Checks that the override annotation is specified when using @inheritDoc javadoc. -->
+        <module name="MissingOverride"/>
+
+        <!-- Do not allow assignment in subexpressions (except in some cases in loop conditions). -->
+        <module name="InnerAssignment"/>
+
+        <!-- Checks that we don't use System.out.print -->
+        <module name="Regexp">
+            <property name="format" value="System\s*\.\s*(out|err)\s*(\.|::)\s*print"/>
+            <property name="illegalPattern" value="true"/>
+            <property name="message" value="Don't use System console for logging, use a logger instead"/>
+            <property name="ignoreComments" value="true"/>
+        </module>
+
+        <!-- Checks that we don't use Objects.hash; Objects.hashCode is preferred. The dot is escaped to match literally. -->
+        <module name="Regexp">
+            <property name="format" value="\bObjects\.hash\b"/>
+            <property name="illegalPattern" value="true"/>
+            <property name="message" value="Don't use Objects.hash, use Objects.hashCode instead"/>
+            <property name="ignoreComments" value="true"/>
+        </module>
+
+        <!-- Checks that we don't use sslContext.newHandler directly (\b is a word boundary; the dot is literal). -->
+        <module name="Regexp">
+            <property name="format" value="\bsslContext\.newHandler\b"/>
+            <property name="illegalPattern" value="true"/>
+            <property name="message"
+                      value="Don't use sslContext.newHandler directly, use NettyUtils.newSslHandler instead"/>
+            <property name="ignoreComments" value="true"/>
+        </module>
+
+
+        <!-- Checks that we don't use AttributeKey.newInstance directly -->
+        <module name="Regexp">
+            <property name="format" value="AttributeKey\.newInstance"/>
+            <property name="illegalPattern" value="true"/>
+            <property name="message" value="Use NettyUtils.getOrCreateAttributeKey to safely declare AttributeKeys"/>
+            <property name="ignoreComments" value="true"/>
+        </module>
+
+        <!-- Checks that we don't use Thread.currentThread().getContextClassLoader() directly -->
+        <module name="Regexp">
+            <property name="format" value="Thread\s*\.\s*currentThread\s*\(\s*\)\s*(\.|::)\s*getContextClassLoader"/>
+            <property name="illegalPattern" value="true"/>
+            <property name="ignoreComments" value="true"/>
+        </module>
+
+        <!-- Checks for redundant public modifier on interfaces and other redundant modifiers -->
+        <!--        <module name="RedundantModifier"/>-->
+
+        <!-- Checks for utility and constants classes to have private constructor-->
+        <module name="HideUtilityClassConstructor"/>
+    </module>
+
+    <!-- Enforce maximum line lengths. -->
+    <module name="LineLength">
+        <property name="max" value="120"/>
+        <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
+    </module>
+
+</module>
\ No newline at end of file
diff --git a/java/style/intellij-codestyle.xml b/java/style/intellij-codestyle.xml
new file mode 100644
index 0000000..7e479cf
--- /dev/null
+++ b/java/style/intellij-codestyle.xml
@@ -0,0 +1,67 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<code_scheme name="Apache RocketMQ Clients 5.0">
+    <option name="GENERATE_FINAL_LOCALS" value="false"/>
+    <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="9999"/>
+    <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="9999999"/>
+    <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+        <value/>
+    </option>
+    <option name="IMPORT_LAYOUT_TABLE">
+        <value>
+            <package name="" withSubpackages="true" static="true"/>
+            <emptyLine/>
+            <package name="" withSubpackages="true" static="false"/>
+            <emptyLine/>
+        </value>
+    </option>
+    <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/>
+    <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
+    <option name="JD_P_AT_EMPTY_LINES" value="false"/>
+    <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
+    <JavaCodeStyleSettings>
+        <option name="DO_NOT_WRAP_AFTER_SINGLE_ANNOTATION" value="true"/>
+        <option name="CLASS_NAMES_IN_JAVADOC" value="3"/>
+    </JavaCodeStyleSettings>
+    <XML>
+        <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
+    </XML>
+    <codeStyleSettings language="JAVA">
+        <option name="RIGHT_MARGIN" value="120"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="KEEP_FIRST_COLUMN_COMMENT" value="false"/>
+        <option name="DO_NOT_WRAP_AFTER_SINGLE_ANNOTATION" value="true"/>
+        <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+<!--        <option name="SPACE_BEFORE_ANNOTATION_ARRAY_INITIALIZER_LBRACE" value="true"/>-->
+        <option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true"/>
+        <option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true"/>
+        <option name="PLACE_ASSIGNMENT_SIGN_ON_NEXT_LINE" value="true"/>
+        <option name="IF_BRACE_FORCE" value="3"/>
+        <option name="DOWHILE_BRACE_FORCE" value="3"/>
+        <option name="WHILE_BRACE_FORCE" value="3"/>
+        <option name="FOR_BRACE_FORCE" value="3"/>
+        <option name="WRAP_LONG_LINES" value="true"/>
+        <option name="WRAP_ON_TYPING" value="1"/>
+        <indentOptions>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+</code_scheme>
\ No newline at end of file
diff --git a/java/style/spotbugs-suppressions.xml b/java/style/spotbugs-suppressions.xml
new file mode 100644
index 0000000..ca8620f
--- /dev/null
+++ b/java/style/spotbugs-suppressions.xml
@@ -0,0 +1,12 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<FindBugsFilter>
+    <Match>
+        <Bug pattern="SF_SWITCH_FALLTHROUGH,SIC_INNER_SHOULD_BE_STATIC_ANON,URF_UNREAD_FIELD,UUF_UNUSED_FIELD,SS_SHOULD_BE_STATIC,NM_CONFUSING"/>
+    </Match>
+
+    <Match>
+        <Package name="~org\.apache\.rocketmq\.client.*"/>
+        <Bug pattern="NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"/>
+    </Match>
+</FindBugsFilter>