You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:33 UTC
[rocketmq] 09/14: Remove netty dependency in RemotingClient and RemotingServer
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit f84239ef03d057a6f309187c21c3097d3ccb504e
Author: duhenglucky <du...@gmail.com>
AuthorDate: Wed Jan 2 01:52:05 2019 +0800
Remove netty dependency in RemotingClient and RemotingServer
---
.../apache/rocketmq/broker/BrokerController.java | 20 ++---
.../org/apache/rocketmq/broker/BrokerStartup.java | 8 +-
.../rocketmq/broker/client/ClientChannelInfo.java | 9 ++-
.../broker/client/ClientHousekeepingService.java | 22 +++--
.../rocketmq/broker/client/ConsumerGroupInfo.java | 19 ++---
.../rocketmq/broker/client/ConsumerManager.java | 12 +--
.../client/DefaultConsumerIdsChangeListener.java | 5 +-
.../rocketmq/broker/client/ProducerManager.java | 42 +++++-----
.../rocketmq/broker/client/net/Broker2Client.java | 19 ++---
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 6 +-
.../processor/AbstractSendMessageProcessor.java | 4 +-
.../broker/processor/AdminBrokerProcessor.java | 21 ++---
.../broker/processor/ClientManageProcessor.java | 18 +++--
.../broker/processor/ConsumerManageProcessor.java | 13 ++-
.../broker/processor/EndTransactionProcessor.java | 15 ++--
.../broker/processor/ForwardRequestProcessor.java | 10 ++-
.../broker/processor/PullMessageProcessor.java | 11 ++-
.../broker/processor/QueryMessageProcessor.java | 12 ++-
.../broker/processor/SendMessageProcessor.java | 11 ++-
.../processor/SnodePullMessageProcessor.java | 11 ++-
.../AbstractTransactionalMessageCheckListener.java | 3 +-
.../rocketmq/broker/BrokerControllerTest.java | 8 +-
.../apache/rocketmq/broker/BrokerOuterAPITest.java | 8 +-
.../broker/client/ProducerManagerTest.java | 11 +--
.../processor/ClientManageProcessorTest.java | 25 +++---
.../processor/EndTransactionProcessorTest.java | 22 +++--
.../broker/processor/PullMessageProcessorTest.java | 39 ++++++---
.../broker/processor/SendMessageProcessorTest.java | 21 +++--
...faultTransactionalMessageCheckListenerTest.java | 8 +-
.../queue/TransactionalMessageBridgeTest.java | 8 +-
.../queue/TransactionalMessageServiceImplTest.java | 8 +-
.../NettyClientConfig.java => ClientConfig.java} | 0
.../apache/rocketmq/remoting/RemotingChannel.java | 78 ++++++++++++++++++
...RequestProcessor.java => RequestProcessor.java} | 0
.../NettyServerConfig.java => ServerConfig.java} | 0
...or.java => NettyChannelHandlerContextImpl.java} | 15 +---
.../rocketmq/remoting/netty/NettyChannelImpl.java | 94 ++++++++++++++++++++++
37 files changed, 445 insertions(+), 191 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index eff8fd4..61a5008 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -86,9 +86,9 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RemotingServerFactory;
import org.apache.rocketmq.remoting.common.TlsMode;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
@@ -107,8 +107,8 @@ public class BrokerController {
private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
private static final InternalLogger LOG_WATER_MARK = InternalLoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
private final BrokerConfig brokerConfig;
- private final NettyServerConfig nettyServerConfig;
- private final NettyClientConfig nettyClientConfig;
+ private final ServerConfig nettyServerConfig;
+ private final ClientConfig nettyClientConfig;
private final MessageStoreConfig messageStoreConfig;
private final ConsumerOffsetManager consumerOffsetManager;
private final ConsumerManager consumerManager;
@@ -162,8 +162,8 @@ public class BrokerController {
public BrokerController(
final BrokerConfig brokerConfig,
- final NettyServerConfig nettyServerConfig,
- final NettyClientConfig nettyClientConfig,
+ final ServerConfig nettyServerConfig,
+ final ClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
this.brokerConfig = brokerConfig;
@@ -210,7 +210,7 @@ public class BrokerController {
return brokerConfig;
}
- public NettyServerConfig getNettyServerConfig() {
+ public ServerConfig getNettyServerConfig() {
return nettyServerConfig;
}
@@ -251,7 +251,7 @@ public class BrokerController {
this.remotingServer = RemotingServerFactory.createInstance();
this.remotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
- NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
+ ServerConfig fastConfig = (ServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
// this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
this.fastRemotingServer = RemotingServerFactory.createInstance();
@@ -520,7 +520,7 @@ public class BrokerController {
/**
* QueryMessageProcessor
*/
- NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
+ RequestProcessor queryProcessor = new QueryMessageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index c623d52..17d2f0e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -30,8 +30,8 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -107,8 +107,8 @@ public class BrokerStartup {
}
final BrokerConfig brokerConfig = new BrokerConfig();
- final NettyServerConfig nettyServerConfig = new NettyServerConfig();
- final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+ final ServerConfig nettyServerConfig = new ServerConfig();
+ final ClientConfig nettyClientConfig = new ClientConfig();
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
index 7c5e25b..192a6f8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
@@ -17,27 +17,28 @@
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
public class ClientChannelInfo {
- private final Channel channel;
+ private final RemotingChannel channel;
private final String clientId;
private final LanguageCode language;
private final int version;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
- public ClientChannelInfo(Channel channel) {
+ public ClientChannelInfo(RemotingChannel channel) {
this(channel, null, null, 0);
}
- public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) {
+ public ClientChannelInfo(RemotingChannel channel, String clientId, LanguageCode language, int version) {
this.channel = channel;
this.clientId = clientId;
this.language = language;
this.version = version;
}
- public Channel getChannel() {
+ public RemotingChannel getChannel() {
return channel;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index 7e023dd..5a39e4f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -26,6 +26,9 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
public class ClientHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -62,26 +65,35 @@ public class ClientHousekeepingService implements ChannelEventListener {
}
@Override
- public void onChannelConnect(String remoteAddr, Channel channel) {
-
+ public void onChannelConnect(String remoteAddr, RemotingChannel channel) {
+ log.info("Remoting channel connected: {}", RemotingHelper.parseChannelRemoteAddr(channel.remoteAddress()));
}
@Override
- public void onChannelClose(String remoteAddr, Channel channel) {
+ public void onChannelClose(String remoteAddr, RemotingChannel remotingChannel) {
+ log.info("Remoting channel closed: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
+ NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
+ Channel channel = nettyChannel.getChannel();
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}
@Override
- public void onChannelException(String remoteAddr, Channel channel) {
+ public void onChannelException(String remoteAddr, RemotingChannel remotingChannel) {
+ log.info("Remoting channel exception: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
+ NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
+ Channel channel = nettyChannel.getChannel();
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}
@Override
- public void onChannelIdle(String remoteAddr, Channel channel) {
+ public void onChannelIdle(String remoteAddr, RemotingChannel remotingChannel) {
+ log.info("Remoting channel idle: {}", RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
+ NettyChannelImpl nettyChannel = (NettyChannelImpl) remotingChannel;
+ Channel channel = nettyChannel.getChannel();
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
index c90d494..407b0b6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
@@ -31,14 +31,15 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.RemotingChannel;
public class ConsumerGroupInfo {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final String groupName;
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>();
- private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
- new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
+ private final ConcurrentMap<RemotingChannel, ClientChannelInfo> channelInfoTable =
+ new ConcurrentHashMap<>(16);
private volatile ConsumeType consumeType;
private volatile MessageModel messageModel;
private volatile ConsumeFromWhere consumeFromWhere;
@@ -53,9 +54,9 @@ public class ConsumerGroupInfo {
}
public ClientChannelInfo findChannel(final String clientId) {
- Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
+ Iterator<Entry<RemotingChannel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
while (it.hasNext()) {
- Entry<Channel, ClientChannelInfo> next = it.next();
+ Entry<RemotingChannel, ClientChannelInfo> next = it.next();
if (next.getValue().getClientId().equals(clientId)) {
return next.getValue();
}
@@ -68,12 +69,12 @@ public class ConsumerGroupInfo {
return subscriptionTable;
}
- public ConcurrentMap<Channel, ClientChannelInfo> getChannelInfoTable() {
+ public ConcurrentMap<RemotingChannel, ClientChannelInfo> getChannelInfoTable() {
return channelInfoTable;
}
- public List<Channel> getAllChannel() {
- List<Channel> result = new ArrayList<>();
+ public List<RemotingChannel> getAllChannel() {
+ List<RemotingChannel> result = new ArrayList<>();
result.addAll(this.channelInfoTable.keySet());
@@ -83,10 +84,10 @@ public class ConsumerGroupInfo {
public List<String> getAllClientId() {
List<String> result = new ArrayList<>();
- Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
+ Iterator<Entry<RemotingChannel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
while (it.hasNext()) {
- Entry<Channel, ClientChannelInfo> entry = it.next();
+ Entry<RemotingChannel, ClientChannelInfo> entry = it.next();
ClientChannelInfo clientChannelInfo = entry.getValue();
result.add(clientChannelInfo.getClientId());
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index cb60655..f621c1d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
@@ -147,19 +148,20 @@ public class ConsumerManager {
Entry<String, ConsumerGroupInfo> next = it.next();
String group = next.getKey();
ConsumerGroupInfo consumerGroupInfo = next.getValue();
- ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
+ ConcurrentMap<RemotingChannel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();
- Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
+ Iterator<Entry<RemotingChannel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
while (itChannel.hasNext()) {
- Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
+ Entry<RemotingChannel, ClientChannelInfo> nextChannel = itChannel.next();
ClientChannelInfo clientChannelInfo = nextChannel.getValue();
long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
log.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
- RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
- RemotingUtil.closeChannel(clientChannelInfo.getChannel());
+ RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel().remoteAddress()), group);
+
+ clientChannelInfo.getChannel().close();
itChannel.remove();
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
index d716a33..e2174dc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.RemotingChannel;
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
private final BrokerController brokerController;
@@ -41,9 +42,9 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
if (args == null || args.length < 1) {
return;
}
- List<Channel> channels = (List<Channel>) args[0];
+ List<RemotingChannel> channels = (List<RemotingChannel>) args[0];
if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
- for (Channel chl : channels) {
+ for (RemotingChannel chl : channels) {
this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 61ceae5..1c9a557 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
@@ -41,15 +42,16 @@ public class ProducerManager {
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private static final int GET_AVALIABLE_CHANNEL_RETRY_COUNT = 3;
private final Lock groupChannelLock = new ReentrantLock();
- private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
- new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+ private final HashMap<String /* group name */, HashMap<RemotingChannel, ClientChannelInfo>> groupChannelTable =
+ new HashMap<>();
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
+
public ProducerManager() {
}
- public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
- HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
- new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+ public HashMap<String, HashMap<RemotingChannel, ClientChannelInfo>> getGroupChannelTable() {
+ HashMap<String /* group name */, HashMap<RemotingChannel, ClientChannelInfo>> newGroupChannelTable =
+ new HashMap<>();
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
@@ -68,14 +70,14 @@ public class ProducerManager {
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
- for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
+ for (final Map.Entry<String, HashMap<RemotingChannel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) {
final String group = entry.getKey();
- final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
+ final HashMap<RemotingChannel, ClientChannelInfo> chlMap = entry.getValue();
- Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
+ Iterator<Entry<RemotingChannel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
while (it.hasNext()) {
- Entry<Channel, ClientChannelInfo> item = it.next();
+ Entry<RemotingChannel, ClientChannelInfo> item = it.next();
// final Integer id = item.getKey();
final ClientChannelInfo info = item.getValue();
@@ -84,8 +86,8 @@ public class ProducerManager {
it.remove();
log.warn(
"SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
- RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
- RemotingUtil.closeChannel(info.getChannel());
+ RemotingHelper.parseChannelRemoteAddr(info.getChannel().remoteAddress()), group);
+ info.getChannel().close();
}
}
}
@@ -105,10 +107,10 @@ public class ProducerManager {
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
- for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
+ for (final Map.Entry<String, HashMap<RemotingChannel, ClientChannelInfo>> entry : this.groupChannelTable
.entrySet()) {
final String group = entry.getKey();
- final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
+ final HashMap<RemotingChannel, ClientChannelInfo> clientChannelInfoTable =
entry.getValue();
final ClientChannelInfo clientChannelInfo =
clientChannelInfoTable.remove(channel);
@@ -137,7 +139,7 @@ public class ProducerManager {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
- HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+ HashMap<RemotingChannel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null == channelTable) {
channelTable = new HashMap<>();
this.groupChannelTable.put(group, channelTable);
@@ -168,7 +170,7 @@ public class ProducerManager {
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
- HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+ HashMap<RemotingChannel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
if (old != null) {
@@ -192,11 +194,11 @@ public class ProducerManager {
}
}
- public Channel getAvaliableChannel(String groupId) {
- HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
- List<Channel> channelList = new ArrayList<Channel>();
+ public RemotingChannel getAvaliableChannel(String groupId) {
+ HashMap<RemotingChannel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
+ List<RemotingChannel> channelList = new ArrayList<>();
if (channelClientChannelInfoHashMap != null) {
- for (Channel channel : channelClientChannelInfoHashMap.keySet()) {
+ for (RemotingChannel channel : channelClientChannelInfoHashMap.keySet()) {
channelList.add(channel);
}
int size = channelList.size();
@@ -206,7 +208,7 @@ public class ProducerManager {
}
int index = positiveAtomicCounter.incrementAndGet() % size;
- Channel channel = channelList.get(index);
+ RemotingChannel channel = channelList.get(index);
int count = 0;
boolean isOk = channel.isActive() && channel.isWritable();
while (count++ < GET_AVALIABLE_CHANNEL_RETRY_COUNT) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
index 4c409f2..4eee9db 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedReques
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -61,7 +62,7 @@ public class Broker2Client {
public void checkProducerTransactionState(
final String group,
- final Channel channel,
+ final RemotingChannel channel,
final CheckTransactionStateRequestHeader requestHeader,
final MessageExt messageExt) throws Exception {
RemotingCommand request =
@@ -74,14 +75,14 @@ public class Broker2Client {
}
}
- public RemotingCommand callClient(final Channel channel,
+ public RemotingCommand callClient(final RemotingChannel channel,
final RemotingCommand request
) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
}
public void notifyConsumerIdsChanged(
- final Channel channel,
+ final RemotingChannel channel,
final String consumerGroup) {
if (null == consumerGroup) {
log.error("notifyConsumerIdsChanged consumerGroup is null");
@@ -175,9 +176,9 @@ public class Broker2Client {
this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
- ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
+ ConcurrentMap<RemotingChannel, ClientChannelInfo> channelInfoTable =
consumerGroupInfo.getChannelInfoTable();
- for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
+ for (Map.Entry<RemotingChannel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
int version = entry.getValue().getVersion();
if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
try {
@@ -193,7 +194,7 @@ public class Broker2Client {
response.setRemark("the client does not support this feature. version="
+ MQVersion.getVersionDesc(version));
log.warn("[reset-offset] the client does not support this feature. version={}",
- RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
+ RemotingHelper.parseChannelRemoteAddr(entry.getKey().remoteAddress()), MQVersion.getVersionDesc(version));
return response;
}
}
@@ -238,7 +239,7 @@ public class Broker2Client {
Map<String, Map<MessageQueue, Long>> consumerStatusTable =
new HashMap<String, Map<MessageQueue, Long>>();
- ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
+ ConcurrentMap<RemotingChannel, ClientChannelInfo> channelInfoTable =
this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
if (null == channelInfoTable || channelInfoTable.isEmpty()) {
result.setCode(ResponseCode.SYSTEM_ERROR);
@@ -246,7 +247,7 @@ public class Broker2Client {
return result;
}
- for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
+ for (Map.Entry<RemotingChannel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
int version = entry.getValue().getVersion();
String clientId = entry.getValue().getClientId();
if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
@@ -254,7 +255,7 @@ public class Broker2Client {
result.setRemark("the client does not support this feature. version="
+ MQVersion.getVersionDesc(version));
log.warn("[get-consumer-status] the client does not support this feature. version={}",
- RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
+ RemotingHelper.parseChannelRemoteAddr(entry.getKey().remoteAddress()), MQVersion.getVersionDesc(version));
return result;
} else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
try {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 9edfcb8..2c204ce 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -55,7 +55,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class BrokerOuterAPI {
@@ -66,11 +66,11 @@ public class BrokerOuterAPI {
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
- public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
+ public BrokerOuterAPI(final ClientConfig nettyClientConfig) {
this(nettyClientConfig, null);
}
- public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
+ public BrokerOuterAPI(final ClientConfig nettyClientConfig, RPCHook rpcHook) {
this.remotingClient = RemotingClientFactory.createInstance().init(nettyClientConfig, null);
this.remotingClient.registerRPCHook(rpcHook);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index aa072e8..bd7625a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -42,7 +42,7 @@ import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.utils.ChannelUtil;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
@@ -52,7 +52,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
-public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
+public abstract class AbstractSendMessageProcessor implements RequestProcessor {
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected final static int DLQ_NUMS_PER_GROUP = 1;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 341907a..a83c488 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
import java.net.UnknownHostException;
@@ -103,10 +102,12 @@ import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsSnapshot;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.filter.util.BitsArray;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
@@ -117,7 +118,7 @@ import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
-public class AdminBrokerProcessor implements NettyRequestProcessor {
+public class AdminBrokerProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -126,8 +127,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
@Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx,
+ public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel;
+ ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
switch (request.getCode()) {
case RequestCode.UPDATE_AND_CREATE_TOPIC:
return this.updateAndCreateTopic(ctx, request);
@@ -593,14 +596,14 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
bodydata.setMessageModel(consumerGroupInfo.getMessageModel());
bodydata.getSubscriptionTable().putAll(consumerGroupInfo.getSubscriptionTable());
- Iterator<Map.Entry<Channel, ClientChannelInfo>> it = consumerGroupInfo.getChannelInfoTable().entrySet().iterator();
+ Iterator<Map.Entry<RemotingChannel, ClientChannelInfo>> it = consumerGroupInfo.getChannelInfoTable().entrySet().iterator();
while (it.hasNext()) {
ClientChannelInfo info = it.next().getValue();
Connection connection = new Connection();
connection.setClientId(info.getClientId());
connection.setLanguage(info.getLanguage());
connection.setVersion(info.getVersion());
- connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel()));
+ connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel().remoteAddress()));
bodydata.getConnectionSet().add(connection);
}
@@ -625,17 +628,17 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
(GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
ProducerConnection bodydata = new ProducerConnection();
- HashMap<Channel, ClientChannelInfo> channelInfoHashMap =
+ HashMap<RemotingChannel, ClientChannelInfo> channelInfoHashMap =
this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup());
if (channelInfoHashMap != null) {
- Iterator<Map.Entry<Channel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator();
+ Iterator<Map.Entry<RemotingChannel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator();
while (it.hasNext()) {
ClientChannelInfo info = it.next().getValue();
Connection connection = new Connection();
connection.setClientId(info.getClientId());
connection.setLanguage(info.getLanguage());
connection.setVersion(info.getVersion());
- connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel()));
+ connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(info.getChannel().remoteAddress()));
bodydata.getConnectionSet().add(connection);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index f0d155f..03dec03 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -37,12 +37,15 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
+import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
+import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-public class ClientManageProcessor implements NettyRequestProcessor {
+public class ClientManageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -51,8 +54,11 @@ public class ClientManageProcessor implements NettyRequestProcessor {
}
@Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ public RemotingCommand processRequest(RemotingChannel remotingChannel,
+ RemotingCommand request) throws RemotingCommandException {
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel;
+ ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
+
switch (request.getCode()) {
case RequestCode.HEART_BEAT:
return this.heartBeat(ctx, request);
@@ -76,7 +82,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
log.info("heart beat request:{}", heartbeatData);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
- ctx.channel(),
+ new NettyChannelImpl(ctx.channel()),
heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion()
@@ -137,7 +143,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
- ctx.channel(),
+ new NettyChannelImpl(ctx.channel()),
requestHeader.getClientID(),
request.getLanguage(),
request.getVersion());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 028d21b..421c531 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -32,12 +32,14 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHea
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-public class ConsumerManageProcessor implements NettyRequestProcessor {
+public class ConsumerManageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -47,8 +49,11 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
}
@Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ public RemotingCommand processRequest(RemotingChannel remotingChannel,
+ RemotingCommand request) throws RemotingCommandException {
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
+ ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
+
switch (request.getCode()) {
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
return this.getConsumerListByGroup(ctx, request);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index c9e85ed..8d72ac1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -30,9 +30,11 @@ import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
@@ -41,7 +43,7 @@ import org.apache.rocketmq.store.config.BrokerRole;
/**
* EndTransaction processor: process commit and rollback message
*/
-public class EndTransactionProcessor implements NettyRequestProcessor {
+public class EndTransactionProcessor implements RequestProcessor {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
private final BrokerController brokerController;
@@ -50,11 +52,14 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
}
@Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
- RemotingCommandException {
+ public RemotingCommand processRequest(RemotingChannel remotingChannel,
+ RemotingCommand request) throws RemotingCommandException {
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
+ ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
+
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
- (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
+ (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.info("Transaction request:{}", requestHeader);
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
index b0f0a05..f47a453 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
@@ -16,15 +16,16 @@
*/
package org.apache.rocketmq.broker.processor;
-import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-public class ForwardRequestProcessor implements NettyRequestProcessor {
+public class ForwardRequestProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -34,7 +35,8 @@ public class ForwardRequestProcessor implements NettyRequestProcessor {
}
@Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+ public RemotingCommand processRequest(RemotingChannel remotingChannel,
+ RemotingCommand request) throws RemotingCommandException {
return null;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 10c0112..391b599 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -53,10 +53,12 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
@@ -66,7 +68,7 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-public class PullMessageProcessor implements NettyRequestProcessor {
+public class PullMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private List<ConsumeMessageHook> consumeMessageHookList;
@@ -76,8 +78,11 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
@Override
- public RemotingCommand processRequest(final ChannelHandlerContext ctx,
+ public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
+ ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
+
return this.processRequest(ctx.channel(), request, true);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
index a5ca872..5d7f794 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
@@ -32,13 +32,15 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
-public class QueryMessageProcessor implements NettyRequestProcessor {
+public class QueryMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -48,8 +50,10 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
}
@Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ public RemotingCommand processRequest(RemotingChannel remotingChannel,
+ RemotingCommand request) throws RemotingCommandException {
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
+ ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
switch (request.getCode()) {
case RequestCode.QUERY_MESSAGE:
return this.queryMessage(ctx, request);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 5f1c2f1..6bb378e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -41,8 +41,10 @@ import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
@@ -53,7 +55,7 @@ import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
-public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
+public class SendMessageProcessor extends AbstractSendMessageProcessor implements RequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
@@ -62,8 +64,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
@Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx,
+ public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
+ ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
+
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
index 788c498..c08ebcd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java
@@ -48,10 +48,12 @@ import org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
@@ -61,7 +63,7 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-public class SnodePullMessageProcessor implements NettyRequestProcessor {
+public class SnodePullMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private List<ConsumeMessageHook> consumeMessageHookList;
@@ -71,8 +73,11 @@ public class SnodePullMessageProcessor implements NettyRequestProcessor {
}
@Override
- public RemotingCommand processRequest(final ChannelHandlerContext ctx,
+ public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
+ ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
+
return this.processRequest(ctx.channel(), request, true);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
index 659c6af..152e067 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.remoting.RemotingChannel;
public abstract class AbstractTransactionalMessageCheckListener {
private static final InternalLogger LOGGER = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
@@ -63,7 +64,7 @@ public abstract class AbstractTransactionalMessageCheckListener {
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
- Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
+ RemotingChannel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index 56abf08..84139b4 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -20,8 +20,8 @@ package org.apache.rocketmq.broker;
import java.io.File;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Test;
@@ -34,8 +34,8 @@ public class BrokerControllerTest {
public void testBrokerRestart() throws Exception {
BrokerController brokerController = new BrokerController(
new BrokerConfig(),
- new NettyServerConfig(),
- new NettyClientConfig(),
+ new ServerConfig(),
+ new ClientConfig(),
new MessageStoreConfig());
assertThat(brokerController.initialize());
brokerController.start();
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
index 30fe3a2..5ca8a86 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -31,8 +31,8 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.store.MessageStore;
@@ -57,7 +57,7 @@ public class BrokerOuterAPITest {
@Mock
private ChannelHandlerContext handlerContext;
@Spy
- private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig());
@Mock
private MessageStore messageStore;
private String clusterName = "clusterName";
@@ -75,7 +75,7 @@ public class BrokerOuterAPITest {
private BrokerOuterAPI brokerOuterAPI;
public void init() throws Exception {
- brokerOuterAPI = new BrokerOuterAPI(new NettyClientConfig(), null);
+ brokerOuterAPI = new BrokerOuterAPI(new ClientConfig(), null);
Field field = BrokerOuterAPI.class.getDeclaredField("remotingClient");
field.setAccessible(true);
field.set(brokerOuterAPI, nettyRemotingClient);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
index 08dbb9c..2b01e7a 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
@@ -20,6 +20,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.lang.reflect.Field;
import java.util.HashMap;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -37,7 +38,7 @@ public class ProducerManagerTest {
private ClientChannelInfo clientInfo;
@Mock
- private Channel channel;
+ private RemotingChannel channel;
@Before
public void init() {
@@ -54,7 +55,7 @@ public class ProducerManagerTest {
field.setAccessible(true);
long CHANNEL_EXPIRED_TIMEOUT = field.getLong(producerManager);
clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - CHANNEL_EXPIRED_TIMEOUT - 10);
- when(channel.close()).thenReturn(mock(ChannelFuture.class));
+// when(channel.close()).thenReturn(mock(ChannelFuture.class));
producerManager.scanNotActiveChannel();
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
}
@@ -63,14 +64,14 @@ public class ProducerManagerTest {
public void doChannelCloseEvent() throws Exception {
producerManager.registerProducer(group, clientInfo);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
- producerManager.doChannelCloseEvent("127.0.0.1", channel);
+// producerManager.doChannelCloseEvent("127.0.0.1", channel);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
}
@Test
public void testRegisterProducer() throws Exception {
producerManager.registerProducer(group, clientInfo);
- HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
+ HashMap<RemotingChannel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
}
@@ -78,7 +79,7 @@ public class ProducerManagerTest {
@Test
public void unregisterProducer() throws Exception {
producerManager.registerProducer(group, clientInfo);
- HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
+ HashMap<RemotingChannel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
index 9ee9035..1be0309 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.broker.processor;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;
import java.util.UUID;
@@ -28,9 +27,12 @@ import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
+import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -43,17 +45,16 @@ import org.mockito.junit.MockitoJUnitRunner;
import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ClientManageProcessorTest {
private ClientManageProcessor clientManageProcessor;
@Spy
- private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig());
@Mock
private ChannelHandlerContext handlerContext;
@Mock
- private Channel channel;
+ private NettyChannelImpl channel;
private ClientChannelInfo clientChannelInfo;
private String clientId = UUID.randomUUID().toString();
@@ -62,7 +63,7 @@ public class ClientManageProcessorTest {
@Before
public void init() {
- when(handlerContext.channel()).thenReturn(channel);
+// when(handlerContext.channel()).thenReturn(channel);
clientManageProcessor = new ClientManageProcessor(brokerController);
clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100);
brokerController.getProducerManager().registerProducer(group, clientChannelInfo);
@@ -81,12 +82,13 @@ public class ClientManageProcessorTest {
@Test
public void processRequest_UnRegisterProducer() throws Exception {
brokerController.getProducerManager().registerProducer(group, clientChannelInfo);
- HashMap<Channel, ClientChannelInfo> channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group);
+ HashMap<RemotingChannel, ClientChannelInfo> channelMap = brokerController.getProducerManager().getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientChannelInfo);
RemotingCommand request = createUnRegisterProducerCommand();
- RemotingCommand response = clientManageProcessor.processRequest(handlerContext, request);
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = clientManageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
@@ -100,8 +102,9 @@ public class ClientManageProcessorTest {
assertThat(consumerGroupInfo).isNotNull();
RemotingCommand request = createUnRegisterConsumerCommand();
- RemotingCommand response = clientManageProcessor.processRequest(handlerContext, request);
- assertThat(response).isNotNull();
+
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = clientManageProcessor.processRequest(nettyChannelHandlerContext, request); assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
consumerGroupInfo = brokerController.getConsumerManager().getConsumerGroupInfo(group);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
index 7d8aa13..82c0a7d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
@@ -29,8 +29,9 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
@@ -60,7 +61,7 @@ public class EndTransactionProcessorTest {
@Spy
private BrokerController
- brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(),
+ brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(),
new MessageStoreConfig());
@Mock
@@ -76,7 +77,7 @@ public class EndTransactionProcessorTest {
endTransactionProcessor = new EndTransactionProcessor(brokerController);
}
- private OperationResult createResponse(int status){
+ private OperationResult createResponse(int status) {
OperationResult response = new OperationResult();
response.setPrepareMessage(createDefaultMessageExt());
response.setResponseCode(status);
@@ -90,7 +91,9 @@ public class EndTransactionProcessorTest {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, false);
- RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
+
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = endTransactionProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@@ -100,14 +103,16 @@ public class EndTransactionProcessorTest {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult
(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, true);
- RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = endTransactionProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@Test
public void testProcessRequest_NotType() throws RemotingCommandException {
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_NOT_TYPE, true);
- RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = endTransactionProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNull();
}
@@ -115,7 +120,8 @@ public class EndTransactionProcessorTest {
public void testProcessRequest_RollBack() throws RemotingCommandException {
when(transactionMsgService.rollbackMessage(any(EndTransactionRequestHeader.class))).thenReturn(createResponse(ResponseCode.SUCCESS));
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE, true);
- RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = endTransactionProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index dc7b567..48d09bc 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.broker.processor;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -38,9 +37,11 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
@@ -65,7 +66,7 @@ import static org.mockito.Mockito.when;
public class PullMessageProcessorTest {
private PullMessageProcessor pullMessageProcessor;
@Spy
- private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig());
@Mock
private ChannelHandlerContext handlerContext;
@Mock
@@ -78,9 +79,9 @@ public class PullMessageProcessorTest {
public void init() {
brokerController.setMessageStore(messageStore);
pullMessageProcessor = new PullMessageProcessor(brokerController);
- Channel mockChannel = mock(Channel.class);
+ RemotingChannel mockChannel = mock(RemotingChannel.class);
when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
- when(handlerContext.channel()).thenReturn(mockChannel);
+// when(handlerContext.channel()).thenReturn(mockChannel);
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig());
clientChannelInfo = new ClientChannelInfo(mockChannel);
ConsumerData consumerData = createConsumerData(group, topic);
@@ -98,7 +99,8 @@ public class PullMessageProcessorTest {
public void testProcessRequest_TopicNotExist() throws RemotingCommandException {
brokerController.getTopicConfigManager().getTopicConfigTable().remove(topic);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
- RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
assertThat(response.getRemark()).contains("topic[" + topic + "] not exist");
@@ -108,7 +110,9 @@ public class PullMessageProcessorTest {
public void testProcessRequest_SubNotExist() throws RemotingCommandException {
brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, false);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
- RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
+
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_NOT_EXIST);
assertThat(response.getRemark()).contains("consumer's group info not exist");
@@ -118,7 +122,8 @@ public class PullMessageProcessorTest {
public void testProcessRequest_SubNotLatest() throws RemotingCommandException {
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
request.addExtField("subVersion", String.valueOf(101));
- RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_NOT_LATEST);
assertThat(response.getRemark()).contains("subscription not latest");
@@ -130,7 +135,9 @@ public class PullMessageProcessorTest {
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
- RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
+
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@@ -159,7 +166,9 @@ public class PullMessageProcessorTest {
consumeMessageHookList.add(consumeMessageHook);
pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
- RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
+
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(messageContext[0]).isNotNull();
@@ -175,7 +184,9 @@ public class PullMessageProcessorTest {
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
- RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
+
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_RETRY_IMMEDIATELY);
}
@@ -187,7 +198,9 @@ public class PullMessageProcessorTest {
when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
- RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
+
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = pullMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.PULL_OFFSET_MOVED);
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 2f56422..ac8a106 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -33,8 +33,9 @@ import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHead
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
@@ -70,7 +71,7 @@ public class SendMessageProcessorTest {
@Mock
private ChannelHandlerContext handlerContext;
@Spy
- private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(), new ClientConfig(), new MessageStoreConfig());
@Mock
private MessageStore messageStore;
@@ -181,7 +182,9 @@ public class SendMessageProcessorTest {
final RemotingCommand request = createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK);
sendMessageProcessor = new SendMessageProcessor(brokerController);
- final RemotingCommand response = sendMessageProcessor.processRequest(handlerContext, request);
+
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand response = sendMessageProcessor.processRequest(nettyChannelHandlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@@ -199,7 +202,10 @@ public class SendMessageProcessorTest {
return null;
}
}).when(handlerContext).writeAndFlush(any(Object.class));
- RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
+
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand responseToReturn = sendMessageProcessor.processRequest(nettyChannelHandlerContext, request);
+
if (responseToReturn != null) {
assertThat(response[0]).isNull();
response[0] = responseToReturn;
@@ -207,6 +213,7 @@ public class SendMessageProcessorTest {
assertThat(response[0].getCode()).isEqualTo(ResponseCode.SUCCESS);
}
+
private RemotingCommand createSendTransactionMsgCommand(int requestCode) {
SendMessageRequestHeader header = createSendMsgRequestHeader();
int sysFlag = header.getSysFlag();
@@ -267,7 +274,9 @@ public class SendMessageProcessorTest {
return null;
}
}).when(handlerContext).writeAndFlush(any(Object.class));
- RemotingCommand responseToReturn = sendMessageProcessor.processRequest(handlerContext, request);
+
+ NettyChannelHandlerContextImpl nettyChannelHandlerContext = new NettyChannelHandlerContextImpl(handlerContext);
+ RemotingCommand responseToReturn = sendMessageProcessor.processRequest(nettyChannelHandlerContext, request);
if (responseToReturn != null) {
assertThat(response[0]).isNull();
response[0] = responseToReturn;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java
index 17bf00b..63844f9 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListenerTest.java
@@ -21,8 +21,8 @@ import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
@@ -37,8 +37,8 @@ public class DefaultTransactionalMessageCheckListenerTest {
private DefaultTransactionalMessageCheckListener listener;
@Spy
- private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
- new NettyClientConfig(), new MessageStoreConfig());
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(),
+ new ClientConfig(), new MessageStoreConfig());
@Before
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java
index b1c669c..f605986 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java
@@ -25,8 +25,8 @@ import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.GetMessageResult;
@@ -61,8 +61,8 @@ public class TransactionalMessageBridgeTest {
private TransactionalMessageBridge transactionBridge;
@Spy
- private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
- new NettyClientConfig(), new MessageStoreConfig());
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(),
+ new ClientConfig(), new MessageStoreConfig());
@Mock
private MessageStore messageStore;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java
index 47eccbe..d7df525 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImplTest.java
@@ -30,8 +30,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner;
@@ -70,8 +70,8 @@ public class TransactionalMessageServiceImplTest {
private TransactionalMessageBridge bridge;
@Spy
- private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(),
- new NettyClientConfig(), new MessageStoreConfig());
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(), new ServerConfig(),
+ new ClientConfig(), new MessageStoreConfig());
@Mock
private AbstractTransactionalMessageCheckListener listener;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/ClientConfig.java
similarity index 100%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/ClientConfig.java
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingChannel.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingChannel.java
new file mode 100644
index 0000000..c8f00f7
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingChannel.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+
+public interface RemotingChannel {
+ /**
+ * Returns the local address to which this {@code RemotingChannel} is bound. The returned
+ * {@link SocketAddress} is supposed to be down-cast into a more concrete
+ * type such as {@link InetSocketAddress} to retrieve the detailed
+ * information.
+ *
+ * @return the local address of this channel, or
+ * {@code null} if this channel is not bound.
+ */
+ SocketAddress localAddress();
+
+ /**
+ * Returns the remote address to which this {@code RemotingChannel} is connected. The
+ * returned {@link SocketAddress} is supposed to be down-cast into a more
+ * concrete type such as {@link InetSocketAddress} to retrieve the detailed
+ * information.
+ *
+ * @return the remote address of this channel, or
+ * {@code null} if this channel is not connected.
+ */
+ SocketAddress remoteAddress();
+
+ /**
+ * Returns {@code true} if and only if the I/O thread will perform the
+ * requested write operation immediately. Any write requests made while
+ * this method returns {@code false} are queued until the I/O thread is
+ * ready to process the queued write requests.
+ */
+ boolean isWritable();
+
+ /**
+ * Returns {@code true} if the {@code RemotingChannel} is active and therefore connected.
+ */
+ boolean isActive();
+
+ /**
+ * Requests to close the {@code RemotingChannel} immediately.
+ */
+ void close();
+
+ /**
+ * Writes a response {@code RemotingCommand} to the remote peer.
+ *
+ * @param command the response command
+ */
+ void reply(RemotingCommand command);
+
+ /**
+ * Writes a response {@code ChunkRegion} to the remote peer.
+ *
+ * @param fileRegion the response chunk file region
+ */
+ void reply(ChunkRegion fileRegion);
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RequestProcessor.java
similarity index 100%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
copy to remoting/src/main/java/org/apache/rocketmq/remoting/RequestProcessor.java
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/ServerConfig.java
similarity index 100%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/ServerConfig.java
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java
similarity index 67%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java
index 040f768..9eb489a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java
@@ -1,4 +1,4 @@
-/*
+package org.apache.rocketmq.remoting.netty;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,17 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.netty;
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
-/**
- * Common remoting command processor
- */
-public interface NettyRequestProcessor {
- RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
- throws Exception;
-
- boolean rejectRequest();
+public class NettyChannelHandlerContextImpl {
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java
new file mode 100644
index 0000000..e4be7ca
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.channel.Channel;
+import java.net.SocketAddress;
+import org.apache.rocketmq.remoting.api.channel.ChunkRegion;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+
+/**
+ * {@link RemotingChannel} implementation that forwards every call to a wrapped
+ * Netty {@link Channel}.
+ */
+public class NettyChannelImpl implements RemotingChannel {
+    /** The wrapped Netty channel, stored exactly as handed to the constructor (no null check). */
+    private final Channel channel;
+
+    /**
+     * Wraps the given Netty channel.
+     *
+     * @param channel the channel every method of this class delegates to
+     */
+    public NettyChannelImpl(Channel channel) {
+        this.channel = channel;
+    }
+
+    /** @return the local address reported by the wrapped channel */
+    @Override
+    public SocketAddress localAddress() {
+        return this.channel.localAddress();
+    }
+
+    /** @return the remote address reported by the wrapped channel */
+    @Override
+    public SocketAddress remoteAddress() {
+        return this.channel.remoteAddress();
+    }
+
+    /** @return whatever the wrapped channel reports for {@code isWritable()} */
+    @Override
+    public boolean isWritable() {
+        return this.channel.isWritable();
+    }
+
+    /** @return whatever the wrapped channel reports for {@code isActive()} */
+    @Override
+    public boolean isActive() {
+        return this.channel.isActive();
+    }
+
+    /** Calls {@code close()} on the wrapped channel; the call's result is discarded. */
+    @Override
+    public void close() {
+        this.channel.close();
+    }
+
+    /**
+     * Hands the command to the wrapped channel via {@code writeAndFlush}.
+     * The result of that call is discarded.
+     *
+     * @param command the command to write
+     */
+    @Override
+    public void reply(final RemotingCommand command) {
+        this.channel.writeAndFlush(command);
+    }
+
+    /**
+     * Hands the chunk region to the wrapped channel via {@code writeAndFlush}.
+     * The result of that call is discarded.
+     *
+     * @param fileRegion the region to write
+     */
+    @Override
+    public void reply(final ChunkRegion fileRegion) {
+        this.channel.writeAndFlush(fileRegion);
+    }
+
+    /** @return the wrapped Netty channel itself */
+    public Channel getChannel() {
+        return this.channel;
+    }
+
+    /**
+     * Two instances are equal when they are of the same class and their wrapped
+     * channels are equal (or both {@code null}).
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        final Channel otherChannel = ((NettyChannelImpl) o).channel;
+        if (this.channel == null) {
+            return otherChannel == null;
+        }
+        return this.channel.equals(otherChannel);
+    }
+
+    /** @return the wrapped channel's hash code, or {@code 0} when the channel is {@code null} */
+    @Override
+    public int hashCode() {
+        if (this.channel == null) {
+            return 0;
+        }
+        return this.channel.hashCode();
+    }
+
+    /** @return a short description containing the wrapped channel's string form */
+    @Override
+    public String toString() {
+        return "NettyChannelImpl [channel=" + this.channel + "]";
+    }
+}