You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/09 11:54:00 UTC
[rocketmq] 19/26: [ISSUE #5392] Adapt for logging and module refactor
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 416427de1ec70e00a8bcd881ebd0571906a10d49
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Wed Nov 23 09:49:32 2022 +0800
[ISSUE #5392] Adapt for logging and module refactor
---
.../rocketmq/broker/client/ConsumerManager.java | 2 +-
.../broker/client/ConsumerManagerInterface.java | 6 ++--
.../rocketmq/broker/client/ProducerManager.java | 2 +-
.../broker/client/ProducerManagerInterface.java | 2 +-
.../apache/rocketmq/proxy/config/ProxyConfig.java | 3 +-
.../processor/channel/RemoteChannelSerializer.java | 6 ++--
.../remoting/MultiProtocolRemotingServer.java | 4 +--
.../proxy/remoting/MultiProtocolTlsHelper.java | 4 +--
.../proxy/remoting/RemotingProtocolServer.java | 8 +++---
.../activity/AbstractRemotingActivity.java | 14 +++++-----
.../remoting/activity/ClientManagerActivity.java | 14 +++++-----
.../remoting/activity/ConsumerManagerActivity.java | 14 +++++-----
.../remoting/activity/GetTopicRouteActivity.java | 6 ++--
.../remoting/activity/PopMessageActivity.java | 2 +-
.../remoting/activity/PullMessageActivity.java | 6 ++--
.../remoting/activity/SendMessageActivity.java | 6 ++--
.../remoting/activity/TransactionActivity.java | 4 +--
.../proxy/remoting/channel/RemotingChannel.java | 32 +++++++++++-----------
.../remoting/channel/RemotingChannelManager.java | 6 ++--
.../proxy/remoting/common/RemotingConverter.java | 6 ++--
.../http2proxy/Http2ProtocolProxyHandler.java | 6 ++--
.../http2proxy/Http2ProxyBackendHandler.java | 6 ++--
.../http2proxy/Http2ProxyFrontendHandler.java | 6 ++--
.../rocketmq/proxy/service/admin/AdminService.java | 2 +-
.../proxy/service/admin/DefaultAdminService.java | 10 +++----
.../service/client/ClusterConsumerManager.java | 6 ++--
.../sysmessage/AbstractSystemMessageSyncer.java | 10 +++----
.../proxy/service/sysmessage/HeartbeatSyncer.java | 6 ++--
.../service/sysmessage/HeartbeatSyncerData.java | 6 ++--
.../grpc/v2/channel/GrpcClientChannelTest.java | 4 +--
.../activity/AbstractRemotingActivityTest.java | 8 +++---
.../remoting/activity/PullMessageActivityTest.java | 12 ++++----
.../remoting/activity/SendMessageActivityTest.java | 10 +++----
.../remoting/channel/RemotingChannelTest.java | 14 +++++-----
.../service/admin/DefaultAdminServiceTest.java | 6 ++--
.../service/sysmessage/HeartbeatSyncerTest.java | 18 ++++++------
36 files changed, 139 insertions(+), 138 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index 0582ce75e..a70e8579e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -38,7 +38,7 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-public class ConsumerManager {
+public class ConsumerManager implements ConsumerManagerInterface {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<>(1024);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java
index 895a2e491..6998f60e7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManagerInterface.java
@@ -20,9 +20,9 @@ package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
import java.util.Set;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
public interface ConsumerManagerInterface {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 047aa8be9..a3ed9c590 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -35,7 +35,7 @@ import org.apache.rocketmq.remoting.protocol.body.ProducerInfo;
import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-public class ProducerManager {
+public class ProducerManager implements ProducerManagerInterface {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java
index 3f2ece7cd..5e2e7e5b0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManagerInterface.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
-import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
+import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
public interface ProducerManagerInterface {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index bd7cf1113..e0f971202 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.ProxyMode;
@@ -241,7 +242,7 @@ public class ProxyConfig implements ConfigFile {
public void initData() {
parseDelayLevel();
if (StringUtils.isEmpty(localServeAddr)) {
- this.localServeAddr = RemotingUtil.getLocalAddress();
+ this.localServeAddr = NetworkUtil.getLocalAddress();
}
if (StringUtils.isBlank(localServeAddr)) {
throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "get local serve ip failed");
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java
index 8fd216219..a22401a5f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/channel/RemoteChannelSerializer.java
@@ -23,11 +23,11 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class RemoteChannelSerializer {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
private static final String REMOTE_PROXY_IP_KEY = "remoteProxyIp";
private static final String REMOTE_ADDRESS_KEY = "remoteAddress";
private static final String LOCAL_ADDRESS_KEY = "localAddress";
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
index 02e3a545e..74fb3616c 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolRemotingServer.java
@@ -23,6 +23,8 @@ import io.netty.handler.timeout.IdleStateHandler;
import java.io.IOException;
import java.security.cert.CertificateException;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.remoting.protocol.ProtocolNegotiationHandler;
@@ -33,8 +35,6 @@ import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* support remoting and http2 protocol at one port
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java
index 54af7bc9e..59342ca3c 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/MultiProtocolTlsHelper.java
@@ -34,9 +34,9 @@ import java.nio.file.Paths;
import java.security.cert.CertificateException;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.netty.TlsHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerAuthClient;
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.tlsServerCertPath;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
index adbc21169..a7cc7af47 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/RemotingProtocolServer.java
@@ -32,10 +32,10 @@ import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.future.FutureTaskExt;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.thread.ThreadPoolStatusMonitor;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
@@ -59,8 +59,8 @@ import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
public class RemotingProtocolServer implements StartAndShutdown, RemotingProxyOutClient {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
index 650c38614..a66ee6e04 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java
@@ -25,8 +25,9 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
@@ -35,11 +36,10 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
public abstract class AbstractRemotingActivity implements NettyRequestProcessor {
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -119,8 +119,8 @@ public abstract class AbstractRemotingActivity implements NettyRequestProcessor
context.setAction("Remoting" + request.getCode())
.setLanguage(request.getLanguage().name())
.setChannel(ctx.channel())
- .setLocalAddress(RemotingUtil.socketAddress2String(ctx.channel().localAddress()))
- .setRemoteAddress(RemotingUtil.socketAddress2String(ctx.channel().remoteAddress()));
+ .setLocalAddress(NetworkUtil.socketAddress2String(ctx.channel().localAddress()))
+ .setRemoteAddress(NetworkUtil.socketAddress2String(ctx.channel().remoteAddress()));
return context;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
index 10f7fa324..1009e4204 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.java
@@ -25,13 +25,13 @@ import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerChangeListener;
import org.apache.rocketmq.broker.client.ProducerGroupEvent;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
-import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
-import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
-import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
-import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.UnregisterClientResponseHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
index 734b1dad1..1c1993ff0 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.java
@@ -24,13 +24,13 @@ import java.util.List;
import java.util.Set;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
-import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
-import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
-import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
-import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
+import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
index 670d9735c..9972c26c9 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java
@@ -23,9 +23,9 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.MQVersion;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestHeader;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java
index d52b84b12..a635e55cc 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PopMessageActivity.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.proxy.remoting.activity;
import io.netty.channel.ChannelHandlerContext;
import java.time.Duration;
-import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
index eb744676a..d548ddc0d 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java
@@ -20,14 +20,14 @@ package org.apache.rocketmq.proxy.remoting.activity;
import io.netty.channel.ChannelHandlerContext;
import java.time.Duration;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
public class PullMessageActivity extends AbstractRemotingActivity {
public PullMessageActivity(RequestPipeline requestPipeline,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
index 20fab6e57..618d45874 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
@@ -22,9 +22,9 @@ import java.time.Duration;
import java.util.Map;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java
index 24f98a875..bc5e0ca35 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/TransactionActivity.java
@@ -18,8 +18,8 @@
package org.apache.rocketmq.proxy.remoting.activity;
import io.netty.channel.ChannelHandlerContext;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
index 806b35de2..368330115 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannel.java
@@ -27,16 +27,9 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
-import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
-import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
@@ -50,12 +43,19 @@ import org.apache.rocketmq.proxy.service.relay.ProxyChannel;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayResult;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.proxy.service.transaction.TransactionData;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
public class RemotingChannel extends ProxyChannel implements RemoteChannelConverter, ChannelExtendAttributeGetter {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
private static final long DEFAULT_MQ_CLIENT_TIMEOUT = Duration.ofSeconds(3).toMillis();
private final String clientId;
private final String remoteAddress;
@@ -67,12 +67,12 @@ public class RemotingChannel extends ProxyChannel implements RemoteChannelConver
Channel parent,
String clientId, Set<SubscriptionData> subscriptionData) {
super(proxyRelayService, parent, parent.id(),
- RemotingUtil.socketAddress2String(parent.remoteAddress()),
- RemotingUtil.socketAddress2String(parent.localAddress()));
+ NetworkUtil.socketAddress2String(parent.remoteAddress()),
+ NetworkUtil.socketAddress2String(parent.localAddress()));
this.remotingProxyOutClient = remotingProxyOutClient;
this.clientId = clientId;
- this.remoteAddress = RemotingUtil.socketAddress2String(parent.remoteAddress());
- this.localAddress = RemotingUtil.socketAddress2String(parent.localAddress());
+ this.remoteAddress = NetworkUtil.socketAddress2String(parent.remoteAddress());
+ this.localAddress = NetworkUtil.socketAddress2String(parent.localAddress());
this.subscriptionData = subscriptionData;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
index d47884b61..6913fc670 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelManager.java
@@ -26,12 +26,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
public class RemotingChannelManager implements StartAndShutdown {
protected final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java
index a08abbba2..2bd53d8de 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/common/RemotingConverter.java
@@ -20,11 +20,11 @@ package org.apache.rocketmq.proxy.remoting.common;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class RemotingConverter {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected static final Object INSTANCE_CREATE_LOCK = new Object();
protected static volatile RemotingConverter instance;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
index 86f1ee921..2ba2d3463 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProtocolProxyHandler.java
@@ -31,15 +31,15 @@ import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import javax.net.ssl.SSLException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.remoting.protocol.ProtocolHandler;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Http2ProtocolProxyHandler implements ProtocolHandler {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static final String LOCAL_HOST = "127.0.0.1";
/**
* The int value of "PRI ". Now use 4 bytes to judge protocol, may be has potential risks if there is a new protocol
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
index 53bddfc31..dfcd144af 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyBackendHandler.java
@@ -22,12 +22,12 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Http2ProxyBackendHandler extends ChannelInboundHandlerAdapter {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private final Channel inboundChannel;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
index 8bffdc6d0..775c047b8 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/http2proxy/Http2ProxyFrontendHandler.java
@@ -23,12 +23,12 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Http2ProxyFrontendHandler extends ChannelInboundHandlerAdapter {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
// As we use inboundChannel.eventLoop() when building the Bootstrap this does not need to be volatile as
// the outboundChannel will use the same EventLoop (and therefore Thread) as the inboundChannel.
private final Channel outboundChannel;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java
index d98d17ff7..a9e6686b4 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/AdminService.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.proxy.service.admin;
import java.util.List;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
public interface AdminService {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java
index e94c0879b..4dbf21a98 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminService.java
@@ -25,17 +25,17 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt;
import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.route.TopicRouteHelper;
public class DefaultAdminService implements AdminService {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
private final MQClientAPIFactory mqClientAPIFactory;
public DefaultAdminService(MQClientAPIFactory mqClientAPIFactory) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
index 3bb65b03e..3a98b5ee1 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/client/ClusterConsumerManager.java
@@ -23,9 +23,9 @@ import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
import org.apache.rocketmq.proxy.service.admin.AdminService;
import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
index 72593525e..e0a9fd702 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
@@ -29,11 +29,9 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.common.StartAndShutdown;
@@ -44,9 +42,11 @@ import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
public abstract class AbstractSystemMessageSyncer implements StartAndShutdown, MessageListenerConcurrently {
- protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ protected static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
protected final TopicRouteService topicRouteService;
protected final AdminService adminService;
protected final String systemResourceName;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
index 12504a2f0..041cbcee6 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncer.java
@@ -33,9 +33,9 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.proxy.common.channel.ChannelHelper;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
index f3b96ac9a..10c6f1206 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerData.java
@@ -20,9 +20,9 @@ package org.apache.rocketmq.proxy.service.sysmessage;
import com.google.common.base.MoreObjects;
import java.util.Set;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class HeartbeatSyncerData {
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
index 70e10bc2b..1bdbdd9be 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannelTest.java
@@ -22,7 +22,7 @@ import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.Settings;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.config.InitConfigTest;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
@@ -41,7 +41,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class GrpcClientChannelTest extends InitConfigAndLoggerTest {
+public class GrpcClientChannelTest extends InitConfigTest {
@Mock
private ProxyRelayService proxyRelayService;
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
index 74eb3cbd8..ecdc1deaf 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java
@@ -24,12 +24,12 @@ import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.config.InitConfigTest;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
@@ -53,7 +53,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class AbstractRemotingActivityTest extends InitConfigAndLoggerTest {
+public class AbstractRemotingActivityTest extends InitConfigTest {
AbstractRemotingActivity remotingActivity;
@Mock
MessagingProcessor messagingProcessorMock;
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
index ffbe2ffac..5798e883b 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java
@@ -22,16 +22,16 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
-import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.config.InitConfigTest;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -49,7 +49,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class PullMessageActivityTest extends InitConfigAndLoggerTest {
+public class PullMessageActivityTest extends InitConfigTest {
PullMessageActivity pullMessageActivity;
@Mock
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
index e03bc26e0..b88f6677e 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
@@ -24,10 +24,10 @@ import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.proxy.config.InitConfigTest;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext;
@@ -49,7 +49,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class SendMessageActivityTest extends InitConfigAndLoggerTest {
+public class SendMessageActivityTest extends InitConfigTest {
SendMessageActivity sendMessageActivity;
@Mock
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java
index 840f3e40f..d947fa5d5 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/channel/RemotingChannelTest.java
@@ -21,15 +21,15 @@ import io.netty.channel.Channel;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.rocketmq.common.filter.FilterAPI;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.proxy.config.InitConfigTest;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
import org.apache.rocketmq.proxy.processor.channel.ChannelProtocolType;
import org.apache.rocketmq.proxy.processor.channel.RemoteChannel;
import org.apache.rocketmq.proxy.remoting.RemotingProxyOutClient;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,7 +42,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class RemotingChannelTest extends InitConfigAndLoggerTest {
+public class RemotingChannelTest extends InitConfigTest {
@Mock
private RemotingProxyOutClient remotingProxyOutClient;
@Mock
@@ -61,8 +61,8 @@ public class RemotingChannelTest extends InitConfigAndLoggerTest {
public void before() throws Throwable {
super.before();
this.clientId = RandomStringUtils.randomAlphabetic(10);
- when(parent.remoteAddress()).thenReturn(RemotingUtil.string2SocketAddress(remoteAddress));
- when(parent.localAddress()).thenReturn(RemotingUtil.string2SocketAddress(localAddress));
+ when(parent.remoteAddress()).thenReturn(NetworkUtil.string2SocketAddress(remoteAddress));
+ when(parent.localAddress()).thenReturn(NetworkUtil.string2SocketAddress(localAddress));
this.subscriptionData = new HashSet<>();
this.subscriptionData.add(FilterAPI.buildSubscriptionData("topic", "subTag"));
this.remotingChannel = new RemotingChannel(remotingProxyOutClient, proxyRelayService,
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java
index 039efd8b4..f0e618d11 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/admin/DefaultAdminServiceTest.java
@@ -22,9 +22,9 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIExt;
import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
import org.junit.Before;
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 8ac74f533..45e3942d6 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -38,18 +38,11 @@ import org.apache.rocketmq.broker.client.ConsumerManagerInterface;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
-import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest;
+import org.apache.rocketmq.proxy.config.InitConfigTest;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
@@ -63,6 +56,13 @@ import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.assertj.core.util.Lists;
import org.jetbrains.annotations.NotNull;
import org.junit.Before;
@@ -88,7 +88,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class HeartbeatSyncerTest extends InitConfigAndLoggerTest {
+public class HeartbeatSyncerTest extends InitConfigTest {
@Mock
private TopicRouteService topicRouteService;
@Mock