You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:25 UTC
[rocketmq] 01/14: Refactor remoting module
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit fa2f611470e9d1ca168270bb0d18217c96001897
Author: duhengforever <du...@gmail.com>
AuthorDate: Wed Dec 12 11:30:56 2018 +0800
Refactor remoting module
---
.../apache/rocketmq/broker/BrokerController.java | 10 +-
.../rocketmq/broker/client/ClientChannelInfo.java | 2 +-
.../broker/filter/ConsumerFilterManager.java | 2 +-
.../broker/offset/ConsumerOffsetManager.java | 2 +-
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 12 +-
.../broker/pagecache/ManyMessageTransfer.java | 30 +
.../broker/pagecache/OneMessageTransfer.java | 29 +
.../broker/pagecache/QueryMessageTransfer.java | 29 +
.../broker/processor/AdminBrokerProcessor.java | 4 +-
.../subscription/SubscriptionGroupManager.java | 2 +-
.../apache/rocketmq/broker/BrokerOuterAPITest.java | 2 +-
.../processor/ClientManageProcessorTest.java | 6 +-
.../processor/EndTransactionProcessorTest.java | 2 +-
.../broker/processor/PullMessageProcessorTest.java | 2 +-
.../broker/processor/SendMessageProcessorTest.java | 6 +-
.../org/apache/rocketmq/client/ClientConfig.java | 2 +-
.../consumer/store/OffsetSerializeWrapper.java | 2 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 7 +-
.../client/producer/DefaultMQProducer.java | 1 -
.../client/producer/DefaultMQProducerTest.java | 2 +-
.../org/apache/rocketmq/common/DataVersion.java | 2 +-
.../apache/rocketmq/common/admin/ConsumeStats.java | 2 +-
.../rocketmq/common/admin/TopicStatsTable.java | 2 +-
.../rocketmq/common/protocol/MQProtosHelper.java | 49 --
.../common/protocol/body/BrokerStatsData.java | 2 +-
.../protocol/body/CheckClientRequestBody.java | 2 +-
.../rocketmq/common/protocol/body/ClusterInfo.java | 2 +-
.../rocketmq/common/protocol/body/Connection.java | 2 +-
.../common/protocol/body/ConsumeByWho.java | 2 +-
.../body/ConsumeMessageDirectlyResult.java | 2 +-
.../common/protocol/body/ConsumeStatsList.java | 2 +-
.../common/protocol/body/ConsumerConnection.java | 2 +-
.../body/ConsumerOffsetSerializeWrapper.java | 2 +-
.../common/protocol/body/ConsumerRunningInfo.java | 2 +-
.../protocol/body/GetConsumerStatusBody.java | 2 +-
.../rocketmq/common/protocol/body/GroupList.java | 2 +-
.../rocketmq/common/protocol/body/KVTable.java | 2 +-
.../common/protocol/body/LockBatchRequestBody.java | 2 +-
.../protocol/body/LockBatchResponseBody.java | 2 +-
.../common/protocol/body/ProducerConnection.java | 2 +-
.../body/QueryConsumeQueueResponseBody.java | 2 +-
.../protocol/body/QueryConsumeTimeSpanBody.java | 2 +-
.../protocol/body/QueryCorrectionOffsetBody.java | 2 +-
.../common/protocol/body/RegisterBrokerBody.java | 2 +-
.../common/protocol/body/ResetOffsetBody.java | 2 +-
.../common/protocol/body/ResetOffsetBodyForC.java | 2 +-
.../protocol/body/SubscriptionGroupWrapper.java | 2 +-
.../protocol/body/TopicConfigSerializeWrapper.java | 2 +-
.../rocketmq/common/protocol/body/TopicList.java | 2 +-
.../protocol/body/UnlockBatchRequestBody.java | 2 +-
.../header/GetConsumerListByGroupResponseBody.java | 2 +-
.../common/protocol/heartbeat/HeartbeatData.java | 2 +-
.../common/protocol/route/TopicRouteData.java | 2 +-
.../common/protocol/topic/OffsetMovedEvent.java | 2 +-
.../common/protocol/ConsumeStatusTest.java | 2 +-
.../apache/rocketmq/namesrv/NamesrvController.java | 10 +-
.../namesrv/kvconfig/KVConfigSerializeWrapper.java | 2 +-
.../namesrv/processor/DefaultRequestProcessor.java | 4 +-
.../rocketmq/consumer/PullConsumerImpl.java | 2 +-
.../rocketmq/consumer/PushConsumerImpl.java | 2 +-
.../rocketmq/producer/AbstractOMSProducer.java | 2 +-
pom.xml | 7 +-
remoting/pom.xml | 4 +
.../apache/rocketmq/remoting/RemotingClient.java | 3 +
.../rocketmq/remoting/RemotingClientFactory.java | 33 +
.../apache/rocketmq/remoting/RemotingServer.java | 5 +
.../rocketmq/remoting/RemotingServerFactory.java | 40 ++
.../rocketmq/remoting/common/RemotingHelper.java | 92 ---
.../ChannelMetrics.java} | 13 +-
.../remoting/netty/ChannelStatisticsHandler.java | 60 ++
.../rocketmq/remoting/netty/CodecHelper.java | 283 ++++++++
.../rocketmq/remoting/netty/FileRegionEncoder.java | 2 +-
.../rocketmq/remoting/netty/NettyLogger.java | 28 +-
.../remoting/netty/NettyRemotingAbstract.java | 67 +-
.../remoting/netty/NettyRemotingClient.java | 710 ---------------------
.../rocketmq/remoting/netty/NettyServerConfig.java | 29 +-
.../remoting/protocol/RemotingCommand.java | 346 ++--------
.../remoting/protocol/RemotingCommandType.java | 2 +-
.../{protocol => serialize}/LanguageCode.java | 5 +-
.../remoting/serialize/MsgPackSerializable.java | 40 ++
.../RemotingSerializable.java | 25 +-
.../RocketMQSerializable.java | 26 +-
.../{protocol => serialize}/SerializeType.java | 8 +-
.../Serializer.java} | 16 +-
.../remoting/serialize/SerializerFactory.java | 26 +
.../transport/NettyRemotingClientAbstract.java | 411 ++++++++++++
.../transport/NettyRemotingServerAbstract.java | 93 +++
.../remoting/transport/http2/Http2ClientImpl.java | 293 +++++++++
.../remoting/transport/http2/Http2Handler.java | 143 +++++
.../remoting/transport/http2/Http2ServerImpl.java | 242 +++++++
.../rocketmq}/NettyDecoder.java | 6 +-
.../rocketmq}/NettyEncoder.java | 10 +-
.../transport/rocketmq/NettyRemotingClient.java | 315 +++++++++
.../rocketmq}/NettyRemotingServer.java | 249 ++------
.../apache/rocketmq/remoting/util/JvmUtils.java | 94 +++
.../rocketmq/remoting/util/RemotingUtil.java | 6 +
.../rocketmq/remoting/util/ServiceProvider.java | 200 ++++++
.../apache/rocketmq/remoting/util/ThreadUtils.java | 181 ++++++
.../org.apache.rocketmq.remoting.RemotingClient | 2 +
.../org.apache.rocketmq.remoting.RemotingServer | 2 +
.../rocketmq/remoting/RemotingServerTest.java | 6 +-
.../java/org/apache/rocketmq/remoting/TlsTest.java | 4 +-
.../remoting/netty/NettyRemotingAbstractTest.java | 1 +
.../remoting/netty/NettyRemotingClientTest.java | 1 +
.../remoting/protocol/RemotingCommandTest.java | 95 +--
.../protocol/RemotingSerializableTest.java | 1 +
.../protocol/RocketMQSerializableTest.java | 3 +
.../schedule/DelayOffsetSerializeWrapper.java | 2 +-
.../tools/command/topic/AllocateMQSubCommand.java | 2 +-
.../rocketmq/tools/monitor/MonitorServiceTest.java | 2 +-
110 files changed, 2993 insertions(+), 1534 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index e7ef46d..3e9762a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -84,13 +84,14 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.RemotingServerFactory;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
import org.apache.rocketmq.srvutil.FileWatchService;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
@@ -245,10 +246,15 @@ public class BrokerController {
result = result && this.messageStore.load();
if (result) {
- this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
+ this.remotingServer = RemotingServerFactory.getRemotingServer();
+ this.remotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
+// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
+// this.fastRemotingServer = RemotingServerFactory.getRemotingServer();
+// this.fastRemotingServer.init(this.nettyServerConfig, this.clientHousekeepingService);
+
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
index edcba96..7c5e25b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
public class ClientChannelInfo {
private final Channel channel;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
index e9c5286..fbaf337 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
@@ -29,7 +29,7 @@ import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.filter.util.BloomFilter;
import org.apache.rocketmq.filter.util.BloomFilterData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import java.util.Collection;
import java.util.HashSet;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index ebc9dd8..7c415f3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -31,7 +31,7 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumerOffsetManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 4dee01c..d157021 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -31,8 +31,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -47,15 +45,17 @@ import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRespon
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.RemotingClientFactory;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class BrokerOuterAPI {
@@ -71,7 +71,9 @@ public class BrokerOuterAPI {
}
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
- this.remotingClient = new NettyRemotingClient(nettyClientConfig);
+ this.remotingClient = RemotingClientFactory.getClient();
+ this.remotingClient.init(nettyClientConfig, null);
+// this.remotingClient = new NettyRemotingClient(nettyClientConfig);
this.remotingClient.registerRPCHook(rpcHook);
}
@@ -147,7 +149,7 @@ public class BrokerOuterAPI {
@Override
public void run() {
try {
- RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
+ RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
index 968bcfb..849d205 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
@@ -84,4 +84,34 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil
protected void deallocate() {
this.getMessageResult.release();
}
+
+
+
+ @Override
+ public long transferred() {
+ return transferred;
+ }
+
+
+ @Override
+ public FileRegion retain() {
+ super.retain();
+ return this;
+ }
+
+ @Override
+ public FileRegion retain(int increment) {
+ super.retain(increment);
+ return this;
+ }
+
+ @Override
+ public FileRegion touch() {
+ return this;
+ }
+
+ @Override
+ public FileRegion touch(Object hint) {
+ return this;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
index b795d2d..18b57ad 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
@@ -73,4 +73,33 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File
protected void deallocate() {
this.selectMappedBufferResult.release();
}
+
+
+ @Override
+ public long transferred() {
+ return transferred;
+ }
+
+
+ @Override
+ public FileRegion retain() {
+ super.retain();
+ return this;
+ }
+
+ @Override
+ public FileRegion retain(int increment) {
+ super.retain(increment);
+ return this;
+ }
+
+ @Override
+ public FileRegion touch() {
+ return this;
+ }
+
+ @Override
+ public FileRegion touch(Object hint) {
+ return this;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
index e8f3099..b02fb07 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
@@ -84,4 +84,33 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi
protected void deallocate() {
this.queryMessageResult.release();
}
+
+
+ @Override
+ public long transferred() {
+ return transferred;
+ }
+
+
+ @Override
+ public FileRegion retain() {
+ super.retain();
+ return this;
+ }
+
+ @Override
+ public FileRegion retain(int increment) {
+ super.retain(increment);
+ return this;
+ }
+
+ @Override
+ public FileRegion touch() {
+ return this;
+ }
+
+ @Override
+ public FileRegion touch(Object hint) {
+ return this;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 73fe439..341907a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -107,9 +107,9 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 41f7a8a..3df07f8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -29,7 +29,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class SubscriptionGroupManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
index 68d58ef..30fe3a2 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -32,9 +32,9 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Test;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
index 147c732..9ee9035 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
@@ -31,7 +31,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Before;
@@ -115,7 +115,7 @@ public class ClientManageProcessorTest {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
request.setLanguage(LanguageCode.JAVA);
request.setVersion(100);
- request.makeCustomHeaderToNet();
+// request.makeCustomHeaderToNet();
return request;
}
@@ -126,7 +126,7 @@ public class ClientManageProcessorTest {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, requestHeader);
request.setLanguage(LanguageCode.JAVA);
request.setVersion(100);
- request.makeCustomHeaderToNet();
+// request.makeCustomHeaderToNet();
return request;
}
}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
index 019cc6a..7d8aa13 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/EndTransactionProcessorTest.java
@@ -146,7 +146,7 @@ public class EndTransactionProcessorTest {
private RemotingCommand createEndTransactionMsgCommand(int status, boolean isCheckMsg) {
EndTransactionRequestHeader header = createEndTransactionRequestHeader(status, isCheckMsg);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, header);
- request.makeCustomHeaderToNet();
+// request.makeCustomHeaderToNet();
return request;
}
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index c96f708..dc7b567 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -204,7 +204,7 @@ public class PullMessageProcessorTest {
requestHeader.setSysFlag(0);
requestHeader.setSubVersion(100L);
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
- request.makeCustomHeaderToNet();
+// request.makeCustomHeaderToNet();
return request;
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 792fd0f..2f56422 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -217,7 +217,7 @@ public class SendMessageProcessorTest {
header.setSysFlag(sysFlag);
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, header);
request.setBody(new byte[] {'a'});
- request.makeCustomHeaderToNet();
+// request.makeCustomHeaderToNet();
return request;
}
@@ -240,7 +240,7 @@ public class SendMessageProcessorTest {
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
request.setBody(new byte[] {'a'});
- request.makeCustomHeaderToNet();
+// request.makeCustomHeaderToNet();
return request;
}
@@ -253,7 +253,7 @@ public class SendMessageProcessorTest {
requestHeader.setOffset(123L);
RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
- request.makeCustomHeaderToNet();
+// request.makeCustomHeaderToNet();
return request;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index d798164..562810f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
/**
* Client Common configuration
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
index 7dfd97a..2c8d973 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
@@ -20,7 +20,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
/**
* Wrapper class for offset serialization
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 1837204..0fa1ae7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -146,11 +146,11 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
public class MQClientAPIImpl {
@@ -1210,6 +1210,7 @@ public class MQClientAPIImpl {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
+ log.info("getTopicRouteInfoFromNameServer response: " + response);
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 9732d0e..2e6078f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -35,7 +35,6 @@ import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
/**
* This class is the entry point for applications intending to send messages.
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index c225afd..49223f0 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -45,7 +45,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
diff --git a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
index e54000d..a2756e8 100644
--- a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
+++ b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.common;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class DataVersion extends RemotingSerializable {
private long timestamp = System.currentTimeMillis();
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java b/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
index 6b1c492..a4d14a6 100644
--- a/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
@@ -20,7 +20,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumeStats extends RemotingSerializable {
private HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java b/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
index 729075c..e30fd78 100644
--- a/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.common.admin;
import java.util.HashMap;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class TopicStatsTable extends RemotingSerializable {
private HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<MessageQueue, TopicOffset>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
deleted file mode 100644
index d8c1ced..0000000
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.common.protocol;
-
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
-public class MQProtosHelper {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
-
- public static boolean registerBrokerToNameServer(final String nsaddr, final String brokerAddr,
- final long timeoutMillis) {
- RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
- requestHeader.setBrokerAddr(brokerAddr);
-
- RemotingCommand request =
- RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
-
- try {
- RemotingCommand response = RemotingHelper.invokeSync(nsaddr, request, timeoutMillis);
- if (response != null) {
- return ResponseCode.SUCCESS == response.getCode();
- }
- } catch (Exception e) {
- log.error("Failed to register broker", e);
- }
-
- return false;
- }
-}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java
index c4ff63d..4a6e0ae 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class BrokerStatsData extends RemotingSerializable {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java
index a78ce55..b98ce95 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class CheckClientRequestBody extends RemotingSerializable {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
index 76c64a8..566bf93 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ClusterInfo extends RemotingSerializable {
private HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java
index b42737f..dd0bf81 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
public class Connection {
private String clientId;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java
index 7b20d76..f600991 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumeByWho extends RemotingSerializable {
private HashSet<String> consumedGroup = new HashSet<String>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
index 674df60..02e37af 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumeMessageDirectlyResult extends RemotingSerializable {
private boolean order = false;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java
index 7b35a80..28c5a7d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java
@@ -20,7 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.admin.ConsumeStats;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumeStatsList extends RemotingSerializable {
private List<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>> consumeStatsList = new ArrayList<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
index 3a0356c..d3ac81e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumerConnection extends RemotingSerializable {
private HashSet<Connection> connectionSet = new HashSet<Connection>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
index 5b08d78..9fe7382 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumerOffsetSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
index d7942eb..c0be419 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
@@ -25,7 +25,7 @@ import java.util.TreeSet;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ConsumerRunningInfo extends RemotingSerializable {
public static final String PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR";
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java
index 6234a77..f654843 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
@Deprecated
public class GetConsumerStatusBody extends RemotingSerializable {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java
index 862a739..97ffb74 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class GroupList extends RemotingSerializable {
private HashSet<String> groupList = new HashSet<String>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/KVTable.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/KVTable.java
index 28aafc4..e1290fd 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/KVTable.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/KVTable.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.common.protocol.body;
import java.util.HashMap;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class KVTable extends RemotingSerializable {
private HashMap<String, String> table = new HashMap<String, String>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchRequestBody.java
index 480862b..0851687 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchRequestBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchRequestBody.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class LockBatchRequestBody extends RemotingSerializable {
private String consumerGroup;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchResponseBody.java
index 5018f20..56bd1a4 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchResponseBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchResponseBody.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class LockBatchResponseBody extends RemotingSerializable {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerConnection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerConnection.java
index 14d7110..b5d3a2e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerConnection.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerConnection.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ProducerConnection extends RemotingSerializable {
private HashSet<Connection> connectionSet = new HashSet<Connection>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java
index be93da9..7f53e05 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import java.util.List;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
index cae2109..82b4e85 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.ArrayList;
import java.util.List;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class QueryConsumeTimeSpanBody extends RemotingSerializable {
List<QueueTimeSpan> consumeTimeSpanSet = new ArrayList<QueueTimeSpan>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
index f7c866a..c7ccd12 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashMap;
import java.util.Map;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class QueryCorrectionOffsetBody extends RemotingSerializable {
private Map<Integer, Long> correctionOffsets = new HashMap<Integer, Long>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
index 4065c08..d3f8833 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -36,7 +36,7 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class RegisterBrokerBody extends RemotingSerializable {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
index b28e74b..df72758 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.Map;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ResetOffsetBody extends RemotingSerializable {
private Map<MessageQueue, Long> offsetTable;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBodyForC.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
index fa812ed..e15322f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.List;
import org.apache.rocketmq.common.message.MessageQueueForC;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ResetOffsetBodyForC extends RemotingSerializable {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
index e05f759..d966980 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
@@ -21,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class SubscriptionGroupWrapper extends RemotingSerializable {
private ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
index ce12302..d944b27 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
@@ -21,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class TopicConfigSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String, TopicConfig> topicConfigTable =
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
index baf8312..3a3e13e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
import java.util.Set;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class TopicList extends RemotingSerializable {
private Set<String> topicList = new HashSet<String>();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/UnlockBatchRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
index baf4071..d8c73a2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.common.protocol.body;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class UnlockBatchRequestBody extends RemotingSerializable {
private String consumerGroup;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseBody.java
index da39e77..2ab1578 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseBody.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.header;
import java.util.List;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class GetConsumerListByGroupResponseBody extends RemotingSerializable {
private List<String> consumerIdList;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
index 47ae542..03151f5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java
@@ -22,7 +22,7 @@ package org.apache.rocketmq.common.protocol.heartbeat;
import java.util.HashSet;
import java.util.Set;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class HeartbeatData extends RemotingSerializable {
private String clientID;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
index e8f54b8..ed9d644 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java
@@ -23,7 +23,7 @@ package org.apache.rocketmq.common.protocol.route;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java b/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java
index 1c03c07..a769f95 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/topic/OffsetMovedEvent.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol.topic;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class OffsetMovedEvent extends RemotingSerializable {
private String consumerGroup;
diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
index 4a2e790..5f22d82 100644
--- a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.common.protocol;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
index a6654f2..6faccf7 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java
@@ -32,13 +32,13 @@ import org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor;
import org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.RemotingServerFactory;
import org.apache.rocketmq.remoting.common.TlsMode;
-import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
import org.apache.rocketmq.srvutil.FileWatchService;
-
public class NamesrvController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
@@ -77,7 +77,9 @@ public class NamesrvController {
this.kvConfigManager.load();
- this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
+ this.remotingServer = RemotingServerFactory.getRemotingServer();
+ this.remotingServer.init(this.nettyServerConfig, this.brokerHousekeepingService);
+// this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
@@ -111,6 +113,7 @@ public class NamesrvController {
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
+
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
@@ -129,6 +132,7 @@ public class NamesrvController {
reloadServerSslContext();
}
}
+
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
index 3b53e19..59b2161 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.namesrv.kvconfig;
import java.util.HashMap;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class KVConfigSerializeWrapper extends RemotingSerializable {
private HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable;
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index 467078c..dc32445 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -69,7 +69,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
-
+ log.info("receive remoting command: " + request);
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
@@ -280,7 +280,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor {
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
-
+ log.info("requestHeader: " + requestHeader );
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
index d673510..1ce127f 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -37,7 +37,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer;
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
index d5d394a..46f9a45 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -39,7 +39,7 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
public class PushConsumerImpl implements PushConsumer {
private final DefaultMQPushConsumer rocketmqPushConsumer;
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
index 3db8590..95daef3 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -37,7 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
diff --git a/pom.xml b/pom.xml
index 0a8fef8..0a8a41e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -544,7 +544,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
- <version>4.0.42.Final</version>
+ <version>4.1.32.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
@@ -552,6 +552,11 @@
<version>1.2.51</version>
</dependency>
<dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack</artifactId>
+ <version>0.6.12</version>
+ </dependency>
+ <dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.20.0-GA</version>
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 55d92f3..26e5bfc 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -38,6 +38,10 @@
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
+ <groupId>org.msgpack</groupId>
+ <artifactId>msgpack</artifactId>
+ </dependency>
+ <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index c0754db..ab4e914 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -22,6 +22,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -51,4 +52,6 @@ public interface RemotingClient extends RemotingService {
ExecutorService getCallbackExecutor();
boolean isChannelWritable(final String addr);
+
+ void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
new file mode 100644
index 0000000..5e87ec9
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
@@ -0,0 +1,33 @@
+package org.apache.rocketmq.remoting;
+
+import java.util.Map;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.util.RemotingUtil;
+import org.apache.rocketmq.remoting.util.ServiceProvider;
+
+/**
+ * Static lookup of {@link RemotingClient} implementations keyed by protocol type.
+ * <p>
+ * The protocol-to-client map is built once, while this class is being initialized, by
+ * {@code ServiceProvider.load} from the resource named by {@code CLIENT_LOCATION}.
+ */
+public class RemotingClientFactory {
+    private static final String CLIENT_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingClient";
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    /** Protocol type to client implementation. */
+    private static final Map<String, RemotingClient> clients = loadClients();
+
+    private RemotingClientFactory() {
+        // static holder, never instantiated
+    }
+
+    /** Loads the client registry and logs how many entries were found. */
+    private static Map<String, RemotingClient> loadClients() {
+        log.info("begin load client");
+        Map<String, RemotingClient> loaded = ServiceProvider.load(CLIENT_LOCATION, RemotingClient.class);
+        log.info("end load client, size:{}", loaded.size());
+        return loaded;
+    }
+
+    /**
+     * @return the client registered for {@link RemotingUtil#DEFAULT_PROTOCOL}, as returned by the
+     * registry lookup
+     */
+    public static RemotingClient getClient() {
+        return clients.get(RemotingUtil.DEFAULT_PROTOCOL);
+    }
+
+    /**
+     * @param protocolType key the client was registered under
+     * @return the client registered for {@code protocolType}, as returned by the registry lookup
+     */
+    public static RemotingClient getClient(String protocolType) {
+        return clients.get(protocolType);
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index a12c089..6a5fb91 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingServer extends RemotingService {
@@ -36,6 +37,8 @@ public interface RemotingServer extends RemotingService {
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
+ void push(final String addr, final String sessionId, RemotingCommand remotingCommand);
+
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
@@ -48,4 +51,6 @@ public interface RemotingServer extends RemotingService {
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
+ void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
+
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
new file mode 100644
index 0000000..e7a7700
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
@@ -0,0 +1,40 @@
+package org.apache.rocketmq.remoting;
+
+import java.util.Map;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.util.RemotingUtil;
+import org.apache.rocketmq.remoting.util.ServiceProvider;
+
+/**
+ * Static lookup of {@link RemotingServer} implementations keyed by protocol type.
+ * <p>
+ * The protocol-to-server map is built once, while this class is being initialized, by
+ * {@code ServiceProvider.load} from the resource named by {@code SERVER_LOCATION}.
+ */
+public class RemotingServerFactory {
+
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    private static final String SERVER_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingServer";
+
+    /** Protocol type to server implementation. */
+    private static final Map<String, RemotingServer> servers;
+
+    static {
+        log.info("begin load server");
+        // The registry holds servers, so the service type handed to the loader must be RemotingServer.
+        servers = ServiceProvider.load(SERVER_LOCATION, RemotingServer.class);
+        log.info("end load server, size:{}", servers.size());
+    }
+
+    private RemotingServerFactory() {
+        // static holder, never instantiated
+    }
+
+    /**
+     * @return the server registered for {@link RemotingUtil#DEFAULT_PROTOCOL}, as returned by the
+     * registry lookup
+     */
+    public static RemotingServer getRemotingServer() {
+        return getRemotingServer(RemotingUtil.DEFAULT_PROTOCOL);
+    }
+
+    /**
+     * @param protocolType key the server was registered under
+     * @return the server registered for {@code protocolType}, as returned by the registry lookup
+     */
+    public static RemotingServer getRemotingServer(String protocolType) {
+        return servers.get(protocolType);
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 585b60b..0414996 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -58,98 +58,6 @@ public class RemotingHelper {
return isa;
}
- public static RemotingCommand invokeSync(final String addr, final RemotingCommand request,
- final long timeoutMillis) throws InterruptedException, RemotingConnectException,
- RemotingSendRequestException, RemotingTimeoutException {
- long beginTime = System.currentTimeMillis();
- SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
- SocketChannel socketChannel = RemotingUtil.connect(socketAddress);
- if (socketChannel != null) {
- boolean sendRequestOK = false;
-
- try {
-
- socketChannel.configureBlocking(true);
-
- //bugfix http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802
- socketChannel.socket().setSoTimeout((int) timeoutMillis);
-
- ByteBuffer byteBufferRequest = request.encode();
- while (byteBufferRequest.hasRemaining()) {
- int length = socketChannel.write(byteBufferRequest);
- if (length > 0) {
- if (byteBufferRequest.hasRemaining()) {
- if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
-
- throw new RemotingSendRequestException(addr);
- }
- }
- } else {
- throw new RemotingSendRequestException(addr);
- }
-
- Thread.sleep(1);
- }
-
- sendRequestOK = true;
-
- ByteBuffer byteBufferSize = ByteBuffer.allocate(4);
- while (byteBufferSize.hasRemaining()) {
- int length = socketChannel.read(byteBufferSize);
- if (length > 0) {
- if (byteBufferSize.hasRemaining()) {
- if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
-
- throw new RemotingTimeoutException(addr, timeoutMillis);
- }
- }
- } else {
- throw new RemotingTimeoutException(addr, timeoutMillis);
- }
-
- Thread.sleep(1);
- }
-
- int size = byteBufferSize.getInt(0);
- ByteBuffer byteBufferBody = ByteBuffer.allocate(size);
- while (byteBufferBody.hasRemaining()) {
- int length = socketChannel.read(byteBufferBody);
- if (length > 0) {
- if (byteBufferBody.hasRemaining()) {
- if ((System.currentTimeMillis() - beginTime) > timeoutMillis) {
-
- throw new RemotingTimeoutException(addr, timeoutMillis);
- }
- }
- } else {
- throw new RemotingTimeoutException(addr, timeoutMillis);
- }
-
- Thread.sleep(1);
- }
-
- byteBufferBody.flip();
- return RemotingCommand.decode(byteBufferBody);
- } catch (IOException e) {
- log.error("invokeSync failure", e);
-
- if (sendRequestOK) {
- throw new RemotingTimeoutException(addr, timeoutMillis);
- } else {
- throw new RemotingSendRequestException(addr);
- }
- } finally {
- try {
- socketChannel.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- } else {
- throw new RemotingConnectException(addr);
- }
- }
-
public static String parseChannelRemoteAddr(final Channel channel) {
if (null == channel) {
return "";
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ChannelMetrics.java
old mode 100644
new mode 100755
similarity index 80%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
copy to remoting/src/main/java/org/apache/rocketmq/remoting/netty/ChannelMetrics.java
index 01c853b..c56b93d
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ChannelMetrics.java
@@ -14,9 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.protocol;
-public enum RemotingCommandType {
- REQUEST_COMMAND,
- RESPONSE_COMMAND;
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.channel.group.ChannelGroup;
+
+/**
+ * Read-only view of channel statistics; implemented in this change by ChannelStatisticsHandler.
+ */
+public interface ChannelMetrics {
+
+ /**
+ * @return the current channel count (ChannelStatisticsHandler increments it in channelActive and
+ * decrements it in channelInactive)
+ */
+ Integer getChannelCount();
+
+ /**
+ * @return the channel group the implementation tracks channels in
+ */
+ ChannelGroup getChannels();
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ChannelStatisticsHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ChannelStatisticsHandler.java
new file mode 100755
index 0000000..6aa5fd9
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ChannelStatisticsHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.ChannelGroup;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Duplex handler that counts channel activations and mirrors the active channels into a
+ * caller-supplied {@link ChannelGroup}; both values are exposed through {@link ChannelMetrics}.
+ */
+public class ChannelStatisticsHandler extends ChannelDuplexHandler implements ChannelMetrics {
+    public static final String NAME = ChannelStatisticsHandler.class.getSimpleName();
+
+    /** Group supplied by the caller; channels are added on activation and removed on deactivation. */
+    private final ChannelGroup allChannels;
+
+    /** Activations minus deactivations seen by this handler instance. */
+    private final AtomicInteger channelCount = new AtomicInteger(0);
+
+    public ChannelStatisticsHandler(ChannelGroup allChannels) {
+        this.allChannels = allChannels;
+    }
+
+    /** Records the newly connected channel, then forwards the event down the pipeline. */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        this.channelCount.incrementAndGet();
+        this.allChannels.add(ctx.channel());
+        super.channelActive(ctx);
+    }
+
+    /** Forgets the disconnected channel, then forwards the event down the pipeline. */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        this.channelCount.decrementAndGet();
+        this.allChannels.remove(ctx.channel());
+        super.channelInactive(ctx);
+    }
+
+    @Override
+    public ChannelGroup getChannels() {
+        return this.allChannels;
+    }
+
+    @Override
+    public Integer getChannelCount() {
+        return Integer.valueOf(this.channelCount.get());
+    }
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java
new file mode 100644
index 0000000..d0e3632
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java
@@ -0,0 +1,283 @@
+package org.apache.rocketmq.remoting.netty;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RocketMQSerializable;
+import org.apache.rocketmq.remoting.serialize.SerializeType;
+import org.apache.rocketmq.remoting.serialize.Serializer;
+import org.apache.rocketmq.remoting.serialize.SerializerFactory;
+
+public class CodecHelper {
+ public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
+ protected static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
+ new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
+ protected static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
+ private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE = new HashMap<Field, Boolean>();
+ private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
+ private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
+ private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
+ private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
+ private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
+ private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
+ private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
+ private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
+ private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
+
+    /**
+     * Decodes a frame held in a byte array; see {@link #decode(ByteBuffer)} for the layout.
+     *
+     * @param array frame bytes, starting at the 4-byte header word
+     * @return the decoded command
+     */
+    public static RemotingCommand decode(final byte[] array) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(array);
+        return decode(byteBuffer);
+    }
+
+    /**
+     * Decodes a frame laid out as: 4-byte header word, header bytes, body bytes.
+     * <p>
+     * The header word carries the serialize type in its top byte and the header length in the
+     * lower three bytes. The buffer must start at the header word, i.e. without the 4-byte total
+     * length that {@link #encode(RemotingCommand)} writes in front of it, and its limit must mark
+     * the end of the frame.
+     *
+     * @param byteBuffer frame to read; its position is advanced past the consumed bytes
+     * @return the decoded command; its body is {@code null} when the frame carries no body bytes
+     */
+    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
+        int length = byteBuffer.limit();
+        int oriHeaderLen = byteBuffer.getInt();
+        int headerLength = getHeaderLength(oriHeaderLen);
+
+        byte[] headerData = new byte[headerLength];
+        byteBuffer.get(headerData);
+        // NOTE(review): headerDecode returns null when no serializer is registered for the type,
+        // in which case setBody below fails with a NullPointerException.
+        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
+
+        // Whatever follows the header word and the header bytes is the body.
+        int bodyLength = length - 4 - headerLength;
+        byte[] bodyData = null;
+        if (bodyLength > 0) {
+            bodyData = new byte[bodyLength];
+            byteBuffer.get(bodyData);
+        }
+        cmd.setBody(bodyData);
+        return cmd;
+    }
+
+    /**
+     * Extracts the header length from a header word.
+     *
+     * @param length header word as read from the wire
+     * @return the lower 24 bits of {@code length}
+     */
+    public static int getHeaderLength(int length) {
+        final int lowerThreeBytes = 0xFFFFFF;
+        return length & lowerThreeBytes;
+    }
+
+    /**
+     * Deserializes header bytes with the serializer registered for {@code type}.
+     *
+     * @return the deserialized command, or {@code null} when no serializer is registered
+     */
+    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
+        Serializer serializer = SerializerFactory.get(type);
+        if (serializer == null) {
+            return null;
+        }
+        return serializer.deserializer(headerData);
+    }
+
+    /**
+     * Extracts the serialize type from a header word.
+     *
+     * @param source header word as read from the wire
+     * @return the {@link SerializeType} looked up from the top byte of {@code source}
+     */
+    public static SerializeType getProtocolType(int source) {
+        byte typeCode = (byte) ((source >> 24) & 0xFF);
+        return SerializeType.valueOf(typeCode);
+    }
+
+    /**
+     * Builds the 4-byte header word: the serialize type code followed by the lower three bytes of
+     * {@code source}, most significant byte first.
+     *
+     * @param source header length; only its lower 24 bits are written
+     * @param type serialize type whose code becomes the first byte
+     * @return a new 4-byte array
+     */
+    public static byte[] markProtocolType(int source, SerializeType type) {
+        return new byte[] {
+            type.getCode(),
+            (byte) ((source >> 16) & 0xFF),
+            (byte) ((source >> 8) & 0xFF),
+            (byte) (source & 0xFF)
+        };
+    }
+
+    /**
+     * Instantiates {@code classHeader} and populates its non-static fields from the command's
+     * {@code extFields}, converting each string value to the declared field type.
+     * <p>
+     * Supported field types are String, int/Integer, long/Long, boolean/Boolean and double/Double.
+     * A field with no entry in {@code extFields} is left untouched unless it is annotated with
+     * {@link CFNotNull}, in which case decoding fails. A value that cannot be parsed or assigned
+     * is skipped and the field keeps its default.
+     *
+     * @param remotingCommand command whose {@code extFields} carry the header values
+     * @param classHeader header type, instantiated through its no-arg constructor
+     * @return the populated header, or {@code null} when {@code classHeader} cannot be instantiated
+     * @throws RemotingCommandException if a {@link CFNotNull} field is missing, a field has an
+     * unsupported type, or {@code checkFields()} rejects the result
+     */
+    public static CommandCustomHeader decodeCommandCustomHeader(RemotingCommand remotingCommand,
+        Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
+        CommandCustomHeader objectHeader;
+        try {
+            objectHeader = classHeader.newInstance();
+        } catch (InstantiationException e) {
+            return null;
+        } catch (IllegalAccessException e) {
+            return null;
+        }
+
+        Map<String, String> extFields = remotingCommand.getExtFields();
+        if (extFields == null) {
+            return objectHeader;
+        }
+
+        for (Field field : getClazzFields(classHeader)) {
+            if (Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+            String fieldName = field.getName();
+            // Presumably skips compiler-generated outer-instance references such as this$0.
+            if (fieldName.startsWith("this")) {
+                continue;
+            }
+
+            try {
+                String value = extFields.get(fieldName);
+                if (null == value) {
+                    if (!isFieldNullable(field)) {
+                        throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
+                    }
+                    continue;
+                }
+
+                field.setAccessible(true);
+                field.set(objectHeader, parseFieldValue(fieldName, getCanonicalName(field.getType()), value));
+            } catch (RemotingCommandException e) {
+                // Contract violations must reach the caller; swallowing them would make
+                // @CFNotNull and the supported-type check ineffective.
+                throw e;
+            } catch (Throwable ignored) {
+                // Best effort: an unparsable or unassignable value leaves the field at its default.
+            }
+        }
+
+        objectHeader.checkFields();
+        return objectHeader;
+    }
+
+    /**
+     * Converts the string form of a header value to the type named by {@code type}.
+     *
+     * @throws RemotingCommandException if {@code type} is not one of the supported field types
+     */
+    private static Object parseFieldValue(String fieldName, String type, String value)
+        throws RemotingCommandException {
+        if (type.equals(STRING_CANONICAL_NAME)) {
+            return value;
+        }
+        if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
+            return Integer.parseInt(value);
+        }
+        if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
+            return Long.parseLong(value);
+        }
+        if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
+            return Boolean.parseBoolean(value);
+        }
+        if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
+            return Double.parseDouble(value);
+        }
+        throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
+    }
+
+    /**
+     * Returns the declared fields of {@code classHeader}, caching them in {@code CLASS_HASH_MAP}.
+     * The cache is read without holding the lock; only the insertion is synchronized.
+     */
+    private static Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
+        Field[] cached = CLASS_HASH_MAP.get(classHeader);
+        if (cached != null) {
+            return cached;
+        }
+
+        Field[] declared = classHeader.getDeclaredFields();
+        synchronized (CLASS_HASH_MAP) {
+            CLASS_HASH_MAP.put(classHeader, declared);
+        }
+        return declared;
+    }
+
+    /**
+     * Tells whether {@code field} may be absent from the wire, i.e. it carries no
+     * {@link CFNotNull} annotation. Results are cached in {@code NULLABLE_FIELD_CACHE}.
+     * <p>
+     * The cache is consulted with a single lookup and the result is null-checked before use, so a
+     * lookup that misses (including one racing with a concurrent insertion) falls back to
+     * recomputing the answer instead of unboxing {@code null}.
+     */
+    private static boolean isFieldNullable(Field field) {
+        Boolean nullable = NULLABLE_FIELD_CACHE.get(field);
+        if (nullable == null) {
+            Annotation annotation = field.getAnnotation(CFNotNull.class);
+            nullable = annotation == null;
+            synchronized (NULLABLE_FIELD_CACHE) {
+                NULLABLE_FIELD_CACHE.put(field, nullable);
+            }
+        }
+        return nullable;
+    }
+
+    /**
+     * Returns {@code clazz.getCanonicalName()}, caching it in {@code CANONICAL_NAME_CACHE}.
+     * The cache is read without holding the lock; only the insertion is synchronized.
+     */
+    private static String getCanonicalName(Class clazz) {
+        String cached = CANONICAL_NAME_CACHE.get(clazz);
+        if (cached != null) {
+            return cached;
+        }
+
+        String canonicalName = clazz.getCanonicalName();
+        synchronized (CANONICAL_NAME_CACHE) {
+            CANONICAL_NAME_CACHE.put(clazz, canonicalName);
+        }
+        return canonicalName;
+    }
+
+    /**
+     * Encodes a command into a complete frame: 4-byte total length, 4-byte header word, header
+     * bytes, then the body bytes when a body is present.
+     * <p>
+     * The total length counts everything after the length field itself. The command's custom
+     * header is flattened into {@code extFields} as part of encoding the header.
+     *
+     * @return a buffer flipped for reading, positioned at the length field
+     */
+    public static ByteBuffer encode(RemotingCommand remotingCommand) {
+        byte[] headerData = headerEncode(remotingCommand);
+        byte[] body = remotingCommand.getBody();
+        int bodyLength = body == null ? 0 : body.length;
+
+        // header word (4) + header bytes + body bytes
+        int frameLength = 4 + headerData.length + bodyLength;
+
+        ByteBuffer result = ByteBuffer.allocate(4 + frameLength);
+        result.putInt(frameLength);
+        result.put(markProtocolType(headerData.length, remotingCommand.getSerializeTypeCurrentRPC()));
+        result.put(headerData);
+        if (body != null) {
+            result.put(body);
+        }
+
+        result.flip();
+        return result;
+    }
+
+    /**
+     * Flattens the custom header into {@code extFields} and serializes the command with the
+     * serializer registered for its current serialize type, falling back to the JSON serializer
+     * when none is registered.
+     */
+    private static byte[] headerEncode(RemotingCommand remotingCommand) {
+        makeCustomHeaderToNet(remotingCommand);
+
+        Serializer preferred = SerializerFactory.get(remotingCommand.getSerializeTypeCurrentRPC());
+        if (preferred != null) {
+            return preferred.serializer(remotingCommand);
+        }
+        return SerializerFactory.get(SerializeType.JSON).serializer(remotingCommand);
+    }
+
+    /**
+     * Copies every non-null, non-static field of the command's custom header into the command's
+     * {@code extFields} as {@code name -> value.toString()}, creating the map when it is missing.
+     * Does nothing when the command has no custom header. A field that cannot be read is skipped.
+     */
+    public static void makeCustomHeaderToNet(RemotingCommand remotingCommand) {
+        CommandCustomHeader customHeader = remotingCommand.getCustomHeader();
+        if (customHeader == null) {
+            return;
+        }
+
+        Field[] headerFields = getClazzFields(customHeader.getClass());
+        if (null == remotingCommand.getExtFields()) {
+            remotingCommand.setExtFields(new HashMap<String, String>());
+        }
+
+        for (Field headerField : headerFields) {
+            if (Modifier.isStatic(headerField.getModifiers())) {
+                continue;
+            }
+            String name = headerField.getName();
+            if (name.startsWith("this")) {
+                continue;
+            }
+
+            Object value;
+            try {
+                headerField.setAccessible(true);
+                value = headerField.get(customHeader);
+            } catch (Exception ignored) {
+                value = null;
+            }
+
+            if (value != null) {
+                remotingCommand.getExtFields().put(name, value.toString());
+            }
+        }
+    }
+
+    /**
+     * Encodes the frame prefix for the command, taking the body length from the command's own body
+     * (zero when it has none).
+     */
+    public static ByteBuffer encodeHeader(RemotingCommand remotingCommand) {
+        byte[] body = remotingCommand.getBody();
+        return encodeHeader(body == null ? 0 : body.length, remotingCommand);
+    }
+
+    /**
+     * Encodes everything of a frame except the body: 4-byte total length, 4-byte header word and
+     * the header bytes. The total length still accounts for {@code bodyLength}, but no body bytes
+     * are written to the returned buffer.
+     *
+     * @param bodyLength number of body bytes counted in the total length
+     * @return a buffer flipped for reading, positioned at the length field
+     */
+    public static ByteBuffer encodeHeader(final int bodyLength, RemotingCommand remotingCommand) {
+        byte[] headerData = headerEncode(remotingCommand);
+
+        // header word (4) + header bytes + body bytes
+        int frameLength = 4 + headerData.length + bodyLength;
+
+        // Only the length field, the header word and the header bytes are materialized here.
+        ByteBuffer result = ByteBuffer.allocate(4 + frameLength - bodyLength);
+        result.putInt(frameLength);
+        result.put(markProtocolType(headerData.length, remotingCommand.getSerializeTypeCurrentRPC()));
+        result.put(headerData);
+
+        result.flip();
+        return result;
+    }
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
index 2bd15ae..0c5e52d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/FileRegionEncoder.java
@@ -68,7 +68,7 @@ public class FileRegionEncoder extends MessageToByteEncoder<FileRegion> {
long toTransfer = msg.count();
while (true) {
- long transferred = msg.transfered();
+ long transferred = msg.transferred();
if (toTransfer - transferred <= 0) {
break;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
index 4b4e86e..ccd1b14 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
@@ -17,13 +17,11 @@
package org.apache.rocketmq.remoting.netty;
-
import io.netty.util.internal.logging.InternalLogLevel;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class NettyLogger {
private static AtomicBoolean nettyLoggerSeted = new AtomicBoolean(false);
@@ -52,6 +50,30 @@ public class NettyLogger {
private InternalLogger logger = null;
+        // Throwable-only overloads of Netty's InternalLogger. Each forwards to the matching
+        // (String, Throwable) overload of this logger so the exception is logged, not dropped.
+
+        @Override
+        public void trace(Throwable throwable) {
+            trace("Unexpected exception:", throwable);
+        }
+
+        @Override
+        public void debug(Throwable throwable) {
+            debug("Unexpected exception:", throwable);
+        }
+
+        @Override
+        public void info(Throwable throwable) {
+            info("Unexpected exception:", throwable);
+        }
+
+        @Override
+        public void warn(Throwable throwable) {
+            warn("Unexpected exception:", throwable);
+        }
+
+        @Override
+        public void error(Throwable throwable) {
+            error("Unexpected exception:", throwable);
+        }
+
+        @Override
+        public void log(InternalLogLevel level, Throwable throwable) {
+            log(level, "Unexpected exception:", throwable);
+        }
+
public NettyBridgeLogger(String name) {
logger = InternalLoggerFactory.getLogger(name);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 8dccebc..17053ff 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -17,11 +17,15 @@
package org.apache.rocketmq.remoting.netty;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Iterator;
@@ -33,13 +37,16 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
import org.apache.rocketmq.remoting.common.ServiceThread;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -49,6 +56,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import org.apache.rocketmq.remoting.util.ThreadUtils;
public abstract class NettyRemotingAbstract {
@@ -60,12 +68,12 @@ public abstract class NettyRemotingAbstract {
/**
* Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
*/
- protected final Semaphore semaphoreOneway;
+ protected Semaphore semaphoreOneway;
/**
* Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
*/
- protected final Semaphore semaphoreAsync;
+ protected Semaphore semaphoreAsync;
/**
* This map caches all on-going requests.
@@ -83,10 +91,11 @@ public abstract class NettyRemotingAbstract {
/**
* Executor to feed netty events to user defined {@link ChannelEventListener}.
*/
- protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
+ protected NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
/**
- * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
+ * The default request processor to use in case there is no exact match in {@link #processorTable} per request
+ * code.
*/
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
@@ -99,6 +108,13 @@ public abstract class NettyRemotingAbstract {
NettyLogger.initNettyLogger();
}
+ protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true);
+
+    /**
+     * Creates an instance whose one-way and asynchronous semaphores each hold 65535 fair permits.
+     */
+    public NettyRemotingAbstract() {
+        final int defaultPermits = 65535;
+        this.semaphoreOneway = new Semaphore(defaultPermits, true);
+        this.semaphoreAsync = new Semaphore(defaultPermits, true);
+    }
+
/**
* Constructor, specifying capacity of one-way and asynchronous semaphores.
*
@@ -110,6 +126,11 @@ public abstract class NettyRemotingAbstract {
this.semaphoreAsync = new Semaphore(permitsAsync, true);
}
+    /**
+     * Replaces both semaphores with new fair semaphores of the given capacities.
+     *
+     * @param permitsOneway number of permits for one-way requests
+     * @param permitsAsync number of permits for asynchronous requests
+     */
+    public void init(final int permitsOneway, final int permitsAsync) {
+        final Semaphore oneway = new Semaphore(permitsOneway, true);
+        final Semaphore async = new Semaphore(permitsAsync, true);
+        this.semaphoreOneway = oneway;
+        this.semaphoreAsync = async;
+    }
+
/**
* Custom channel event listener.
*
@@ -329,6 +350,15 @@ public abstract class NettyRemotingAbstract {
*/
public abstract ExecutorService getCallbackExecutor();
+    /**
+     * Schedules {@code scanResponseTable()} on the house-keeping executor: the first run happens
+     * after 3 seconds, then once every second.
+     */
+    protected void startUpHouseKeepingService() {
+        this.houseKeepingService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                scanResponseTable();
+            }
+        }, 3000, 1000, TimeUnit.MILLISECONDS); // delay and period are expressed in milliseconds
+    }
+
/**
* <p>
* This method is periodically invoked to scan and expire deprecated request.
@@ -358,6 +388,21 @@ public abstract class NettyRemotingAbstract {
}
}
+    /**
+     * Starts the netty event executor, but only when a channel event listener is configured.
+     */
+    public void start() {
+        if (getChannelEventListener() == null) {
+            return;
+        }
+        nettyEventExecutor.start();
+    }
+
+    /**
+     * Shuts down the netty event executor and the house-keeping executor, skipping whichever of
+     * the two is {@code null}.
+     */
+    public void shutdown() {
+        if (null != this.nettyEventExecutor) {
+            this.nettyEventExecutor.shutdown();
+        }
+
+        if (null != this.houseKeepingService) {
+            this.houseKeepingService.shutdown();
+        }
+    }
+
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
@@ -410,7 +455,7 @@ public abstract class NettyRemotingAbstract {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
- throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
+ throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
}
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
@@ -465,6 +510,7 @@ public abstract class NettyRemotingAbstract {
/**
* mark the request of the specified channel as fail and to invoke fail callback immediately
+ *
* @param channel the channel which is close already
*/
protected void failFast(final Channel channel) {
@@ -570,4 +616,15 @@ public abstract class NettyRemotingAbstract {
return NettyEventExecutor.class.getSimpleName();
}
}
+
+ public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { // inbound handler that forwards each RemotingCommand to processMessageReceived
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+ processMessageReceived(ctx, msg); // delegates to the enclosing NettyRemotingAbstract instance
+ }
+ }
+
+
+
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
deleted file mode 100644
index 33c2eed..0000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ /dev/null
@@ -1,710 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.remoting.netty;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.security.cert.CertificateException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.rocketmq.remoting.ChannelEventListener;
-import org.apache.rocketmq.remoting.InvokeCallback;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.common.Pair;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
-public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
-
- private static final long LOCK_TIMEOUT_MILLIS = 3000;
-
- private final NettyClientConfig nettyClientConfig;
- private final Bootstrap bootstrap = new Bootstrap();
- private final EventLoopGroup eventLoopGroupWorker;
- private final Lock lockChannelTables = new ReentrantLock();
- private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
-
- private final Timer timer = new Timer("ClientHouseKeepingService", true);
-
- private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
- private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
- private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
- private final Lock lockNamesrvChannel = new ReentrantLock();
-
- private final ExecutorService publicExecutor;
-
- /**
- * Invoke the callback methods in this executor when process response.
- */
- private ExecutorService callbackExecutor;
- private final ChannelEventListener channelEventListener;
- private DefaultEventExecutorGroup defaultEventExecutorGroup;
- private RPCHook rpcHook;
-
- public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
- this(nettyClientConfig, null);
- }
-
- public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
- final ChannelEventListener channelEventListener) {
- super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
- this.nettyClientConfig = nettyClientConfig;
- this.channelEventListener = channelEventListener;
-
- int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
- if (publicThreadNums <= 0) {
- publicThreadNums = 4;
- }
-
- this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
- }
- });
-
- this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
- }
- });
-
- if (nettyClientConfig.isUseTLS()) {
- try {
- sslContext = TlsHelper.buildSslContext(true);
- log.info("SSL enabled for client");
- } catch (IOException e) {
- log.error("Failed to create SSLContext", e);
- } catch (CertificateException e) {
- log.error("Failed to create SSLContext", e);
- throw new RuntimeException("Failed to create SSLContext", e);
- }
- }
- }
-
- private static int initValueIndex() {
- Random r = new Random();
-
- return Math.abs(r.nextInt() % 999) % 999;
- }
-
- @Override
- public void start() {
- this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
- nettyClientConfig.getClientWorkerThreads(),
- new ThreadFactory() {
-
- private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
- }
- });
-
- Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.SO_KEEPALIVE, false)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
- .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
- .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- if (nettyClientConfig.isUseTLS()) {
- if (null != sslContext) {
- pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
- log.info("Prepend SSL handler");
- } else {
- log.warn("Connections are insecure as SSLContext is null!");
- }
- }
- pipeline.addLast(
- defaultEventExecutorGroup,
- new NettyEncoder(),
- new NettyDecoder(),
- new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
- new NettyConnectManageHandler(),
- new NettyClientHandler());
- }
- });
-
- this.timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- try {
- NettyRemotingClient.this.scanResponseTable();
- } catch (Throwable e) {
- log.error("scanResponseTable exception", e);
- }
- }
- }, 1000 * 3, 1000);
-
- if (this.channelEventListener != null) {
- this.nettyEventExecutor.start();
- }
- }
-
- @Override
- public void shutdown() {
- try {
- this.timer.cancel();
-
- for (ChannelWrapper cw : this.channelTables.values()) {
- this.closeChannel(null, cw.getChannel());
- }
-
- this.channelTables.clear();
-
- this.eventLoopGroupWorker.shutdownGracefully();
-
- if (this.nettyEventExecutor != null) {
- this.nettyEventExecutor.shutdown();
- }
-
- if (this.defaultEventExecutorGroup != null) {
- this.defaultEventExecutorGroup.shutdownGracefully();
- }
- } catch (Exception e) {
- log.error("NettyRemotingClient shutdown exception, ", e);
- }
-
- if (this.publicExecutor != null) {
- try {
- this.publicExecutor.shutdown();
- } catch (Exception e) {
- log.error("NettyRemotingServer shutdown exception, ", e);
- }
- }
- }
-
- public void closeChannel(final String addr, final Channel channel) {
- if (null == channel)
- return;
-
- final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr;
-
- try {
- if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- boolean removeItemFromTable = true;
- final ChannelWrapper prevCW = this.channelTables.get(addrRemote);
-
- log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
-
- if (null == prevCW) {
- log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
- removeItemFromTable = false;
- } else if (prevCW.getChannel() != channel) {
- log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
- addrRemote);
- removeItemFromTable = false;
- }
-
- if (removeItemFromTable) {
- this.channelTables.remove(addrRemote);
- log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
- }
-
- RemotingUtil.closeChannel(channel);
- } catch (Exception e) {
- log.error("closeChannel: close the channel exception", e);
- } finally {
- this.lockChannelTables.unlock();
- }
- } else {
- log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
- }
- } catch (InterruptedException e) {
- log.error("closeChannel exception", e);
- }
- }
-
- @Override
- public void registerRPCHook(RPCHook rpcHook) {
- this.rpcHook = rpcHook;
- }
-
- public void closeChannel(final Channel channel) {
- if (null == channel)
- return;
-
- try {
- if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- boolean removeItemFromTable = true;
- ChannelWrapper prevCW = null;
- String addrRemote = null;
- for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) {
- String key = entry.getKey();
- ChannelWrapper prev = entry.getValue();
- if (prev.getChannel() != null) {
- if (prev.getChannel() == channel) {
- prevCW = prev;
- addrRemote = key;
- break;
- }
- }
- }
-
- if (null == prevCW) {
- log.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
- removeItemFromTable = false;
- }
-
- if (removeItemFromTable) {
- this.channelTables.remove(addrRemote);
- log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
- RemotingUtil.closeChannel(channel);
- }
- } catch (Exception e) {
- log.error("closeChannel: close the channel exception", e);
- } finally {
- this.lockChannelTables.unlock();
- }
- } else {
- log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
- }
- } catch (InterruptedException e) {
- log.error("closeChannel exception", e);
- }
- }
-
- @Override
- public void updateNameServerAddressList(List<String> addrs) {
- List<String> old = this.namesrvAddrList.get();
- boolean update = false;
-
- if (!addrs.isEmpty()) {
- if (null == old) {
- update = true;
- } else if (addrs.size() != old.size()) {
- update = true;
- } else {
- for (int i = 0; i < addrs.size() && !update; i++) {
- if (!old.contains(addrs.get(i))) {
- update = true;
- }
- }
- }
-
- if (update) {
- Collections.shuffle(addrs);
- log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
- this.namesrvAddrList.set(addrs);
- }
- }
- }
-
- @Override
- public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
- throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
- long beginStartTime = System.currentTimeMillis();
- final Channel channel = this.getAndCreateChannel(addr);
- if (channel != null && channel.isActive()) {
- try {
- if (this.rpcHook != null) {
- this.rpcHook.doBeforeRequest(addr, request);
- }
- long costTime = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTime) {
- throw new RemotingTimeoutException("invokeSync call timeout");
- }
- RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
- if (this.rpcHook != null) {
- this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
- }
- return response;
- } catch (RemotingSendRequestException e) {
- log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
- this.closeChannel(addr, channel);
- throw e;
- } catch (RemotingTimeoutException e) {
- if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
- this.closeChannel(addr, channel);
- log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
- }
- log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
- throw e;
- }
- } else {
- this.closeChannel(addr, channel);
- throw new RemotingConnectException(addr);
- }
- }
-
- private Channel getAndCreateChannel(final String addr) throws InterruptedException {
- if (null == addr) {
- return getAndCreateNameserverChannel();
- }
-
- ChannelWrapper cw = this.channelTables.get(addr);
- if (cw != null && cw.isOK()) {
- return cw.getChannel();
- }
-
- return this.createChannel(addr);
- }
-
- private Channel getAndCreateNameserverChannel() throws InterruptedException {
- String addr = this.namesrvAddrChoosed.get();
- if (addr != null) {
- ChannelWrapper cw = this.channelTables.get(addr);
- if (cw != null && cw.isOK()) {
- return cw.getChannel();
- }
- }
-
- final List<String> addrList = this.namesrvAddrList.get();
- if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- addr = this.namesrvAddrChoosed.get();
- if (addr != null) {
- ChannelWrapper cw = this.channelTables.get(addr);
- if (cw != null && cw.isOK()) {
- return cw.getChannel();
- }
- }
-
- if (addrList != null && !addrList.isEmpty()) {
- for (int i = 0; i < addrList.size(); i++) {
- int index = this.namesrvIndex.incrementAndGet();
- index = Math.abs(index);
- index = index % addrList.size();
- String newAddr = addrList.get(index);
-
- this.namesrvAddrChoosed.set(newAddr);
- log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
- Channel channelNew = this.createChannel(newAddr);
- if (channelNew != null) {
- return channelNew;
- }
- }
- }
- } catch (Exception e) {
- log.error("getAndCreateNameserverChannel: create name server channel exception", e);
- } finally {
- this.lockNamesrvChannel.unlock();
- }
- } else {
- log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
- }
-
- return null;
- }
-
- private Channel createChannel(final String addr) throws InterruptedException {
- ChannelWrapper cw = this.channelTables.get(addr);
- if (cw != null && cw.isOK()) {
- cw.getChannel().close();
- channelTables.remove(addr);
- }
-
- if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- boolean createNewConnection;
- cw = this.channelTables.get(addr);
- if (cw != null) {
-
- if (cw.isOK()) {
- cw.getChannel().close();
- this.channelTables.remove(addr);
- createNewConnection = true;
- } else if (!cw.getChannelFuture().isDone()) {
- createNewConnection = false;
- } else {
- this.channelTables.remove(addr);
- createNewConnection = true;
- }
- } else {
- createNewConnection = true;
- }
-
- if (createNewConnection) {
- ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
- log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
- cw = new ChannelWrapper(channelFuture);
- this.channelTables.put(addr, cw);
- }
- } catch (Exception e) {
- log.error("createChannel: create channel exception", e);
- } finally {
- this.lockChannelTables.unlock();
- }
- } else {
- log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
- }
-
- if (cw != null) {
- ChannelFuture channelFuture = cw.getChannelFuture();
- if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
- if (cw.isOK()) {
- log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
- return cw.getChannel();
- } else {
- log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
- }
- } else {
- log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
- channelFuture.toString());
- }
- }
-
- return null;
- }
-
- @Override
- public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
- throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
- RemotingSendRequestException {
- long beginStartTime = System.currentTimeMillis();
- final Channel channel = this.getAndCreateChannel(addr);
- if (channel != null && channel.isActive()) {
- try {
- if (this.rpcHook != null) {
- this.rpcHook.doBeforeRequest(addr, request);
- }
- long costTime = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTime) {
- throw new RemotingTooMuchRequestException("invokeAsync call timeout");
- }
- this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
- } catch (RemotingSendRequestException e) {
- log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
- this.closeChannel(addr, channel);
- throw e;
- }
- } else {
- this.closeChannel(addr, channel);
- throw new RemotingConnectException(addr);
- }
- }
-
- @Override
- public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
- RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
- final Channel channel = this.getAndCreateChannel(addr);
- if (channel != null && channel.isActive()) {
- try {
- if (this.rpcHook != null) {
- this.rpcHook.doBeforeRequest(addr, request);
- }
- this.invokeOnewayImpl(channel, request, timeoutMillis);
- } catch (RemotingSendRequestException e) {
- log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
- this.closeChannel(addr, channel);
- throw e;
- }
- } else {
- this.closeChannel(addr, channel);
- throw new RemotingConnectException(addr);
- }
- }
-
- @Override
- public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
- ExecutorService executorThis = executor;
- if (null == executor) {
- executorThis = this.publicExecutor;
- }
-
- Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
- this.processorTable.put(requestCode, pair);
- }
-
- @Override
- public boolean isChannelWritable(String addr) {
- ChannelWrapper cw = this.channelTables.get(addr);
- if (cw != null && cw.isOK()) {
- return cw.isWritable();
- }
- return true;
- }
-
- @Override
- public List<String> getNameServerAddressList() {
- return this.namesrvAddrList.get();
- }
-
- @Override
- public ChannelEventListener getChannelEventListener() {
- return channelEventListener;
- }
-
- @Override
- public RPCHook getRPCHook() {
- return this.rpcHook;
- }
-
- @Override
- public ExecutorService getCallbackExecutor() {
- return callbackExecutor != null ? callbackExecutor : publicExecutor;
- }
-
- @Override
- public void setCallbackExecutor(final ExecutorService callbackExecutor) {
- this.callbackExecutor = callbackExecutor;
- }
-
- static class ChannelWrapper {
- private final ChannelFuture channelFuture;
-
- public ChannelWrapper(ChannelFuture channelFuture) {
- this.channelFuture = channelFuture;
- }
-
- public boolean isOK() {
- return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
- }
-
- public boolean isWritable() {
- return this.channelFuture.channel().isWritable();
- }
-
- private Channel getChannel() {
- return this.channelFuture.channel();
- }
-
- public ChannelFuture getChannelFuture() {
- return channelFuture;
- }
- }
-
- class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
- processMessageReceived(ctx, msg);
- }
- }
-
- class NettyConnectManageHandler extends ChannelDuplexHandler {
- @Override
- public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
- ChannelPromise promise) throws Exception {
- final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
- final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
- log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
-
- super.connect(ctx, remoteAddress, localAddress, promise);
-
- if (NettyRemotingClient.this.channelEventListener != null) {
- NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
- }
- }
-
- @Override
- public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
- final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
- closeChannel(ctx.channel());
- super.disconnect(ctx, promise);
-
- if (NettyRemotingClient.this.channelEventListener != null) {
- NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
- }
- }
-
- @Override
- public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
- final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
- closeChannel(ctx.channel());
- super.close(ctx, promise);
- NettyRemotingClient.this.failFast(ctx.channel());
- if (NettyRemotingClient.this.channelEventListener != null) {
- NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
- }
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state().equals(IdleState.ALL_IDLE)) {
- final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
- closeChannel(ctx.channel());
- if (NettyRemotingClient.this.channelEventListener != null) {
- NettyRemotingClient.this
- .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
- }
- }
- }
-
- ctx.fireUserEventTriggered(evt);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
- log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
- closeChannel(ctx.channel());
- if (NettyRemotingClient.this.channelEventListener != null) {
- NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
- }
- }
- }
-}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index a5e2a23..b3f55cd 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.remoting.netty;
public class NettyServerConfig implements Cloneable {
private int listenPort = 8888;
private int serverWorkerThreads = 8;
- private int serverCallbackExecutorThreads = 0;
+ private int serverCallbackExecutorThreads = 8;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
@@ -28,6 +28,33 @@ public class NettyServerConfig implements Cloneable {
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
private boolean serverPooledByteBufAllocatorEnable = true;
+ private int serverAcceptorThreads = 1;
+ private int connectionChannelReaderIdleSeconds = 0;
+ private int connectionChannelWriterIdleSeconds = 0;
+
+ public int getConnectionChannelReaderIdleSeconds() {
+ return connectionChannelReaderIdleSeconds;
+ }
+
+ public void setConnectionChannelReaderIdleSeconds(int connectionChannelReaderIdleSeconds) {
+ this.connectionChannelReaderIdleSeconds = connectionChannelReaderIdleSeconds;
+ }
+
+ public int getConnectionChannelWriterIdleSeconds() {
+ return connectionChannelWriterIdleSeconds;
+ }
+
+ public void setConnectionChannelWriterIdleSeconds(int connectionChannelWriterIdleSeconds) {
+ this.connectionChannelWriterIdleSeconds = connectionChannelWriterIdleSeconds;
+ }
+
+ public int getServerAcceptorThreads() {
+ return serverAcceptorThreads;
+ }
+
+ public void setServerAcceptorThreads(int serverAcceptorThreads) {
+ this.serverAcceptorThreads = serverAcceptorThreads;
+ }
/**
* make make install
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index cadcab1..d392e79 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -17,43 +17,41 @@
package org.apache.rocketmq.remoting.protocol;
import com.alibaba.fastjson.annotation.JSONField;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.CodecHelper;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.SerializeType;
+import org.msgpack.annotation.Message;
+@Message
public class RemotingCommand {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
- private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
- private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
- private static final int RPC_ONEWAY = 1; // 0, RPC
- private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
- new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
- private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
- // 1, Oneway
- // 1, RESPONSE_COMMAND
- private static final Map<Field, Boolean> NULLABLE_FIELD_CACHE = new HashMap<Field, Boolean>();
- private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
- private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
- private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
- private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
- private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
- private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
- private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
- private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
- private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
+
+ /**
+ * REQUEST_COMMAND
+ */
+ private static final int RPC_TYPE = 0;
+
+ /**
+ * One way
+ */
+ private static final int RPC_ONEWAY = 1;
+
+
+
private static volatile int configVersion = -1;
+
private static AtomicInteger requestId = new AtomicInteger(0);
private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
@@ -74,15 +72,16 @@ public class RemotingCommand {
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
+
private String remark;
private HashMap<String, String> extFields;
- private transient CommandCustomHeader customHeader;
+ private CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body;
- protected RemotingCommand() {
+ public RemotingCommand() {
}
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
@@ -132,59 +131,12 @@ public class RemotingCommand {
return cmd;
}
- public static RemotingCommand createResponseCommand(int code, String remark) {
- return createResponseCommand(code, remark, null);
- }
-
- public static RemotingCommand decode(final byte[] array) {
- ByteBuffer byteBuffer = ByteBuffer.wrap(array);
- return decode(byteBuffer);
- }
-
- public static RemotingCommand decode(final ByteBuffer byteBuffer) {
- int length = byteBuffer.limit();
- int oriHeaderLen = byteBuffer.getInt();
- int headerLength = getHeaderLength(oriHeaderLen);
-
- byte[] headerData = new byte[headerLength];
- byteBuffer.get(headerData);
-
- RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
-
- int bodyLength = length - 4 - headerLength;
- byte[] bodyData = null;
- if (bodyLength > 0) {
- bodyData = new byte[bodyLength];
- byteBuffer.get(bodyData);
- }
- cmd.body = bodyData;
-
- return cmd;
- }
-
- public static int getHeaderLength(int length) {
- return length & 0xFFFFFF;
- }
-
- private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
- switch (type) {
- case JSON:
- RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
- resultJson.setSerializeTypeCurrentRPC(type);
- return resultJson;
- case ROCKETMQ:
- RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
- resultRMQ.setSerializeTypeCurrentRPC(type);
- return resultRMQ;
- default:
- break;
- }
-
- return null;
+ public CommandCustomHeader getCustomHeader() {
+ return customHeader;
}
- public static SerializeType getProtocolType(int source) {
- return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
+ public static RemotingCommand createResponseCommand(int code, String remark) {
+ return createResponseCommand(code, remark, null);
}
public static int createNewRequestId() {
@@ -208,16 +160,6 @@ public class RemotingCommand {
return true;
}
- public static byte[] markProtocolType(int source, SerializeType type) {
- byte[] result = new byte[4];
-
- result[0] = type.getCode();
- result[1] = (byte) ((source >> 16) & 0xFF);
- result[2] = (byte) ((source >> 8) & 0xFF);
- result[3] = (byte) (source & 0xFF);
- return result;
- }
-
public void markResponseType() {
int bits = 1 << RPC_TYPE;
this.flag |= bits;
@@ -227,206 +169,13 @@ public class RemotingCommand {
return customHeader;
}
- public void writeCustomHeader(CommandCustomHeader customHeader) {
- this.customHeader = customHeader;
- }
-
public CommandCustomHeader decodeCommandCustomHeader(
Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
- CommandCustomHeader objectHeader;
- try {
- objectHeader = classHeader.newInstance();
- } catch (InstantiationException e) {
- return null;
- } catch (IllegalAccessException e) {
- return null;
- }
-
- if (this.extFields != null) {
-
- Field[] fields = getClazzFields(classHeader);
- for (Field field : fields) {
- if (!Modifier.isStatic(field.getModifiers())) {
- String fieldName = field.getName();
- if (!fieldName.startsWith("this")) {
- try {
- String value = this.extFields.get(fieldName);
- if (null == value) {
- if (!isFieldNullable(field)) {
- throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
- }
- continue;
- }
-
- field.setAccessible(true);
- String type = getCanonicalName(field.getType());
- Object valueParsed;
-
- if (type.equals(STRING_CANONICAL_NAME)) {
- valueParsed = value;
- } else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
- valueParsed = Integer.parseInt(value);
- } else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
- valueParsed = Long.parseLong(value);
- } else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
- valueParsed = Boolean.parseBoolean(value);
- } else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
- valueParsed = Double.parseDouble(value);
- } else {
- throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
- }
-
- field.set(objectHeader, valueParsed);
-
- } catch (Throwable e) {
- log.error("Failed field [{}] decoding", fieldName, e);
- }
- }
- }
- }
-
- objectHeader.checkFields();
- }
-
- return objectHeader;
- }
-
- private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
- Field[] field = CLASS_HASH_MAP.get(classHeader);
-
- if (field == null) {
- field = classHeader.getDeclaredFields();
- synchronized (CLASS_HASH_MAP) {
- CLASS_HASH_MAP.put(classHeader, field);
- }
- }
- return field;
- }
-
- private boolean isFieldNullable(Field field) {
- if (!NULLABLE_FIELD_CACHE.containsKey(field)) {
- Annotation annotation = field.getAnnotation(CFNotNull.class);
- synchronized (NULLABLE_FIELD_CACHE) {
- NULLABLE_FIELD_CACHE.put(field, annotation == null);
- }
- }
- return NULLABLE_FIELD_CACHE.get(field);
- }
-
- private String getCanonicalName(Class clazz) {
- String name = CANONICAL_NAME_CACHE.get(clazz);
-
- if (name == null) {
- name = clazz.getCanonicalName();
- synchronized (CANONICAL_NAME_CACHE) {
- CANONICAL_NAME_CACHE.put(clazz, name);
- }
- }
- return name;
- }
-
- public ByteBuffer encode() {
- // 1> header length size
- int length = 4;
-
- // 2> header data length
- byte[] headerData = this.headerEncode();
- length += headerData.length;
-
- // 3> body data length
- if (this.body != null) {
- length += body.length;
- }
-
- ByteBuffer result = ByteBuffer.allocate(4 + length);
-
- // length
- result.putInt(length);
-
- // header length
- result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
-
- // header data
- result.put(headerData);
-
- // body data;
- if (this.body != null) {
- result.put(this.body);
- }
-
- result.flip();
-
- return result;
- }
-
- private byte[] headerEncode() {
- this.makeCustomHeaderToNet();
- if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
- return RocketMQSerializable.rocketMQProtocolEncode(this);
- } else {
- return RemotingSerializable.encode(this);
- }
- }
-
- public void makeCustomHeaderToNet() {
- if (this.customHeader != null) {
- Field[] fields = getClazzFields(customHeader.getClass());
- if (null == this.extFields) {
- this.extFields = new HashMap<String, String>();
- }
-
- for (Field field : fields) {
- if (!Modifier.isStatic(field.getModifiers())) {
- String name = field.getName();
- if (!name.startsWith("this")) {
- Object value = null;
- try {
- field.setAccessible(true);
- value = field.get(this.customHeader);
- } catch (Exception e) {
- log.error("Failed to access field [{}]", name, e);
- }
-
- if (value != null) {
- this.extFields.put(name, value.toString());
- }
- }
- }
- }
- }
- }
-
- public ByteBuffer encodeHeader() {
- return encodeHeader(this.body != null ? this.body.length : 0);
+ return CodecHelper.decodeCommandCustomHeader(this, classHeader);
}
public ByteBuffer encodeHeader(final int bodyLength) {
- // 1> header length size
- int length = 4;
-
- // 2> header data length
- byte[] headerData;
- headerData = this.headerEncode();
-
- length += headerData.length;
-
- // 3> body data length
- length += bodyLength;
-
- ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
-
- // length
- result.putInt(length);
-
- // header length
- result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
-
- // header data
- result.put(headerData);
-
- result.flip();
-
- return result;
+ return CodecHelper.encodeHeader(bodyLength, this);
}
public void markOnewayRPC() {
@@ -440,14 +189,6 @@ public class RemotingCommand {
return (this.flag & bits) == bits;
}
- public int getCode() {
- return code;
- }
-
- public void setCode(int code) {
- this.code = code;
- }
-
@JSONField(serialize = false)
public RemotingCommandType getType() {
if (this.isResponseType()) {
@@ -463,6 +204,14 @@ public class RemotingCommand {
return (this.flag & bits) == bits;
}
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
public LanguageCode getLanguage() {
return language;
}
@@ -526,11 +275,8 @@ public class RemotingCommand {
extFields.put(key, value);
}
- @Override
- public String toString() {
- return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
- + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
- + serializeTypeCurrentRPC + "]";
+ public void setCustomHeader(CommandCustomHeader customHeader) {
+ this.customHeader = customHeader;
}
public SerializeType getSerializeTypeCurrentRPC() {
@@ -540,4 +286,12 @@ public class RemotingCommand {
public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
}
+
+ @Override
+ public String toString() {
+ return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
+ + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
+ + serializeTypeCurrentRPC + "]";
+ }
+
}
\ No newline at end of file
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
index 01c853b..a6a7ec4 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
@@ -18,5 +18,5 @@ package org.apache.rocketmq.remoting.protocol;
public enum RemotingCommandType {
REQUEST_COMMAND,
- RESPONSE_COMMAND;
+ RESPONSE_COMMAND
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/LanguageCode.java
similarity index 92%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/serialize/LanguageCode.java
index 4382af3..246a17b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/LanguageCode.java
@@ -15,8 +15,11 @@
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.protocol;
+package org.apache.rocketmq.remoting.serialize;
+import org.msgpack.annotation.MessagePackOrdinalEnum;
+
+@MessagePackOrdinalEnum
public enum LanguageCode {
JAVA((byte) 0),
CPP((byte) 1),
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java
new file mode 100644
index 0000000..6c7ba78
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java
@@ -0,0 +1,40 @@
+package org.apache.rocketmq.remoting.serialize;
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.msgpack.MessagePack;
+
+/**
+ * {@link Serializer} implementation registered under {@link SerializeType#MSGPACK}.
+ * All encoding and decoding is delegated to one {@link MessagePack} instance owned by this object.
+ */
+public class MsgPackSerializable implements Serializer {
+    private final MessagePack messagePack = new MessagePack();
+
+    // NOTE(review): explicit enum registration was left disabled by the author; LanguageCode and
+    // SerializeType carry @MessagePackOrdinalEnum instead - confirm that is sufficient before deleting.
+//    public MsgPackSerializable(){
+//        messagePack.register(LanguageCode.class);
+//        messagePack.register(SerializeType.class);
+//    }
+
+    /**
+     * @return always {@link SerializeType#MSGPACK}
+     */
+    @Override
+    public SerializeType type() {
+        return SerializeType.MSGPACK;
+    }
+
+    /**
+     * Decodes {@code content} into an instance of {@code c}.
+     *
+     * @param content encoded bytes
+     * @param c target class
+     * @return the decoded object
+     * @throws RuntimeException wrapping any failure raised while reading; the message names the target class
+     */
+    @Override
+    public <T> T deserializer(byte[] content, Class<T> c) {
+        try {
+            return messagePack.read(content, c);
+        } catch (Exception e) {
+            // Keep the original exception as the cause, but say which type could not be decoded.
+            throw new RuntimeException("Failed to decode MessagePack content into " + c, e);
+        }
+    }
+
+    /**
+     * Decodes {@code content} into a {@link RemotingCommand}.
+     *
+     * @param content encoded bytes
+     * @return the decoded command
+     */
+    @Override
+    public RemotingCommand deserializer(byte[] content) {
+        return deserializer(content, RemotingCommand.class);
+    }
+
+    /**
+     * Encodes {@code object} into bytes.
+     *
+     * @param object the value to encode
+     * @return the encoded bytes
+     * @throws RuntimeException wrapping any failure raised while writing; the message names the value's class
+     */
+    @Override
+    public byte[] serializer(Object object) {
+        try {
+            return messagePack.write(object);
+        } catch (Exception e) {
+            final String typeName = object == null ? "null" : object.getClass().getName();
+            throw new RuntimeException("Failed to encode " + typeName + " with MessagePack", e);
+        }
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/RemotingSerializable.java
similarity index 76%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/serialize/RemotingSerializable.java
index f80ff14..01e9b5c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/RemotingSerializable.java
@@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.protocol;
+package org.apache.rocketmq.remoting.serialize;
import com.alibaba.fastjson.JSON;
import java.nio.charset.Charset;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-public abstract class RemotingSerializable {
+public class RemotingSerializable implements Serializer {
private final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
public static byte[] encode(final Object obj) {
@@ -58,4 +59,24 @@ public abstract class RemotingSerializable {
public String toJson(final boolean prettyFormat) {
return toJson(this, prettyFormat);
}
+
+ /**
+ * Identifies this serializer to callers that select one by type.
+ *
+ * @return always {@link SerializeType#JSON}
+ */
+ @Override
+ public SerializeType type() {
+ return SerializeType.JSON;
+ }
+
+ /**
+ * Instance-level entry point required by {@link Serializer}; forwards to {@code decode(content, c)}.
+ *
+ * @param content encoded bytes
+ * @param c target class
+ * @return whatever {@code decode} returns for these arguments
+ */
+ @Override
+ public <T> T deserializer(byte[] content, Class<T> c) {
+ return decode(content, c);
+ }
+
+ /**
+ * Instance-level entry point required by {@link Serializer}; forwards to the static {@code encode(Object)}.
+ *
+ * @param object the value to encode
+ * @return whatever {@code encode} returns for this value
+ */
+ @Override
+ public byte[] serializer(Object object) {
+ return encode(object);
+ }
+
+ /**
+ * Decodes {@code content} into a {@link RemotingCommand} by calling {@code decode} with that class.
+ *
+ * @param content encoded bytes
+ * @return the decoded command
+ */
+ @Override
+ public RemotingCommand deserializer(byte[] content) {
+ return decode(content, RemotingCommand.class);
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/RocketMQSerializable.java
similarity index 91%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/serialize/RocketMQSerializable.java
index 66119e0..f903bde 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/RocketMQSerializable.java
@@ -14,15 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.protocol;
+package org.apache.rocketmq.remoting.serialize;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-public class RocketMQSerializable {
+public class RocketMQSerializable implements Serializer {
private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
@@ -201,4 +202,25 @@ public class RocketMQSerializable {
}
return true;
}
+
+ /**
+ * Identifies this serializer to callers that select one by type.
+ *
+ * @return always {@link SerializeType#ROCKETMQ}
+ */
+ @Override
+ public SerializeType type() {
+ return SerializeType.ROCKETMQ;
+ }
+
+    /**
+     * Decodes {@code content} when the requested type can hold a {@link RemotingCommand}.
+     *
+     * <p>This class only encodes and decodes {@link RemotingCommand}, so any other target type
+     * cannot be produced; for those the method keeps returning {@code null}, as the previous
+     * stub did for every input.
+     *
+     * @param content bytes in the RocketMQ header encoding
+     * @param c requested result type
+     * @return the decoded command cast to {@code c}, or {@code null} if {@code c} is {@code null}
+     *         or cannot hold a {@link RemotingCommand}
+     */
+    @Override
+    public <T> T deserializer(byte[] content, Class<T> c) {
+        // True for RemotingCommand itself and any of its supertypes, so the cast below cannot fail.
+        if (c != null && c.isAssignableFrom(RemotingCommand.class)) {
+            return c.cast(rocketMQProtocolDecode(content));
+        }
+        return null;
+    }
+
+ /**
+ * Encodes a {@link RemotingCommand} through {@code rocketMQProtocolEncode}.
+ *
+ * @param object the command to encode; the unchecked cast below throws {@link ClassCastException}
+ * for any other non-null type
+ * @return the encoded bytes
+ */
+ @Override
+ public byte[] serializer(Object object) {
+ return rocketMQProtocolEncode((RemotingCommand) object);
+ }
+
+ /**
+ * Decodes {@code content} into a {@link RemotingCommand} through {@code rocketMQProtocolDecode}.
+ *
+ * @param content encoded bytes
+ * @return the decoded command
+ */
+ @Override
+ public RemotingCommand deserializer(byte[] content) {
+ return rocketMQProtocolDecode(content);
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializeType.java
similarity index 87%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializeType.java
index b040f8f..fc5ddba 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializeType.java
@@ -15,11 +15,15 @@
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.protocol;
+package org.apache.rocketmq.remoting.serialize;
+import org.msgpack.annotation.MessagePackOrdinalEnum;
+
+@MessagePackOrdinalEnum
public enum SerializeType {
JSON((byte) 0),
- ROCKETMQ((byte) 1);
+ ROCKETMQ((byte) 1),
+ MSGPACK((byte) 2);
private byte code;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/Serializer.java
similarity index 70%
copy from remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
copy to remoting/src/main/java/org/apache/rocketmq/remoting/serialize/Serializer.java
index 01c853b..b3f5888 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommandType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/Serializer.java
@@ -14,9 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.protocol;
-public enum RemotingCommandType {
- REQUEST_COMMAND,
- RESPONSE_COMMAND;
+package org.apache.rocketmq.remoting.serialize;
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * Contract for converting objects, and {@link RemotingCommand} in particular, to and from bytes.
+ * Implementations are looked up by the {@link SerializeType} they report from {@link #type()}.
+ */
+public interface Serializer {
+ /**
+ * @return the {@link SerializeType} this implementation stands for
+ */
+ SerializeType type();
+
+ /**
+ * Decodes {@code content} into an instance of {@code c}.
+ *
+ * @param content encoded bytes
+ * @param c target class
+ * @return the decoded object
+ */
+ <T> T deserializer(final byte[] content, final Class<T> c);
+
+ /**
+ * Encodes {@code object} into bytes.
+ *
+ * @param object the value to encode
+ * @return the encoded bytes
+ */
+ byte[] serializer(final Object object);
+
+ /**
+ * Decodes {@code content} into a {@link RemotingCommand}.
+ *
+ * @param content encoded bytes
+ * @return the decoded command
+ */
+ RemotingCommand deserializer(final byte[] content);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java
new file mode 100644
index 0000000..e015478
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java
@@ -0,0 +1,26 @@
+package org.apache.rocketmq.remoting.serialize;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Static registry mapping each {@link SerializeType} to the {@link Serializer} that handles it.
+ * The JSON, ROCKETMQ and MSGPACK serializers are registered when this class is initialized.
+ */
+public class SerializerFactory {
+
+    // Keyed by SerializeType itself (not Enum<SerializeType>) so the map's type matches its use.
+    private static final Map<SerializeType, Serializer> serializerMap = new ConcurrentHashMap<SerializeType, Serializer>();
+
+    private SerializerFactory() {
+    }
+
+    static {
+        register(SerializeType.JSON, new RemotingSerializable());
+        register(SerializeType.ROCKETMQ, new RocketMQSerializable());
+        register(SerializeType.MSGPACK, new MsgPackSerializable());
+    }
+
+    /**
+     * Registers {@code serialization} for {@code type} unless a serializer is already registered
+     * for that type; the first registration wins and later ones are ignored.
+     *
+     * @param type the serialize type to register under
+     * @param serialization the serializer to use for {@code type}
+     */
+    public static void register(SerializeType type, Serializer serialization) {
+        serializerMap.putIfAbsent(type, serialization);
+    }
+
+    /**
+     * Looks up the serializer registered for {@code type}.
+     *
+     * @param type the serialize type to look up
+     * @return the registered serializer, or {@code null} if none is registered
+     */
+    public static Serializer get(SerializeType type) {
+        return serializerMap.get(type);
+    }
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
new file mode 100644
index 0000000..6dab218
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.transport;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Timer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettyEvent;
+import org.apache.rocketmq.remoting.netty.NettyEventType;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
+
+public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ protected final ConcurrentMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
+
+ protected final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
+ private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
+ private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
+ private final Lock lockNamesrvChannel = new ReentrantLock();
+
+ private final Lock lockChannelTables = new ReentrantLock();
+ private static final long LOCK_TIMEOUT_MILLIS = 3000;
+
+ /**
+ * Supplies the Netty {@link Bootstrap} that {@code createChannel} uses to open connections.
+ * {@code createChannel} does not start a connection when this returns {@code null}.
+ *
+ * @return the client bootstrap, possibly {@code null}
+ */
+ public abstract Bootstrap getBootstrap();
+
+ /**
+ * Creates the client base by calling the no-argument {@link NettyRemotingAbstract} constructor.
+ */
+ public NettyRemotingClientAbstract() {
+ super();
+ }
+
+ /**
+ * Creates the client base, forwarding both permit counts unchanged to {@link NettyRemotingAbstract}.
+ *
+ * @param permitsOneway passed through to the superclass (presumably the one-way request limit - confirm there)
+ * @param permitsAsync passed through to the superclass (presumably the async request limit - confirm there)
+ */
+ public NettyRemotingClientAbstract(final int permitsOneway, final int permitsAsync) {
+ super(permitsOneway, permitsAsync);
+ }
+
+    /**
+     * Produces the random starting value for the name-server round-robin counter.
+     *
+     * @return a value in the range 0 to 998 inclusive
+     */
+    private static int initValueIndex() {
+        final int remainder = new Random().nextInt() % 999;
+        // remainder lies in [-998, 998]; dropping the sign gives [0, 998], already below 999.
+        return remainder < 0 ? -remainder : remainder;
+    }
+
+    /**
+     * Closes {@code channel} and removes its entry from the channel table when that entry still
+     * refers to this same channel.
+     *
+     * <p>The table entry is left alone when no entry exists for the address or when the entry now
+     * holds a different channel; the given channel is closed in either case. Nothing happens when
+     * the table lock cannot be acquired within {@code LOCK_TIMEOUT_MILLIS}.
+     *
+     * @param addr table key of the channel; when {@code null} the key is derived from the channel's remote address
+     * @param channel the channel to close; {@code null} is ignored
+     */
+    public void closeChannel(final String addr, final Channel channel) {
+        if (null == channel) {
+            return;
+        }
+
+        final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr;
+
+        try {
+            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    boolean removeItemFromTable = true;
+                    final ChannelWrapper prevCW = this.channelTables.get(addrRemote);
+
+                    log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
+
+                    if (null == prevCW) {
+                        log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+                        removeItemFromTable = false;
+                    } else if (prevCW.getChannel() != channel) {
+                        log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
+                            addrRemote);
+                        removeItemFromTable = false;
+                    }
+
+                    if (removeItemFromTable) {
+                        this.channelTables.remove(addrRemote);
+                        log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+                    }
+
+                    RemotingUtil.closeChannel(channel);
+                } catch (Exception e) {
+                    log.error("closeChannel: close the channel exception", e);
+                } finally {
+                    this.lockChannelTables.unlock();
+                }
+            } else {
+                log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+            }
+        } catch (InterruptedException e) {
+            log.error("closeChannel exception", e);
+            // The exception is not propagated, so restore the flag for code further up the stack.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Closes the channel of every wrapper currently in the channel table, then empties the table.
+     */
+    public void clearChannels() {
+        for (Map.Entry<String, ChannelWrapper> entry : this.channelTables.entrySet()) {
+            final Channel tracked = entry.getValue().getChannel();
+            // A null address makes closeChannel derive the table key from the channel itself.
+            this.closeChannel(null, tracked);
+        }
+        // Drop whatever entries the per-channel close did not remove.
+        this.channelTables.clear();
+    }
+
+    /**
+     * Finds the channel-table entry that holds {@code channel}, removes it and closes the channel.
+     *
+     * <p>Unlike {@code closeChannel(String, Channel)}, the channel is closed only when a matching
+     * entry is found. Nothing happens when the table lock cannot be acquired within
+     * {@code LOCK_TIMEOUT_MILLIS}.
+     *
+     * @param channel the channel to close; {@code null} is ignored
+     */
+    public void closeChannel(final Channel channel) {
+        if (null == channel) {
+            return;
+        }
+
+        try {
+            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    String addrRemote = null;
+                    boolean found = false;
+                    for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) {
+                        // channel is non-null here, so identity equality also excludes null wrappers' channels.
+                        if (entry.getValue().getChannel() == channel) {
+                            addrRemote = entry.getKey();
+                            found = true;
+                            break;
+                        }
+                    }
+
+                    if (found) {
+                        this.channelTables.remove(addrRemote);
+                        log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+                        RemotingUtil.closeChannel(channel);
+                    } else {
+                        // No table key exists in this branch; log the channel's own remote address
+                        // instead of the literal "null" the message used to contain.
+                        log.info("eventCloseChannel: the channel[{}] has been removed from the channel table before",
+                            RemotingHelper.parseChannelRemoteAddr(channel));
+                    }
+                } catch (Exception e) {
+                    log.error("closeChannel: close the channel exception", e);
+                } finally {
+                    this.lockChannelTables.unlock();
+                }
+            } else {
+                log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+            }
+        } catch (InterruptedException e) {
+            log.error("closeChannel exception", e);
+            // The exception is not propagated, so restore the flag for code further up the stack.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Returns a channel for {@code addr}, reusing the cached one when it is active.
+     *
+     * @param addr remote address; {@code null} selects a name server instead
+     * @param timeout passed on to the channel-creation methods
+     * @return the channel, or whatever the creation method returns when no active one is cached
+     * @throws InterruptedException if interrupted while a creation method waits for its lock
+     */
+    protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException {
+        if (addr == null) {
+            return getAndCreateNameserverChannel(timeout);
+        }
+
+        final ChannelWrapper cached = this.channelTables.get(addr);
+        final boolean reusable = cached != null && cached.isOK();
+        return reusable ? cached.getChannel() : this.createChannel(addr, timeout);
+    }
+
+    /**
+     * Returns an active channel to a name server, connecting to one if necessary.
+     *
+     * <p>The currently chosen name server is reused while its channel is active. Otherwise, under
+     * {@code lockNamesrvChannel}, the configured addresses are tried round-robin (at most one pass
+     * over the list) until a connection succeeds.
+     *
+     * @param timeout passed on to {@code createChannel} for each attempt
+     * @return an active channel, or {@code null} when no address is configured, every attempt
+     *         failed, the lock timed out, or an exception was logged
+     * @throws InterruptedException if interrupted while waiting for the lock
+     */
+    private Channel getAndCreateNameserverChannel(long timeout) throws InterruptedException {
+        String addr = this.namesrvAddrChoosed.get();
+        if (addr != null) {
+            ChannelWrapper cw = this.channelTables.get(addr);
+            if (cw != null && cw.isOK()) {
+                return cw.getChannel();
+            }
+        }
+
+        final List<String> addrList = this.namesrvAddrList.get();
+        if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+            try {
+                // Re-check under the lock: another thread may have connected in the meantime.
+                addr = this.namesrvAddrChoosed.get();
+                if (addr != null) {
+                    ChannelWrapper cw = this.channelTables.get(addr);
+                    if (cw != null && cw.isOK()) {
+                        return cw.getChannel();
+                    }
+                }
+
+                if (addrList != null && !addrList.isEmpty()) {
+                    for (int i = 0; i < addrList.size(); i++) {
+                        int index = Math.abs(this.namesrvIndex.incrementAndGet());
+                        if (index < 0) {
+                            // Math.abs(Integer.MIN_VALUE) is still negative once the counter wraps;
+                            // clamp it so the list lookup below never gets a negative index.
+                            index = 0;
+                        }
+                        index = index % addrList.size();
+                        String newAddr = addrList.get(index);
+
+                        this.namesrvAddrChoosed.set(newAddr);
+                        log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
+                        Channel channelNew = this.createChannel(newAddr, timeout);
+                        if (channelNew != null) {
+                            return channelNew;
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                log.error("getAndCreateNameserverChannel: create name server channel exception", e);
+            } finally {
+                this.lockNamesrvChannel.unlock();
+            }
+        } else {
+            log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+        }
+
+        return null;
+    }
+
+ /**
+ * Opens a connection to {@code addr} (or waits on one already in progress) and returns its channel.
+ *
+ * @param addr remote address, also used as the channel-table key
+ * @param timeout value passed to {@code awaitUninterruptibly} on the connect future
+ * @return the active channel, or {@code null} if the connect failed, timed out, or no wrapper was available
+ * @throws InterruptedException if interrupted while waiting for the channel-table lock
+ */
+ private Channel createChannel(final String addr, long timeout) throws InterruptedException {
+ ChannelWrapper cw = this.channelTables.get(addr);
+ // NOTE(review): an already-active channel is closed and evicted here, before the lock is taken -
+ // confirm that replacing a healthy connection is intended.
+ if (cw != null && cw.isOK()) {
+ cw.getChannel().close();
+ channelTables.remove(addr);
+ }
+
+ if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ boolean createNewConnection;
+ cw = this.channelTables.get(addr);
+ if (cw != null) {
+
+ if (cw.isOK()) {
+ // Active entry: close and replace it.
+ cw.getChannel().close();
+ this.channelTables.remove(addr);
+ createNewConnection = true;
+ } else if (!cw.getChannelFuture().isDone()) {
+ // A connect is still in flight: keep the wrapper and wait on its future below.
+ createNewConnection = false;
+ } else {
+ // The connect finished but the channel is not active: discard and retry.
+ this.channelTables.remove(addr);
+ createNewConnection = true;
+ }
+ } else {
+ createNewConnection = true;
+ }
+
+ if (createNewConnection) {
+ if (getBootstrap() != null) {
+ ChannelFuture channelFuture = getBootstrap().connect(RemotingHelper.string2SocketAddress(addr));
+ log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
+ cw = new ChannelWrapper(channelFuture);
+ this.channelTables.put(addr, cw);
+ }
+ }
+ } catch (Exception e) {
+ log.error("createChannel: create channel exception", e);
+ } finally {
+ this.lockChannelTables.unlock();
+ }
+ } else {
+ log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+ }
+
+ // NOTE(review): when the lock timed out, or a new connect was skipped, cw may still be the
+ // wrapper read earlier (possibly one whose channel was just closed) - verify this is acceptable.
+ if (cw != null) {
+ ChannelFuture channelFuture = cw.getChannelFuture();
+ if (channelFuture.awaitUninterruptibly(timeout)) {
+ if (cw.isOK()) {
+ log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
+ return cw.getChannel();
+ } else {
+ log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
+ }
+ } else {
+ log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, timeout,
+ channelFuture.toString());
+ }
+ }
+
+ return null;
+ }
+
+    /**
+     * Replaces the stored name-server address list when {@code addrs} differs from it.
+     *
+     * <p>The lists are considered different when no list is stored yet, when the sizes differ, or
+     * when {@code addrs} contains an address that the stored list lacks. A shuffled copy is stored,
+     * so the caller's list is never modified and may be unmodifiable.
+     *
+     * @param addrs the new addresses; {@code null} or empty leaves the stored list untouched
+     */
+    public void updateNameServerAddressList(List<String> addrs) {
+        if (addrs == null || addrs.isEmpty()) {
+            return;
+        }
+
+        final List<String> old = this.namesrvAddrList.get();
+        boolean update = false;
+
+        if (null == old || addrs.size() != old.size()) {
+            update = true;
+        } else {
+            for (String candidate : addrs) {
+                if (!old.contains(candidate)) {
+                    update = true;
+                    break;
+                }
+            }
+        }
+
+        if (update) {
+            // Shuffle a private copy: Collections.shuffle on the argument would reorder the
+            // caller's list and throws UnsupportedOperationException for unmodifiable lists.
+            final List<String> shuffled = new ArrayList<String>(addrs);
+            Collections.shuffle(shuffled);
+            log.info("name server address updated. NEW : {} , OLD: {}", shuffled, old);
+            this.namesrvAddrList.set(shuffled);
+        }
+    }
+
+ /**
+ * Holds the {@link ChannelFuture} of a connect attempt and exposes the channel behind it.
+ */
+ public static class ChannelWrapper {
+ private final ChannelFuture channelFuture;
+
+ /**
+ * @param channelFuture the future of the connect attempt this wrapper tracks
+ */
+ public ChannelWrapper(ChannelFuture channelFuture) {
+ this.channelFuture = channelFuture;
+ }
+
+ /**
+ * @return {@code true} when the future has a channel and that channel is active
+ */
+ public boolean isOK() {
+ return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
+ }
+
+ /**
+ * @return the channel's writability; unlike {@link #isOK()} this does not guard against a null channel
+ */
+ public boolean isWritable() {
+ return this.channelFuture.channel().isWritable();
+ }
+
+ /**
+ * @return the channel of the wrapped future
+ */
+ public Channel getChannel() {
+ return this.channelFuture.channel();
+ }
+
+ /**
+ * @return the wrapped connect future
+ */
+ public ChannelFuture getChannelFuture() {
+ return channelFuture;
+ }
+ }
+
+ /**
+ * Client pipeline handler that logs connection lifecycle callbacks, closes the channel through
+ * the enclosing client when it goes away, and posts a {@link NettyEvent} for each callback when
+ * a channel event listener is present.
+ */
+ public class NettyConnectManageHandler extends ChannelDuplexHandler {
+ /**
+ * Logs the connect attempt, forwards it, then posts a CONNECT event if a listener is present.
+ * A null address is reported as "UNKNOWN".
+ */
+ @Override
+ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
+ ChannelPromise promise) throws Exception {
+ final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
+ final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
+ log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
+
+ super.connect(ctx, remoteAddress, localAddress, promise);
+
+ if (getChannelEventListener() != null) {
+ putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
+ }
+ }
+
+ /**
+ * Closes the channel via the enclosing client, forwards the disconnect, then posts an event
+ * if a listener is present. The event type posted here is CLOSE.
+ */
+ @Override
+ public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+ final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
+ closeChannel(ctx.channel());
+ super.disconnect(ctx, promise);
+
+ if (getChannelEventListener() != null) {
+ putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
+ }
+ }
+
+ /**
+ * Closes the channel via the enclosing client, forwards the close, calls {@code failFast} for
+ * the channel (defined outside this class), then posts a CLOSE event if a listener is present.
+ */
+ @Override
+ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+ final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
+ closeChannel(ctx.channel());
+ super.close(ctx, promise);
+ failFast(ctx.channel());
+ if (getChannelEventListener() != null) {
+ putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
+ }
+ }
+
+ /**
+ * On an {@link IdleStateEvent} in state ALL_IDLE, closes the channel and posts an IDLE event if
+ * a listener is present. Every user event, idle or not, is forwarded down the pipeline.
+ */
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof IdleStateEvent) {
+ IdleStateEvent event = (IdleStateEvent) evt;
+ if (event.state().equals(IdleState.ALL_IDLE)) {
+ final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
+ closeChannel(ctx.channel());
+ if (getChannelEventListener() != null) {
+ putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
+ }
+ }
+ }
+
+ ctx.fireUserEventTriggered(evt);
+ }
+
+ /**
+ * Logs the failure, closes the channel and posts an EXCEPTION event if a listener is present.
+ * The exception is not forwarded to later handlers.
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
+ log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
+ closeChannel(ctx.channel());
+ if (getChannelEventListener() != null) {
+ putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
+ }
+ }
+ }
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
new file mode 100644
index 0000000..e8d8471
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
@@ -0,0 +1,93 @@
+package org.apache.rocketmq.remoting.transport;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.netty.NettyEvent;
+import org.apache.rocketmq.remoting.netty.NettyEventType;
+import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
+
+ /**
+  * Shared base for Netty-backed remoting servers. It contributes the connection management
+  * handler that logs channel life-cycle transitions and turns them into {@link NettyEvent}s.
+  */
+ public abstract class NettyRemotingServerAbstract extends NettyRemotingAbstract {
+     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+     /** Delegates to the no-argument parent constructor. */
+     public NettyRemotingServerAbstract() {
+         super();
+     }
+
+     /**
+      * Hands both permit counts straight to the parent constructor.
+      *
+      * @param permitsOneway permit count forwarded for one-way invocations
+      * @param permitsAsync  permit count forwarded for asynchronous invocations
+      */
+     public NettyRemotingServerAbstract(final int permitsOneway, final int permitsAsync) {
+         super(permitsOneway, permitsAsync);
+     }
+
+     /**
+      * Duplex handler that logs registration/activation changes of a server-side channel and
+      * publishes CONNECT, CLOSE, IDLE and EXCEPTION events when a channel event listener exists.
+      */
+     public class NettyConnectManageHandler extends ChannelDuplexHandler {
+
+         /** Enqueues an event of the given type, but only when a channel event listener is present. */
+         private void notifyListener(NettyEventType type, String remoteAddress, ChannelHandlerContext ctx) {
+             if (getChannelEventListener() != null) {
+                 putNettyEvent(new NettyEvent(type, remoteAddress, ctx.channel()));
+             }
+         }
+
+         /** Logs the registration and lets the parent handler propagate it. */
+         @Override
+         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+             final String peer = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+             log.info("NETTY SERVER PIPELINE: channelRegistered {}", peer);
+             super.channelRegistered(ctx);
+         }
+
+         /** Logs the de-registration and lets the parent handler propagate it. */
+         @Override
+         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+             final String peer = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+             log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", peer);
+             super.channelUnregistered(ctx);
+         }
+
+         /** Logs the activation, propagates it, then publishes a CONNECT event. */
+         @Override
+         public void channelActive(ChannelHandlerContext ctx) throws Exception {
+             final String peer = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+             log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", peer);
+             super.channelActive(ctx);
+
+             notifyListener(NettyEventType.CONNECT, peer, ctx);
+         }
+
+         /** Logs the deactivation, propagates it, then publishes a CLOSE event. */
+         @Override
+         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+             final String peer = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+             log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", peer);
+             super.channelInactive(ctx);
+
+             notifyListener(NettyEventType.CLOSE, peer, ctx);
+         }
+
+         /**
+          * On an ALL_IDLE {@link IdleStateEvent} the channel is closed and an IDLE event is
+          * published. Every user event is forwarded to the next handler regardless.
+          */
+         @Override
+         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+             final boolean allIdle = evt instanceof IdleStateEvent
+                 && ((IdleStateEvent) evt).state().equals(IdleState.ALL_IDLE);
+
+             if (allIdle) {
+                 final String peer = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                 log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", peer);
+                 RemotingUtil.closeChannel(ctx.channel());
+                 notifyListener(NettyEventType.IDLE, peer, ctx);
+             }
+
+             ctx.fireUserEventTriggered(evt);
+         }
+
+         /**
+          * Logs the failure, publishes an EXCEPTION event and only then closes the channel.
+          * The exception is not forwarded to later handlers.
+          */
+         @Override
+         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+             final String peer = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+             log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", peer);
+             log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
+
+             notifyListener(NettyEventType.EXCEPTION, peer, ctx);
+
+             RemotingUtil.closeChannel(ctx.channel());
+         }
+     }
+ }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
new file mode 100644
index 0000000..b294958
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
@@ -0,0 +1,293 @@
+package org.apache.rocketmq.remoting.transport.http2;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import javax.net.ssl.SSLException;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract;
+import org.apache.rocketmq.remoting.util.ThreadUtils;
+
+ /**
+  * {@link RemotingClient} whose pipeline places an {@link Http2Handler} in front of the RocketMQ
+  * encoder/decoder. Instances are usable after {@link #init} (invoked by the two-argument
+  * constructor) followed by {@link #start()}.
+  */
+ public class Http2ClientImpl extends NettyRemotingClientAbstract implements RemotingClient {
+     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+     /** Callback pool size used when the configured value is not positive. */
+     private static final int DEFAULT_CALLBACK_EXECUTOR_THREADS = 4;
+
+     private NettyClientConfig nettyClientConfig;
+     private final Bootstrap bootstrap = new Bootstrap();
+     private EventLoopGroup ioGroup;
+     private ExecutorService publicExecutor;
+     private ExecutorService callbackExecutor;
+     private ChannelEventListener channelEventListener;
+     private DefaultEventExecutorGroup defaultEventExecutorGroup;
+     private RPCHook rpcHook;
+
+     /**
+      * Creates and initializes a client in one step.
+      *
+      * @param clientConfig         client settings; its one-way and async semaphore values are passed to the parent
+      * @param channelEventListener listener to receive channel events, may be {@code null}
+      */
+     public Http2ClientImpl(final NettyClientConfig clientConfig,
+         final ChannelEventListener channelEventListener) {
+         super(clientConfig.getClientOnewaySemaphoreValue(), clientConfig.getClientAsyncSemaphoreValue());
+         init(clientConfig, channelEventListener);
+     }
+
+     /**
+      * Creates an uninitialized client; {@link #init} must be called before {@link #start()}.
+      * NOTE(review): unlike Http2ServerImpl#init, this class' init does not forward the semaphore
+      * values to the parent - confirm whether that is intended for this construction path.
+      */
+     public Http2ClientImpl() {
+         super();
+     }
+
+     /**
+      * Stores the configuration and listener, creates the I/O group, the public callback executor
+      * and the handler executor group, and builds the client SSL context.
+      *
+      * @param clientConfig         client settings
+      * @param channelEventListener listener to receive channel events, may be {@code null}
+      */
+     @Override
+     public void init(NettyClientConfig clientConfig, ChannelEventListener channelEventListener) {
+         this.nettyClientConfig = clientConfig;
+         this.channelEventListener = channelEventListener;
+         this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
+             clientConfig.getClientWorkerThreads()));
+
+         // A fixed-size pool needs at least one thread; fall back to a small default otherwise.
+         int callbackThreads = clientConfig.getClientCallbackExecutorThreads();
+         if (callbackThreads <= 0) {
+             callbackThreads = DEFAULT_CALLBACK_EXECUTOR_THREADS;
+         }
+         this.publicExecutor = ThreadUtils.newFixedThreadPool(
+             callbackThreads,
+             10000, "Remoting-PublicExecutor", true);
+
+         this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
+             ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
+         buildSslContext();
+     }
+
+     /**
+      * Builds the client SSL context (OpenSSL when available, JDK otherwise) restricted to the
+      * HTTP/2 cipher list. The trust manager accepts any certificate. A failure is logged and
+      * leaves {@code sslContext} unchanged.
+      */
+     private void buildSslContext() {
+         SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
+         try {
+             sslContext = SslContextBuilder.forClient()
+                 .sslProvider(provider)
+                 /* NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
+                  * Please refer to the HTTP/2 specification for cipher requirements. */
+                 .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
+                 .trustManager(InsecureTrustManagerFactory.INSTANCE)
+                 .build();
+         } catch (SSLException e) {
+             // Report through the remoting logger instead of dumping the trace to stderr.
+             log.error("Can not build SSL context !", e);
+         }
+     }
+
+     /**
+      * Starts the parent, configures the bootstrap (socket options and channel pipeline) and
+      * kicks off the house-keeping service.
+      */
+     @Override
+     public void start() {
+         super.start();
+         this.bootstrap.group(this.ioGroup).channel(NioSocketChannel.class)
+             .option(ChannelOption.TCP_NODELAY, true)
+             .option(ChannelOption.SO_KEEPALIVE, false)
+             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
+             .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
+             .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
+             .handler(new ChannelInitializer<SocketChannel>() {
+                 @Override
+                 public void initChannel(SocketChannel ch) throws Exception {
+                     ChannelPipeline pipeline = ch.pipeline();
+                     pipeline.addLast(
+                         defaultEventExecutorGroup,
+                         Http2Handler.newHandler(false),
+                         new NettyEncoder(),
+                         new NettyDecoder(),
+                         new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
+                         new NettyConnectManageHandler(),
+                         new NettyClientHandler());
+                 }
+             });
+         startUpHouseKeepingService();
+     }
+
+     /**
+      * Shuts the parent down, closes every tracked channel, stops both Netty executor groups and
+      * finally the public callback executor. Failures are logged, never thrown.
+      */
+     @Override
+     public void shutdown() {
+         super.shutdown();
+         try {
+
+             for (ChannelWrapper cw : this.channelTables.values()) {
+                 this.closeChannel(null, cw.getChannel());
+             }
+
+             this.channelTables.clear();
+             if (this.ioGroup != null) {
+                 this.ioGroup.shutdownGracefully();
+             }
+
+             if (this.defaultEventExecutorGroup != null) {
+                 this.defaultEventExecutorGroup.shutdownGracefully();
+             }
+         } catch (Exception e) {
+             log.error("Http2ClientImpl shutdown exception, ", e);
+         }
+
+         if (this.publicExecutor != null) {
+             try {
+                 this.publicExecutor.shutdown();
+             } catch (Exception e) {
+                 // The message used to name NettyRemotingServer, which pointed at the wrong class.
+                 log.error("Http2ClientImpl publicExecutor shutdown exception, ", e);
+             }
+         }
+     }
+
+     /** Stores the hook that is invoked around requests sent by this client. */
+     @Override
+     public void registerRPCHook(RPCHook rpcHook) {
+         this.rpcHook = rpcHook;
+     }
+
+     /** Delegates to the parent implementation. */
+     @Override
+     public void updateNameServerAddressList(List<String> addrs) {
+         super.updateNameServerAddressList(addrs);
+     }
+
+     /**
+      * Sends {@code request} to {@code addr} and waits for the response.
+      *
+      * @param addr          target address
+      * @param request       command to send
+      * @param timeoutMillis time budget passed to channel lookup and to the invocation
+      * @return the response command
+      * @throws RemotingConnectException     when no active channel could be obtained
+      * @throws RemotingSendRequestException when sending failed; the channel is closed first
+      * @throws RemotingTimeoutException     when no response arrived in time; the channel is closed
+      *                                      first only if the config asks for it
+      * @throws InterruptedException         propagated from the underlying calls
+      */
+     @Override
+     public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
+         throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
+         final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+         if (channel != null && channel.isActive()) {
+             try {
+                 if (this.rpcHook != null) {
+                     this.rpcHook.doBeforeRequest(addr, request);
+                 }
+                 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
+                 if (this.rpcHook != null) {
+                     this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
+                 }
+                 return response;
+             } catch (RemotingSendRequestException e) {
+                 log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
+                 this.closeChannel(addr, channel);
+                 throw e;
+             } catch (RemotingTimeoutException e) {
+                 if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
+                     this.closeChannel(addr, channel);
+                     log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
+                 }
+                 log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
+                 throw e;
+             }
+         } else {
+             this.closeChannel(addr, channel);
+             throw new RemotingConnectException(addr);
+         }
+     }
+
+     /**
+      * Sends {@code request} to {@code addr} and reports the outcome through {@code invokeCallback}.
+      *
+      * @throws RemotingConnectException     when no active channel could be obtained
+      * @throws RemotingSendRequestException when sending failed; the channel is closed first
+      */
+     @Override
+     public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
+         throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
+         RemotingSendRequestException {
+         final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+         if (channel != null && channel.isActive()) {
+             try {
+                 if (this.rpcHook != null) {
+                     this.rpcHook.doBeforeRequest(addr, request);
+                 }
+                 this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
+             } catch (RemotingSendRequestException e) {
+                 log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
+                 this.closeChannel(addr, channel);
+                 throw e;
+             }
+         } else {
+             this.closeChannel(addr, channel);
+             throw new RemotingConnectException(addr);
+         }
+     }
+
+     /**
+      * Sends {@code request} to {@code addr} without expecting a response.
+      *
+      * @throws RemotingConnectException     when no active channel could be obtained
+      * @throws RemotingSendRequestException when sending failed; the channel is closed first
+      */
+     @Override
+     public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
+         RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+         final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+         if (channel != null && channel.isActive()) {
+             try {
+                 if (this.rpcHook != null) {
+                     this.rpcHook.doBeforeRequest(addr, request);
+                 }
+                 this.invokeOnewayImpl(channel, request, timeoutMillis);
+             } catch (RemotingSendRequestException e) {
+                 log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
+                 this.closeChannel(addr, channel);
+                 throw e;
+             }
+         } else {
+             this.closeChannel(addr, channel);
+             throw new RemotingConnectException(addr);
+         }
+     }
+
+     /** @return the bootstrap configured by {@link #start()} */
+     @Override
+     public Bootstrap getBootstrap() {
+         return this.bootstrap;
+     }
+
+     /**
+      * Registers {@code processor} for {@code requestCode}.
+      *
+      * @param executor executor to run the processor on; the public executor is used when {@code null}
+      */
+     @Override
+     public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
+         ExecutorService executorThis = executor;
+         if (null == executor) {
+             executorThis = this.publicExecutor;
+         }
+
+         Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
+         this.processorTable.put(requestCode, pair);
+     }
+
+     /**
+      * @return the writability of the tracked channel for {@code addr} when one exists and is OK;
+      *         {@code true} otherwise
+      */
+     @Override
+     public boolean isChannelWritable(String addr) {
+         ChannelWrapper cw = this.channelTables.get(addr);
+         if (cw != null && cw.isOK()) {
+             return cw.isWritable();
+         }
+         return true;
+     }
+
+     /** @return the current content of the name server address holder */
+     @Override
+     public List<String> getNameServerAddressList() {
+         return this.namesrvAddrList.get();
+     }
+
+     /** @return the listener passed to {@link #init}, possibly {@code null} */
+     @Override
+     public ChannelEventListener getChannelEventListener() {
+         return channelEventListener;
+     }
+
+     /** @return the registered hook, or {@code null} when none was registered */
+     @Override
+     public RPCHook getRPCHook() {
+         return this.rpcHook;
+     }
+
+     /** @return the explicitly set callback executor, or the public executor when none was set */
+     @Override
+     public ExecutorService getCallbackExecutor() {
+         return callbackExecutor != null ? callbackExecutor : publicExecutor;
+     }
+
+     /** Overrides the executor returned by {@link #getCallbackExecutor()}. */
+     @Override
+     public void setCallbackExecutor(final ExecutorService callbackExecutor) {
+         this.callbackExecutor = callbackExecutor;
+     }
+
+     /** Terminal inbound handler that hands every decoded command to {@code processMessageReceived}. */
+     class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
+         @Override
+         protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+             processMessageReceived(ctx, msg);
+         }
+     }
+
+ }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2Handler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2Handler.java
new file mode 100644
index 0000000..2f4c8a1
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2Handler.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.transport.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.DefaultHttp2Connection;
+import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
+import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
+import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
+import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder;
+import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionDecoder;
+import io.netty.handler.codec.http2.Http2ConnectionEncoder;
+import io.netty.handler.codec.http2.Http2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2FrameAdapter;
+import io.netty.handler.codec.http2.Http2FrameReader;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2HeadersDecoder;
+import io.netty.handler.codec.http2.Http2Settings;
+import io.netty.handler.codec.http2.StreamBufferingEncoder;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
+
+ /**
+  * {@link Http2ConnectionHandler} that tunnels raw {@link ByteBuf} payloads over HTTP/2 frames.
+  * Inbound DATA frames are forwarded to the next handler as plain buffers. Outbound buffers are
+  * wrapped in HEADERS+DATA frames (client) or PUSH_PROMISE+HEADERS+DATA frames (server).
+  */
+ public class Http2Handler extends Http2ConnectionHandler {
+     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+     private final boolean isServer;
+     /** Stream id of the most recent inbound DATA frame; the server side pushes relative to it. */
+     private int lastStreamId;
+
+     private Http2Handler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
+         final Http2Settings initialSettings, final boolean isServer) {
+         super(decoder, encoder, initialSettings);
+         decoder.frameListener(new FrameListener());
+         this.isServer = isServer;
+     }
+
+     /**
+      * Builds a handler with its own connection, frame reader/writer, buffering encoder, local
+      * flow controller and settings (10 MiB initial window, unlimited concurrent streams, push
+      * enabled on the client side only).
+      *
+      * @param isServer {@code true} for the server end of the connection
+      * @return a new, ready to use handler
+      */
+     public static Http2Handler newHandler(final boolean isServer) {
+         log.info("isServer: {}", isServer);
+         Http2HeadersDecoder headersDecoder = new DefaultHttp2HeadersDecoder(true);
+         Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
+         Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
+
+         Http2Connection connection = new DefaultHttp2Connection(isServer);
+
+         Http2ConnectionEncoder encoder = new StreamBufferingEncoder(
+             new DefaultHttp2ConnectionEncoder(connection, frameWriter));
+
+         connection.local().flowController(new DefaultHttp2LocalFlowController(connection,
+             DEFAULT_WINDOW_UPDATE_RATIO, true));
+
+         Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
+             frameReader);
+
+         Http2Settings settings = new Http2Settings();
+
+         if (!isServer) {
+             settings.pushEnabled(true);
+         }
+
+         settings.initialWindowSize(1048576 * 10); //10MiB
+         settings.maxConcurrentStreams(Integer.MAX_VALUE);
+
+         return newHandler(decoder, encoder, settings, isServer);
+     }
+
+     private static Http2Handler newHandler(final Http2ConnectionDecoder decoder, final Http2ConnectionEncoder encoder,
+         final Http2Settings settings, boolean isServer) {
+         return new Http2Handler(decoder, encoder, settings, isServer);
+     }
+
+     /**
+      * Wraps an outbound {@link ByteBuf} into HTTP/2 frames. The caller's {@code promise} is bound
+      * to the final DATA write on both paths, so write listeners are notified once the payload
+      * itself has been handled.
+      *
+      * @param ctx     handler context
+      * @param msg     payload, expected to be a {@link ByteBuf}
+      * @param promise promise completed with the outcome of the DATA write
+      * @throws Exception propagated from the server path; the client path logs failures and fails
+      *                   the promise instead
+      */
+     @Override
+     public void write(final ChannelHandlerContext ctx, final Object msg,
+         final ChannelPromise promise) throws Exception {
+         if (isServer) {
+             assert msg instanceof ByteBuf;
+             sendAPushPromise(ctx, lastStreamId, lastStreamId + 1, (ByteBuf) msg, promise);
+             return;
+         }
+
+         final Http2Headers headers = new DefaultHttp2Headers();
+
+         try {
+             // The stream id is derived from the calling thread's id and forced to be odd.
+             // NOTE(review): HTTP/2 expects newly opened stream ids to increase monotonically;
+             // thread-derived ids do not guarantee that - confirm this scheme is intended.
+             long threadId = Thread.currentThread().getId();
+             long streamId = (threadId % 2 == 0) ? threadId + 1 : threadId + 2;
+             encoder().writeHeaders(ctx, (int) streamId, headers, 0, false, ctx.newPromise());
+             encoder().writeData(ctx, (int) streamId, (ByteBuf) msg, 0, false, promise);
+             ctx.flush();
+         } catch (Exception e) {
+             // Do not swallow the failure: log it and make sure write listeners observe it.
+             log.error("Http2Handler failed to write message to channel", e);
+             promise.tryFailure(e);
+         }
+     }
+
+     /**
+      * Announces {@code pushPromiseStreamId} on {@code streamId} and then sends the payload on the
+      * promised stream.
+      *
+      * @param promise completed with the outcome of the final DATA write
+      */
+     private void sendAPushPromise(ChannelHandlerContext ctx, int streamId, int pushPromiseStreamId,
+         ByteBuf payload, ChannelPromise promise) throws Http2Exception {
+
+         encoder().writePushPromise(ctx, streamId, pushPromiseStreamId,
+             new DefaultHttp2Headers().status(OK.codeAsText()), 0, ctx.newPromise());
+
+         Http2Headers headers = new DefaultHttp2Headers();
+         headers.status(OK.codeAsText());
+         encoder().writeHeaders(ctx, pushPromiseStreamId, headers, 0, false, ctx.newPromise());
+         encoder().writeData(ctx, pushPromiseStreamId, payload, 0, false, promise);
+     }
+
+     /** Forwards the content of inbound DATA frames to the next handler in the pipeline. */
+     private class FrameListener extends Http2FrameAdapter {
+         /**
+          * @return number of bytes (payload plus padding) to report as processed to the inbound
+          *         flow controller
+          */
+         @Override
+         public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
+             boolean endOfStream) throws Http2Exception {
+             // Measure before forwarding: later handlers may consume the buffer, after which
+             // readableBytes() would under-report what was received.
+             final int processed = data.readableBytes() + padding;
+             // Retained because the buffer is handed to the rest of the pipeline.
+             data.retain();
+             Http2Handler.this.lastStreamId = streamId;
+             ctx.fireChannelRead(data);
+             return processed;
+         }
+     }
+ }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
new file mode 100644
index 0000000..02c4fb6
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
@@ -0,0 +1,242 @@
+package org.apache.rocketmq.remoting.transport.http2;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.netty.ChannelStatisticsHandler;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.NettyRemotingServerAbstract;
+import org.apache.rocketmq.remoting.util.JvmUtils;
+import org.apache.rocketmq.remoting.util.ThreadUtils;
+
+ /**
+  * {@link RemotingServer} whose child pipeline places an {@link Http2Handler} in front of the
+  * RocketMQ encoder/decoder. Instances are usable after {@link #init} (invoked by the
+  * two-argument constructor) followed by {@link #start()}.
+  */
+ public class Http2ServerImpl extends NettyRemotingServerAbstract implements RemotingServer {
+     private static final InternalLogger LOG = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+     /** Callback pool size used when the configured value is not positive. */
+     private static final int DEFAULT_CALLBACK_EXECUTOR_THREADS = 4;
+
+     private ServerBootstrap serverBootstrap;
+     private EventLoopGroup bossGroup;
+     private EventLoopGroup ioGroup;
+     private EventExecutorGroup workerGroup;
+     private Class<? extends ServerSocketChannel> socketChannelClass;
+     private NettyServerConfig serverConfig;
+     private ChannelEventListener channelEventListener;
+     private ExecutorService publicExecutor;
+     private int port;
+     private RPCHook rpcHook;
+
+     /**
+      * Creates and initializes a server in one step.
+      *
+      * @param nettyServerConfig    server settings
+      * @param channelEventListener listener to receive channel events, may be {@code null}
+      */
+     public Http2ServerImpl(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
+         super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
+         init(nettyServerConfig, channelEventListener);
+     }
+
+     /** Creates an uninitialized server; {@link #init} must be called before {@link #start()}. */
+     public Http2ServerImpl() {
+         super();
+     }
+
+     /**
+      * Stores configuration and listener, creates the public callback executor, the boss/I/O event
+      * loop groups (epoll on Linux when enabled in the config, NIO otherwise) and the worker
+      * group, records the listen port and builds the SSL context.
+      *
+      * @param nettyServerConfig    server settings
+      * @param channelEventListener listener to receive channel events, may be {@code null}
+      */
+     @Override
+     public void init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
+         super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
+         this.serverBootstrap = new ServerBootstrap();
+         this.serverConfig = nettyServerConfig;
+         this.channelEventListener = channelEventListener;
+
+         // A fixed-size pool needs at least one thread; fall back to a small default otherwise.
+         int callbackThreads = serverConfig.getServerCallbackExecutorThreads();
+         if (callbackThreads <= 0) {
+             callbackThreads = DEFAULT_CALLBACK_EXECUTOR_THREADS;
+         }
+         this.publicExecutor = ThreadUtils.newFixedThreadPool(
+             callbackThreads,
+             10000, "Remoting-PublicExecutor", true);
+
+         if (JvmUtils.isLinux() && this.serverConfig.isUseEpollNativeSelector()) {
+             this.ioGroup = new EpollEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads",
+                 serverConfig.getServerSelectorThreads()));
+             this.bossGroup = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
+                 serverConfig.getServerAcceptorThreads()));
+             this.socketChannelClass = EpollServerSocketChannel.class;
+         } else {
+             this.bossGroup = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
+                 serverConfig.getServerAcceptorThreads()));
+             this.ioGroup = new NioEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyNioIoThreads",
+                 serverConfig.getServerSelectorThreads()));
+             this.socketChannelClass = NioServerSocketChannel.class;
+         }
+
+         this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
+             ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
+         this.port = nettyServerConfig.getListenPort();
+         buildHttp2SslContext();
+     }
+
+     /**
+      * Builds the server SSL context from a freshly generated self-signed certificate (OpenSSL
+      * when available, JDK otherwise), restricted to the HTTP/2 cipher list. Failures are logged
+      * and leave {@code sslContext} unchanged.
+      */
+     private void buildHttp2SslContext() {
+         try {
+             SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
+             SelfSignedCertificate ssc;
+             //NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
+             //Please refer to the HTTP/2 specification for cipher requirements.
+             ssc = new SelfSignedCertificate();
+             sslContext = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
+                 .sslProvider(provider)
+                 .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE).build();
+         } catch (Exception e) {
+             LOG.error("Can not build SSL context !", e);
+         }
+     }
+
+     /**
+      * Registers {@code processor} for {@code requestCode}.
+      *
+      * @param executor executor to run the processor on; the public executor is used when {@code null}
+      */
+     @Override
+     public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
+         ExecutorService executorThis = executor;
+         if (null == executor) {
+             executorThis = this.publicExecutor;
+         }
+         Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
+         this.processorTable.put(requestCode, pair);
+     }
+
+     /** Sets the processor/executor pair stored as the default request processor. */
+     @Override
+     public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
+         this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
+     }
+
+     /** @return the configured port, or the actually bound port once {@link #start()} has run */
+     @Override
+     public int localListenPort() {
+         return this.port;
+     }
+
+     /** @return the pair registered for {@code requestCode}, or {@code null} when none exists */
+     @Override
+     public Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(int requestCode) {
+         return processorTable.get(requestCode);
+     }
+
+     /** Not implemented: the call is currently a no-op. */
+     @Override
+     public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
+
+     }
+
+     /** Delegates to {@code invokeSyncImpl} on the given channel. */
+     @Override
+     public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
+         throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
+         return this.invokeSyncImpl(channel, request, timeoutMillis);
+     }
+
+     /** Delegates to {@code invokeAsyncImpl} on the given channel. */
+     @Override
+     public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
+         throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+         this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
+     }
+
+     /** Delegates to {@code invokeOnewayImpl} on the given channel. */
+     @Override
+     public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException,
+         RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+         this.invokeOnewayImpl(channel, request, timeoutMillis);
+     }
+
+     /**
+      * Applies socket options from the server configuration to {@code bootstrap}. Nothing is
+      * applied when no configuration has been set.
+      */
+     private void applyOptions(ServerBootstrap bootstrap) {
+         if (null == serverConfig) {
+             // Without a configuration there is nothing to read options from. Previously the
+             // pooled-allocator check below dereferenced the null config.
+             return;
+         }
+
+         bootstrap.option(ChannelOption.SO_BACKLOG, 1024)
+             .option(ChannelOption.SO_REUSEADDR, true)
+             .option(ChannelOption.SO_KEEPALIVE, false)
+             .childOption(ChannelOption.TCP_NODELAY, true)
+             .childOption(ChannelOption.SO_SNDBUF, serverConfig.getServerSocketSndBufSize())
+             .childOption(ChannelOption.SO_RCVBUF, serverConfig.getServerSocketRcvBufSize());
+
+         if (serverConfig.isServerPooledByteBufAllocatorEnable()) {
+             bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+         }
+     }
+
+     /**
+      * Starts the parent, wires the child pipeline, applies socket options, binds the listen
+      * port (updating {@link #localListenPort()} with the bound port) and kicks off the
+      * house-keeping service.
+      */
+     @Override
+     public void start() {
+         super.start();
+         final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+         this.serverBootstrap.group(this.bossGroup, this.ioGroup).
+             channel(socketChannelClass).childHandler(new ChannelInitializer<SocketChannel>() {
+                 @Override
+                 public void initChannel(SocketChannel ch) throws Exception {
+                     channels.add(ch);
+
+                     ChannelPipeline cp = ch.pipeline();
+
+                     cp.addLast(ChannelStatisticsHandler.NAME, new ChannelStatisticsHandler(channels));
+
+                     cp.addLast(workerGroup,
+                         Http2Handler.newHandler(true),
+                         new NettyEncoder(),
+                         new NettyDecoder(),
+                         new IdleStateHandler(serverConfig.getConnectionChannelReaderIdleSeconds(),
+                             serverConfig.getConnectionChannelWriterIdleSeconds(),
+                             serverConfig.getServerChannelMaxIdleTimeSeconds()),
+                         new NettyConnectManageHandler(),
+                         new NettyServerHandler());
+                 }
+             });
+
+         applyOptions(serverBootstrap);
+
+         ChannelFuture channelFuture = this.serverBootstrap.bind(this.port).syncUninterruptibly();
+         this.port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
+         startUpHouseKeepingService();
+     }
+
+     /**
+      * Shuts the parent down, then stops the boss, I/O and worker groups created by {@link #init}
+      * and finally the public callback executor. Failures while stopping the groups are logged.
+      */
+     @Override
+     public void shutdown() {
+         super.shutdown();
+         try {
+             // These groups are owned by this class; leaving them running leaks their threads
+             // and keeps the listening socket open.
+             if (this.bossGroup != null) {
+                 this.bossGroup.shutdownGracefully();
+             }
+             if (this.ioGroup != null) {
+                 this.ioGroup.shutdownGracefully();
+             }
+             if (this.workerGroup != null) {
+                 this.workerGroup.shutdownGracefully();
+             }
+         } catch (Exception e) {
+             LOG.error("Http2ServerImpl shutdown exception, ", e);
+         }
+
+         if (this.publicExecutor != null) {
+             ThreadUtils.shutdownGracefully(publicExecutor, 2000, TimeUnit.MILLISECONDS);
+         }
+     }
+
+     /** Stores the hook exposed through {@link #getRPCHook()}. */
+     @Override
+     public void registerRPCHook(RPCHook rpcHook) {
+         this.rpcHook = rpcHook;
+     }
+
+     /** @return the registered hook, or {@code null} when none was registered */
+     @Override
+     public RPCHook getRPCHook() {
+         return this.rpcHook;
+     }
+
+     /** @return the public executor created by {@link #init} */
+     @Override
+     public ExecutorService getCallbackExecutor() {
+         return this.publicExecutor;
+     }
+
+     /** @return the listener passed to {@link #init}, possibly {@code null} */
+     @Override
+     public ChannelEventListener getChannelEventListener() {
+         return this.channelEventListener;
+     }
+
+ }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java
similarity index 93%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java
index f64ab2d..70858a9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyDecoder.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.netty;
+package org.apache.rocketmq.remoting.transport.rocketmq;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@@ -24,6 +24,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
@@ -46,8 +47,7 @@ public class NettyDecoder extends LengthFieldBasedFrameDecoder {
}
ByteBuffer byteBuffer = frame.nioBuffer();
-
- return RemotingCommand.decode(byteBuffer);
+ return CodecHelper.decode(byteBuffer);
} catch (Exception e) {
log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
RemotingUtil.closeChannel(ctx.channel());
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyEncoder.java
similarity index 88%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyEncoder.java
index 8c3c56a..8285dee 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyEncoder.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.netty;
+package org.apache.rocketmq.remoting.transport.rocketmq;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
@@ -24,17 +24,17 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@Override
- public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
- throws Exception {
+ public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) {
try {
- ByteBuffer header = remotingCommand.encodeHeader();
- out.writeBytes(header);
+ ByteBuffer byteBuffer = CodecHelper.encodeHeader(remotingCommand);
+ out.writeBytes(byteBuffer);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
new file mode 100644
index 0000000..14666d2
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.transport.rocketmq;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import java.io.IOException;
+import java.security.cert.CertificateException;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import javax.xml.ws.AsyncHandler;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ChannelEventListener;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.TlsHelper;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract;
+import org.apache.rocketmq.remoting.util.ThreadUtils;
+
+public class NettyRemotingClient extends NettyRemotingClientAbstract implements RemotingClient {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+ private NettyClientConfig nettyClientConfig;
+ private Bootstrap bootstrap = new Bootstrap();
+ private EventLoopGroup eventLoopGroupWorker;
+
+ private ExecutorService publicExecutor;
+
+ private ExecutorService asyncExecutor;
+ /**
+ * Invoke the callback methods in this executor when process response.
+ */
+ private ExecutorService callbackExecutor;
+ private ChannelEventListener channelEventListener;
+ private DefaultEventExecutorGroup defaultEventExecutorGroup;
+ private RPCHook rpcHook;
+
+ /**
+ * Creates a client through the superclass no-arg constructor only. None of the fields that
+ * {@code init} assigns are set by this constructor.
+ */
+ public NettyRemotingClient() {
+ // The superclass no-arg constructor is invoked implicitly.
+ }
+
+ /**
+ * Creates a client from the given configuration with a {@code null} channel event listener.
+ *
+ * @param nettyClientConfig client settings, forwarded to the two-argument constructor
+ */
+ public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
+ this(nettyClientConfig, null);
+ }
+
+ /**
+ * Creates a client: the oneway and async semaphore values from the configuration go to the
+ * superclass constructor, then {@code init} is called with both arguments.
+ *
+ * @param nettyClientConfig client settings
+ * @param channelEventListener listener handed to {@code init}; the one-argument constructor passes {@code null}
+ */
+ public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
+ final ChannelEventListener channelEventListener) {
+ super(nettyClientConfig.getClientOnewaySemaphoreValue(),
+ nettyClientConfig.getClientAsyncSemaphoreValue());
+ this.init(nettyClientConfig, channelEventListener);
+ }
+
+ /**
+ * Stores the configuration and listener, then builds the thread resources of this client:
+ * the NIO worker event loop group, the public executor and the event executor group.
+ * When the configuration enables TLS, an SSL context is built as well.
+ *
+ * @param nettyClientConfig client settings (thread counts, TLS switch)
+ * @param channelEventListener listener to keep; may be {@code null}
+ * @throws RuntimeException if building the SSL context fails with a {@code CertificateException}
+ */
+ @Override
+ public void init(NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
+ this.nettyClientConfig = nettyClientConfig;
+ this.channelEventListener = channelEventListener;
+
+ final int workerThreads = nettyClientConfig.getClientWorkerThreads();
+ this.eventLoopGroupWorker = new NioEventLoopGroup(workerThreads,
+ ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads", workerThreads));
+ this.publicExecutor = ThreadUtils.newFixedThreadPool(
+ nettyClientConfig.getClientCallbackExecutorThreads(), 10000, "Remoting-PublicExecutor", true);
+ this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(workerThreads,
+ ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", workerThreads));
+
+ if (!nettyClientConfig.isUseTLS()) {
+ return;
+ }
+ try {
+ sslContext = TlsHelper.buildSslContext(true);
+ log.info("SSL enabled for client");
+ } catch (IOException e) {
+ // NOTE(review): only logged, so sslContext may stay unset here while a
+ // CertificateException below aborts initialization - confirm the asymmetry is intended.
+ log.error("Failed to create SSLContext", e);
+ } catch (CertificateException e) {
+ log.error("Failed to create SSLContext", e);
+ throw new RuntimeException("Failed to create SSLContext", e);
+ }
+ }
+
+ /**
+ * @return the Netty bootstrap held by this client
+ */
+ @Override
+ public Bootstrap getBootstrap() {
+ return bootstrap;
+ }
+
+ /**
+ * Configures the bootstrap (worker group, NIO socket channel, socket options and the channel
+ * initializer) and then starts the housekeeping service.
+ *
+ * Each new channel gets, on the event executor group: an optional SSL handler at the front,
+ * followed by the encoder, the decoder, an idle-state handler, the connect-manage handler
+ * and the client handler.
+ */
+ @Override
+ public void start() {
+ final ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ final ChannelPipeline pipeline = ch.pipeline();
+ final boolean tlsRequested = nettyClientConfig.isUseTLS();
+ if (tlsRequested && null == sslContext) {
+ // TLS was requested but no context is available; the channel stays plain.
+ log.warn("Connections are insecure as SSLContext is null!");
+ } else if (tlsRequested) {
+ pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
+ log.info("Prepend SSL handler");
+ }
+ pipeline.addLast(
+ defaultEventExecutorGroup,
+ new NettyEncoder(),
+ new NettyDecoder(),
+ new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
+ new NettyConnectManageHandler(),
+ new NettyClientHandler());
+ }
+ };
+
+ bootstrap = this.bootstrap
+ .group(this.eventLoopGroupWorker)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.SO_KEEPALIVE, false)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
+ .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
+ .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
+ .handler(pipelineSetup);
+
+ startUpHouseKeepingService();
+ }
+
+ /**
+ * Runs the superclass shutdown, then clears the channels and shuts down the worker event
+ * loop group, the event executor group and the public executor when they are set.
+ * An exception raised during that cleanup is logged and not rethrown.
+ */
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ try {
+ clearChannels();
+ if (null != eventLoopGroupWorker) {
+ eventLoopGroupWorker.shutdownGracefully();
+ }
+ if (null != defaultEventExecutorGroup) {
+ defaultEventExecutorGroup.shutdownGracefully();
+ }
+ if (null != publicExecutor) {
+ publicExecutor.shutdown();
+ }
+ } catch (Exception e) {
+ log.error("NettyRemotingClient shutdown exception, ", e);
+ }
+ }
+
+ /**
+ * Stores the hook that the invoke methods of this client call around a request.
+ *
+ * @param rpcHook the hook to store; the invoke methods skip the hook calls when it is {@code null}
+ */
+ @Override
+ public void registerRPCHook(final RPCHook rpcHook) {
+ this.rpcHook = rpcHook;
+ }
+
+ /**
+ * Forwards the address list unchanged to the superclass implementation.
+ *
+ * @param addrs name server addresses
+ */
+ @Override
+ public void updateNameServerAddressList(final List<String> addrs) {
+ super.updateNameServerAddressList(addrs);
+ }
+
+ /**
+ * Sends a request to {@code addr} and waits for the response.
+ *
+ * The time spent obtaining the channel and running the before-request hook is subtracted
+ * from {@code timeoutMillis} before the request is sent. The registered hook, if any, is
+ * called before the request and after the response.
+ *
+ * @param addr target address
+ * @param request command to send
+ * @param timeoutMillis total time budget in milliseconds
+ * @return the response command
+ * @throws RemotingConnectException if no active channel to {@code addr} is available
+ * @throws RemotingSendRequestException if sending fails; the channel is closed first
+ * @throws RemotingTimeoutException if the budget is exhausted; the channel is closed first
+ * when the configuration asks for it
+ */
+ @Override
+ public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
+ throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
+ final long startedAt = System.currentTimeMillis();
+ final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+ if (channel == null || !channel.isActive()) {
+ this.closeChannel(addr, channel);
+ throw new RemotingConnectException(addr);
+ }
+
+ try {
+ if (this.rpcHook != null) {
+ this.rpcHook.doBeforeRequest(addr, request);
+ }
+ final long elapsed = System.currentTimeMillis() - startedAt;
+ if (elapsed > timeoutMillis) {
+ throw new RemotingTimeoutException("invokeSync call timeout");
+ }
+ final RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - elapsed);
+ if (this.rpcHook != null) {
+ this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
+ }
+ return response;
+ } catch (RemotingSendRequestException e) {
+ log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
+ this.closeChannel(addr, channel);
+ throw e;
+ } catch (RemotingTimeoutException e) {
+ if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
+ this.closeChannel(addr, channel);
+ log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
+ }
+ log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
+ throw e;
+ }
+ }
+
+ /**
+ * Sends a request to {@code addr} asynchronously; the outcome is delivered through
+ * {@code invokeCallback}.
+ *
+ * The time spent obtaining the channel and running the before-request hook is subtracted
+ * from {@code timeoutMillis} before the request is handed to the async implementation.
+ *
+ * @param addr target address
+ * @param request command to send
+ * @param timeoutMillis total time budget in milliseconds
+ * @param invokeCallback callback passed on to the async implementation
+ * @throws RemotingConnectException if no active channel to {@code addr} is available
+ * @throws RemotingTooMuchRequestException if the budget is already exhausted before sending
+ * @throws RemotingSendRequestException if sending fails; the channel is closed first
+ */
+ @Override
+ public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
+ throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
+ RemotingSendRequestException {
+ final long startedAt = System.currentTimeMillis();
+ final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+ if (channel == null || !channel.isActive()) {
+ this.closeChannel(addr, channel);
+ throw new RemotingConnectException(addr);
+ }
+
+ try {
+ if (this.rpcHook != null) {
+ this.rpcHook.doBeforeRequest(addr, request);
+ }
+ final long elapsed = System.currentTimeMillis() - startedAt;
+ if (elapsed > timeoutMillis) {
+ // NOTE(review): a timeout is reported as RemotingTooMuchRequestException although
+ // RemotingTimeoutException is declared as well - confirm this is intended.
+ throw new RemotingTooMuchRequestException("invokeAsync call timeout");
+ }
+ this.invokeAsyncImpl(channel, request, timeoutMillis - elapsed, invokeCallback);
+ } catch (RemotingSendRequestException e) {
+ log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
+ this.closeChannel(addr, channel);
+ throw e;
+ }
+ }
+
+ /**
+ * Sends a request to {@code addr} through the oneway implementation; this method returns no
+ * response to the caller.
+ *
+ * @param addr target address
+ * @param request command to send
+ * @param timeoutMillis timeout in milliseconds, used for obtaining the channel and passed on
+ * unchanged to the oneway implementation
+ * @throws RemotingConnectException if no active channel to {@code addr} is available
+ * @throws RemotingSendRequestException if sending fails; the channel is closed first
+ */
+ @Override
+ public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
+ RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ final Channel channel = this.getAndCreateChannel(addr, timeoutMillis);
+ if (channel == null || !channel.isActive()) {
+ this.closeChannel(addr, channel);
+ throw new RemotingConnectException(addr);
+ }
+
+ try {
+ if (this.rpcHook != null) {
+ this.rpcHook.doBeforeRequest(addr, request);
+ }
+ this.invokeOnewayImpl(channel, request, timeoutMillis);
+ } catch (RemotingSendRequestException e) {
+ log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
+ this.closeChannel(addr, channel);
+ throw e;
+ }
+ }
+
+ /**
+ * Registers a processor for a request code in the processor table.
+ *
+ * @param requestCode request code used as the table key
+ * @param processor processor to register
+ * @param executor executor paired with the processor; the public executor is used when {@code null}
+ */
+ @Override
+ public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
+ final ExecutorService effectiveExecutor = (executor != null) ? executor : this.publicExecutor;
+ this.processorTable.put(requestCode,
+ new Pair<NettyRequestProcessor, ExecutorService>(processor, effectiveExecutor));
+ }
+
+ /**
+ * Reports whether the channel registered for {@code addr} is writable.
+ *
+ * @param addr address used as the channel table key
+ * @return the wrapper's writable state when a wrapper exists and reports OK; {@code true} otherwise
+ */
+ @Override
+ public boolean isChannelWritable(String addr) {
+ final ChannelWrapper wrapper = this.channelTables.get(addr);
+ if (wrapper == null || !wrapper.isOK()) {
+ return true;
+ }
+ return wrapper.isWritable();
+ }
+
+ /**
+ * @return the current value held by the name server address holder
+ */
+ @Override
+ public List<String> getNameServerAddressList() {
+ return namesrvAddrList.get();
+ }
+
+ /**
+ * @return the channel event listener held by this client
+ */
+ @Override
+ public ChannelEventListener getChannelEventListener() {
+ return this.channelEventListener;
+ }
+
+ /**
+ * @return the hook currently stored on this client
+ */
+ @Override
+ public RPCHook getRPCHook() {
+ return rpcHook;
+ }
+
+ /**
+ * @return the dedicated callback executor when one has been set, otherwise the public executor
+ */
+ @Override
+ public ExecutorService getCallbackExecutor() {
+ if (callbackExecutor == null) {
+ return publicExecutor;
+ }
+ return callbackExecutor;
+ }
+
+ /**
+ * Sets the dedicated callback executor of this client.
+ *
+ * @param callbackExecutor the executor to store
+ */
+ @Override
+ public void setCallbackExecutor(final ExecutorService callbackExecutor) {
+ this.callbackExecutor = callbackExecutor;
+ }
+
+ /**
+ * Inbound handler that passes every received {@code RemotingCommand} to the enclosing
+ * client's {@code processMessageReceived}.
+ */
+ class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+ NettyRemotingClient.this.processMessageReceived(ctx, msg);
+ }
+ }
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
similarity index 59%
rename from remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
rename to remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
index 1984842..9e7d419 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
@@ -14,66 +14,64 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.remoting.netty;
+package org.apache.rocketmq.remoting.transport.rocketmq;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.timeout.IdleState;
-import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.cert.CertificateException;
import java.util.NoSuchElementException;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.netty.FileRegionEncoder;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.TlsHelper;
+import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.NettyRemotingServerAbstract;
+import org.apache.rocketmq.remoting.util.JvmUtils;
+import org.apache.rocketmq.remoting.util.ThreadUtils;
-public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
+public class NettyRemotingServer extends NettyRemotingServerAbstract implements RemotingServer {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
- private final ServerBootstrap serverBootstrap;
- private final EventLoopGroup eventLoopGroupSelector;
- private final EventLoopGroup eventLoopGroupBoss;
- private final NettyServerConfig nettyServerConfig;
-
- private final ExecutorService publicExecutor;
- private final ChannelEventListener channelEventListener;
+ private ServerBootstrap serverBootstrap;
+ private EventLoopGroup eventLoopGroupSelector;
+ private EventLoopGroup eventLoopGroupBoss;
+ private NettyServerConfig nettyServerConfig;
- private final Timer timer = new Timer("ServerHouseKeepingService", true);
+ private ExecutorService publicExecutor;
+ private ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup;
+ private Class<? extends ServerSocketChannel> socketChannelClass;
private RPCHook rpcHook;
@@ -83,69 +81,56 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
+ public NettyRemotingServer() {
+ super();
+ }
+
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
- super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
+ init(nettyServerConfig, channelEventListener);
+ }
+
+ @Override
+ public void init(NettyServerConfig serverConfig, ChannelEventListener channelEventListener) {
+ this.nettyServerConfig = serverConfig;
+ super.init(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
- this.nettyServerConfig = nettyServerConfig;
+ this.nettyServerConfig = serverConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
-
- this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
- }
- });
-
- this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
- }
- });
-
- if (useEpoll()) {
- this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
- private int threadTotal = nettyServerConfig.getServerSelectorThreads();
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
- }
- });
+ this.publicExecutor = ThreadUtils.newFixedThreadPool(
+ publicThreadNums,
+ 10000, "Remoting-PublicExecutor", true);
+ if (JvmUtils.isUseEpoll() && this.nettyServerConfig.isUseEpollNativeSelector()) {
+ this.eventLoopGroupSelector = new EpollEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads",
+ serverConfig.getServerSelectorThreads()));
+ this.eventLoopGroupBoss = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
+ serverConfig.getServerAcceptorThreads()));
+ this.socketChannelClass = EpollServerSocketChannel.class;
} else {
- this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
- private AtomicInteger threadIndex = new AtomicInteger(0);
- private int threadTotal = nettyServerConfig.getServerSelectorThreads();
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
- }
- });
+ this.eventLoopGroupBoss = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
+ serverConfig.getServerAcceptorThreads()));
+ this.eventLoopGroupSelector = new NioEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyNioIoThreads",
+ serverConfig.getServerSelectorThreads()));
+ this.socketChannelClass = NioServerSocketChannel.class;
}
-
+ this.port = nettyServerConfig.getListenPort();
+ this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
+ ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
loadSslContext();
}
public void loadSslContext() {
TlsMode tlsMode = TlsSystemConfig.tlsMode;
log.info("Server is running in TLS {} mode", tlsMode.getName());
-
if (tlsMode != TlsMode.DISABLED) {
try {
sslContext = TlsHelper.buildSslContext(false);
@@ -158,36 +143,19 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
}
- private boolean useEpoll() {
- return RemotingUtil.isLinuxPlatform()
- && nettyServerConfig.isUseEpollNativeSelector()
- && Epoll.isAvailable();
- }
-
@Override
public void start() {
- this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
- nettyServerConfig.getServerWorkerThreads(),
- new ThreadFactory() {
-
- private AtomicInteger threadIndex = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
- }
- });
-
+ super.start();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
- .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
+ .channel(socketChannelClass)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
- .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
+ .localAddress(new InetSocketAddress(this.port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
@@ -197,7 +165,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
- new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+ new IdleStateHandler(nettyServerConfig.getConnectionChannelReaderIdleSeconds(),
+ nettyServerConfig.getConnectionChannelWriterIdleSeconds(),
+ nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
@@ -215,46 +185,25 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
-
- if (this.channelEventListener != null) {
- this.nettyEventExecutor.start();
- }
-
- this.timer.scheduleAtFixedRate(new TimerTask() {
-
- @Override
- public void run() {
- try {
- NettyRemotingServer.this.scanResponseTable();
- } catch (Throwable e) {
- log.error("scanResponseTable exception", e);
- }
- }
- }, 1000 * 3, 1000);
+ startUpHouseKeepingService();
}
@Override
public void shutdown() {
try {
- if (this.timer != null) {
- this.timer.cancel();
+ super.shutdown();
+ if (this.eventLoopGroupBoss != null) {
+ this.eventLoopGroupBoss.shutdownGracefully();
}
-
- this.eventLoopGroupBoss.shutdownGracefully();
-
- this.eventLoopGroupSelector.shutdownGracefully();
-
- if (this.nettyEventExecutor != null) {
- this.nettyEventExecutor.shutdown();
+ if (this.eventLoopGroupSelector != null) {
+ this.eventLoopGroupSelector.shutdownGracefully();
}
-
if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully();
}
} catch (Exception e) {
log.error("NettyRemotingServer shutdown exception, ", e);
}
-
if (this.publicExecutor != null) {
try {
this.publicExecutor.shutdown();
@@ -390,80 +339,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
}
- class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
- processMessageReceived(ctx, msg);
- }
- }
-
- class NettyConnectManageHandler extends ChannelDuplexHandler {
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
- final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
- super.channelRegistered(ctx);
- }
-
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
- final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
- super.channelUnregistered(ctx);
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
- super.channelActive(ctx);
-
- if (NettyRemotingServer.this.channelEventListener != null) {
- NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
- }
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
- super.channelInactive(ctx);
-
- if (NettyRemotingServer.this.channelEventListener != null) {
- NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
- }
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state().equals(IdleState.ALL_IDLE)) {
- final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
- RemotingUtil.closeChannel(ctx.channel());
- if (NettyRemotingServer.this.channelEventListener != null) {
- NettyRemotingServer.this
- .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
- }
- }
- }
-
- ctx.fireUserEventTriggered(evt);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
- log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
-
- if (NettyRemotingServer.this.channelEventListener != null) {
- NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
- }
+ @Override
+ public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
- RemotingUtil.closeChannel(ctx.channel());
- }
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/JvmUtils.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/JvmUtils.java
new file mode 100644
index 0000000..e889a91
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/JvmUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.util;
+
+import io.netty.channel.epoll.Epoll;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Random;
+
+ /**
+  * Static helpers that answer questions about the host operating system and the current JVM process.
+  */
+ public final class JvmUtils {
+     /**
+      * Value of the {@code os.name} system property in lower case. The conversion uses
+      * {@link java.util.Locale#ROOT} so the checks below do not depend on the default locale
+      * (with a Turkish default locale the capital "I" in "AIX" would become a dotless i).
+      */
+     public static final String OS_NAME = System.getProperty("os.name").toLowerCase(java.util.Locale.ROOT);
+
+     /**
+      * A constructor to stop this class being constructed.
+      */
+     private JvmUtils() {
+         // Unused
+     }
+
+     /**
+      * @return {@code true} when the OS name starts with "win".
+      */
+     public static boolean isWindows() {
+         return OS_NAME.startsWith("win");
+     }
+
+     /**
+      * @return {@code true} when the OS name starts with "win" and ends with "10".
+      */
+     public static boolean isWindows10() {
+         return OS_NAME.startsWith("win") && OS_NAME.endsWith("10");
+     }
+
+     /**
+      * @return {@code true} when the OS name contains "mac".
+      */
+     public static boolean isMacOSX() {
+         return OS_NAME.contains("mac");
+     }
+
+     /**
+      * @return {@code true} when the OS name starts with "linux".
+      */
+     public static boolean isLinux() {
+         return OS_NAME.startsWith("linux");
+     }
+
+     /**
+      * @return {@code true} when running on Linux and Netty reports the epoll transport as available.
+      */
+     public static boolean isUseEpoll() {
+         return isLinux() && Epoll.isAvailable();
+     }
+
+     /**
+      * @return {@code true} when the OS name looks like a Unix flavour.
+      */
+     public static boolean isUnix() {
+         return OS_NAME.contains("nix") ||
+             OS_NAME.contains("nux") ||
+             OS_NAME.contains("aix") ||
+             OS_NAME.contains("bsd") ||
+             OS_NAME.contains("sun") ||
+             // HP-UX reports itself as "HP-UX", which the old "hpux" check alone never matched.
+             OS_NAME.contains("hp-ux") ||
+             OS_NAME.contains("hpux");
+     }
+
+     /**
+      * @return {@code true} when the OS name starts with "sun".
+      */
+     public static boolean isSolaris() {
+         return OS_NAME.startsWith("sun");
+     }
+
+     /**
+      * Determines the id of the current process.
+      * <p>
+      * Sources are tried in order: the name of the canonical target of {@code /proc/self}, then the part
+      * of the runtime MXBean name before the first "@". A source that is missing or not a non-negative
+      * integer is skipped instead of raising {@link NumberFormatException}. When every source fails a
+      * random value in {@code [0, 65536)} is returned.
+      *
+      * @return the process id, or a random placeholder when it cannot be determined.
+      */
+     public static int getProcessId() {
+         int pid = parsePid(procSelfName());
+         if (pid < 0) {
+             pid = parsePid(ManagementFactory.getRuntimeMXBean().getName().split("@", 0)[0]);
+         }
+         if (pid < 0) {
+             pid = new Random().nextInt(1 << 16);
+         }
+         return pid;
+     }
+
+     /**
+      * Returns the last path element of the canonical form of {@code /proc/self}, or {@code null} when
+      * that path does not exist or cannot be resolved.
+      */
+     private static String procSelfName() {
+         final File self = new File("/proc/self");
+         try {
+             if (self.exists()) {
+                 return self.getCanonicalFile().getName();
+             }
+         } catch (IOException ignored) {
+             // Best effort only: the caller falls back to the next source.
+         }
+         return null;
+     }
+
+     /**
+      * Parses a candidate process id; returns -1 for {@code null}, non-numeric or negative input.
+      */
+     private static int parsePid(String candidate) {
+         if (candidate == null) {
+             return -1;
+         }
+         try {
+             int value = Integer.parseInt(candidate.trim());
+             return value >= 0 ? value : -1;
+         } catch (NumberFormatException ignored) {
+             // Not a numeric id; report "unknown" so the next source is tried.
+             return -1;
+         }
+     }
+
+ }
+
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java
new file mode 100644
index 0000000..ccd037f
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/RemotingUtil.java
@@ -0,0 +1,6 @@
+package org.apache.rocketmq.remoting.util;
+
+ /**
+  * Constants shared by the remoting module.
+  */
+ public class RemotingUtil {
+     /**
+      * Name of the protocol used when none is specified.
+      */
+     public static final String DEFAULT_PROTOCOL = "rocketmq";
+
+     /**
+      * Name of the character set used by the remoting layer.
+      */
+     public static final String REMOTING_CHARSET = "UTF-8";
+ }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
new file mode 100644
index 0000000..d3a2312
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ServiceProvider.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.rocketmq.remoting.util;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+ /**
+  * Loads service implementations named in classpath resource files, in the spirit of the JDK
+  * <a href= "http://java.sun.com/j2se/1.3/docs/guide/jar/jar.html#Service%20Provider" >'Service Provider'
+  * specification</a>. Resources are looked up through the thread context classloader.
+  */
+ public class ServiceProvider {
+
+     private static final InternalLogger LOG = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+     /**
+      * A reference to the classloader that loaded this class. It's more efficient to compute it once and cache it here.
+      */
+     private static ClassLoader thisClassLoader;
+
+     static {
+         thisClassLoader = getClassLoader(ServiceProvider.class);
+     }
+
+     /**
+      * Returns a string that uniquely identifies the specified object, including its class.
+      * <p>
+      * The returned string is of form "classname@hashcode", ie is the same as the return value of the Object.toString()
+      * method, but works even when the specified object's class has overridden the toString method.
+      *
+      * @param o may be null.
+      * @return a string of form classname@hashcode, or "null" if param o is null.
+      */
+     protected static String objectId(Object o) {
+         if (o == null) {
+             return "null";
+         } else {
+             return o.getClass().getName() + "@" + System.identityHashCode(o);
+         }
+     }
+
+     /**
+      * Returns the classloader of the given class.
+      *
+      * @param clazz class whose loader is wanted.
+      * @return the loader reported by {@link Class#getClassLoader()}.
+      * @throws SecurityException rethrown after logging when access to the loader is denied.
+      */
+     protected static ClassLoader getClassLoader(Class<?> clazz) {
+         try {
+             return clazz.getClassLoader();
+         } catch (SecurityException e) {
+             // One placeholder per argument; previously the exception message was silently dropped.
+             LOG.error("Unable to get classloader for class {} due to security restrictions ! {}",
+                 clazz, e.getMessage());
+             throw e;
+         }
+     }
+
+     /**
+      * Returns the context classloader of the current thread.
+      *
+      * @return the context classloader, or {@code null} when it is unset or access is denied.
+      */
+     public static ClassLoader getContextClassLoader() {
+         ClassLoader classLoader = null;
+         try {
+             classLoader = Thread.currentThread().getContextClassLoader();
+         } catch (SecurityException ignored) {
+             // The getContextClassLoader() method throws SecurityException when the context
+             // class loader isn't an ancestor of the calling class's class
+             // loader, or if security permissions are restricted. Callers treat null as "use the system loader".
+         }
+         return classLoader;
+     }
+
+     /**
+      * Opens a classpath resource.
+      *
+      * @param loader loader to search; when {@code null} the system classloader is used.
+      * @param name resource name.
+      * @return the resource stream, or {@code null} when the resource is not found.
+      */
+     public static InputStream getResourceAsStream(ClassLoader loader, String name) {
+         if (loader != null) {
+             return loader.getResourceAsStream(name);
+         } else {
+             return ClassLoader.getSystemResourceAsStream(name);
+         }
+     }
+
+     /**
+      * Reads a resource file made of {@code key=className} lines and instantiates one service per key.
+      * <p>
+      * Reading stops at the first empty line or at end of file. Lines that do not split into exactly two
+      * parts on "=", keys that were already loaded, and classes that cannot be instantiated are skipped.
+      * Any exception is logged and the services loaded so far are returned.
+      *
+      * @param path resource name, resolved through the context classloader.
+      * @param clazz type every service class must be assignable to.
+      * @param <T> service type expected by the caller.
+      * @return map from key to service instance; empty when the resource is missing.
+      */
+     public static <T> Map<String, T> load(String path, Class<?> clazz) {
+         LOG.info("Looking for a resource file of name [{}] ...", path);
+         Map<String, T> services = new ConcurrentHashMap<String, T>();
+         BufferedReader reader = null;
+         try {
+             final InputStream is = getResourceAsStream(getContextClassLoader(), path);
+             if (is == null) {
+                 LOG.warn("No resource file with name [{}] found.", path);
+                 return services;
+             }
+             reader = newReader(is);
+             // The next line is read in the loop header, so "continue" can never re-process the same
+             // line (the previous while-loop spun forever on a malformed or duplicate entry).
+             for (String line = reader.readLine(); line != null && !"".equals(line); line = reader.readLine()) {
+                 LOG.info(
+                     "Creating an instance as specified by file {} which was present in the path of the context classloader.",
+                     path);
+                 String[] service = line.split("=");
+                 if (service.length != 2 || services.containsKey(service[0])) {
+                     continue;
+                 }
+                 LOG.info("Begin to load protocol: " + service[0]);
+                 T instance = initService(getContextClassLoader(), service[1], clazz);
+                 if (instance == null) {
+                     // ConcurrentHashMap rejects null values; skip instead of aborting the whole load.
+                     LOG.warn("Skip protocol {} because class {} could not be instantiated.", service[0], service[1]);
+                     continue;
+                 }
+                 services.put(service[0], instance);
+             }
+         } catch (Exception e) {
+             LOG.error("Error occured when looking for resource file " + path, e);
+         } finally {
+             closeQuietly(reader);
+         }
+         return services;
+     }
+
+     /**
+      * Reads the first line of a resource file as a class name and instantiates it.
+      *
+      * @param name resource name, resolved through the context classloader.
+      * @param path unused; kept for compatibility with existing callers.
+      * @param clazz type the service class must be assignable to.
+      * @param <T> service type expected by the caller.
+      * @return the service instance, or {@code null} when the resource is missing, its first line is
+      * empty, or the class cannot be instantiated.
+      */
+     public static <T> T loadClass(String name, String path, Class<?> clazz) {
+         final InputStream is = getResourceAsStream(getContextClassLoader(), name);
+         if (is == null) {
+             return null;
+         }
+         final BufferedReader reader = newReader(is);
+         try {
+             String serviceName = reader.readLine();
+             if (serviceName != null && !"".equals(serviceName)) {
+                 return initService(getContextClassLoader(), serviceName, clazz);
+             }
+             LOG.warn("ServiceName is empty!");
+         } catch (Exception e) {
+             LOG.warn("Error occurred when looking for resource file " + name, e);
+         } finally {
+             closeQuietly(reader);
+         }
+         return null;
+     }
+
+     /**
+      * Loads {@code serviceName} through {@code classLoader} and creates an instance with its no-arg
+      * constructor.
+      *
+      * @param classLoader loader to use; {@code null} yields {@code null}.
+      * @param serviceName fully qualified class name.
+      * @param clazz type the loaded class must be assignable to.
+      * @param <T> service type expected by the caller.
+      * @return a new instance, or {@code null} when the class is missing, is not assignable to
+      * {@code clazz}, or cannot be instantiated. (Previously a failed instantiation returned the
+      * {@link Class} object itself and an incompatible class was instantiated anyway; both surfaced
+      * as a ClassCastException at the caller.)
+      * @throws NoClassDefFoundError when a dependency of the class is missing and {@code classLoader}
+      * is the loader of this class, so there is nothing more to try.
+      */
+     @SuppressWarnings("unchecked")
+     protected static <T> T initService(ClassLoader classLoader, String serviceName, Class<?> clazz) {
+         if (classLoader == null) {
+             return null;
+         }
+         try {
+             Class<?> serviceClazz = classLoader.loadClass(serviceName);
+             if (!clazz.isAssignableFrom(serviceClazz)) {
+                 // This indicates a problem with the ClassLoader tree. An incompatible ClassLoader was used to load the implementation.
+                 LOG.error(
+                     "Class {} loaded from classloader {} does not extend {} as loaded by this classloader.",
+                     new Object[] {
+                         serviceClazz.getName(),
+                         objectId(serviceClazz.getClassLoader()), clazz.getName()});
+                 return null;
+             }
+             LOG.info("Loaded class {} from classloader {}", serviceClazz.getName(),
+                 objectId(classLoader));
+             // Safe: assignability to clazz was verified above.
+             return (T) serviceClazz.newInstance();
+         } catch (ClassNotFoundException ex) {
+             if (classLoader == thisClassLoader) {
+                 // Nothing more to try, onwards.
+                 LOG.warn("Unable to locate any class {} via classloader {}", serviceName,
+                     objectId(classLoader));
+                 LOG.error("Unable to init service.", ex);
+             }
+             // Otherwise ignore the exception and report "not available".
+         } catch (NoClassDefFoundError e) {
+             if (classLoader == thisClassLoader) {
+                 // Nothing more to try, onwards. Log the requested name: no Class object exists when loading failed.
+                 LOG.warn(
+                     "Class {} cannot be loaded via classloader {}.it depends on some other class that cannot be found.",
+                     serviceName, objectId(classLoader));
+                 throw e;
+             }
+             // Otherwise ignore the error and report "not available".
+         } catch (Exception e) {
+             LOG.error("Unable to init service.", e);
+         }
+         return null;
+     }
+
+     /**
+      * Wraps the stream in a UTF-8 reader, falling back to the platform charset if UTF-8 is unsupported.
+      */
+     private static BufferedReader newReader(InputStream is) {
+         try {
+             return new BufferedReader(new InputStreamReader(is, "UTF-8"));
+         } catch (java.io.UnsupportedEncodingException e) {
+             return new BufferedReader(new InputStreamReader(is));
+         }
+     }
+
+     /**
+      * Closes the reader (and the stream it wraps); a failure to close is logged, never propagated.
+      */
+     private static void closeQuietly(BufferedReader reader) {
+         if (reader == null) {
+             return;
+         }
+         try {
+             reader.close();
+         } catch (java.io.IOException e) {
+             LOG.warn("Failed to close resource reader.", e);
+         }
+     }
+ }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/util/ThreadUtils.java b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ThreadUtils.java
new file mode 100644
index 0000000..954e692
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/util/ThreadUtils.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.util;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+ /**
+  * Factory and shutdown helpers for threads and executors with readable thread names.
+  */
+ public final class ThreadUtils {
+
+     /**
+      * A constructor to stop this class being constructed.
+      */
+     private ThreadUtils() {
+         // Unused
+     }
+
+     /**
+      * Creates a {@link ThreadPoolExecutor} whose threads are named {@code processName_<index>}.
+      *
+      * @param corePoolSize core pool size
+      * @param maximumPoolSize maximum pool size
+      * @param keepAliveTime keep-alive time for idle threads above the core size
+      * @param unit unit of {@code keepAliveTime}
+      * @param workQueue queue holding tasks before they are executed
+      * @param processName prefix of the thread names
+      * @param isDaemon whether the worker threads are daemon threads
+      * @return the new executor
+      */
+     public static ExecutorService newThreadPoolExecutor(int corePoolSize,
+         int maximumPoolSize,
+         long keepAliveTime,
+         TimeUnit unit,
+         BlockingQueue<Runnable> workQueue,
+         String processName, boolean isDaemon) {
+         return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newGenericThreadFactory(processName, isDaemon));
+     }
+
+     /**
+      * Creates a fixed-size pool backed by a bounded {@link LinkedBlockingQueue}.
+      *
+      * @param nThreads number of threads (both core and maximum size)
+      * @param workQueueCapacity capacity of the work queue
+      * @param processName prefix of the thread names
+      * @param isDaemon whether the worker threads are daemon threads
+      * @return the new executor
+      */
+     public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, boolean isDaemon) {
+         return new ThreadPoolExecutor(
+             nThreads,
+             nThreads,
+             0,
+             TimeUnit.MILLISECONDS,
+             new LinkedBlockingQueue<Runnable>(workQueueCapacity),
+             newGenericThreadFactory(processName, isDaemon));
+     }
+
+     /**
+      * @param processName prefix of the thread name
+      * @param isDaemon whether the worker thread is a daemon thread
+      * @return a single-thread executor
+      */
+     public static ExecutorService newSingleThreadExecutor(String processName, boolean isDaemon) {
+         return Executors.newSingleThreadExecutor(newGenericThreadFactory(processName, isDaemon));
+     }
+
+     /**
+      * @param processName prefix of the thread name
+      * @param isDaemon whether the worker thread is a daemon thread
+      * @return a single-thread scheduled executor
+      */
+     public static ScheduledExecutorService newSingleThreadScheduledExecutor(String processName, boolean isDaemon) {
+         return Executors.newSingleThreadScheduledExecutor(newGenericThreadFactory(processName, isDaemon));
+     }
+
+     /**
+      * @param nThreads core pool size
+      * @param processName prefix of the thread names
+      * @param isDaemon whether the worker threads are daemon threads
+      * @return a scheduled executor with {@code nThreads} core threads
+      */
+     public static ScheduledExecutorService newFixedThreadScheduledPool(int nThreads, String processName,
+         boolean isDaemon) {
+         return Executors.newScheduledThreadPool(nThreads, newGenericThreadFactory(processName, isDaemon));
+     }
+
+     /**
+      * @param processName prefix of the thread names
+      * @return a factory of non-daemon threads named {@code processName_<index>}
+      */
+     public static ThreadFactory newGenericThreadFactory(String processName) {
+         return newGenericThreadFactory(processName, false);
+     }
+
+     /**
+      * @param processName prefix of the thread names
+      * @param threads number embedded in every thread name
+      * @return a factory of non-daemon threads named {@code processName_<threads>_<index>}
+      */
+     public static ThreadFactory newGenericThreadFactory(String processName, int threads) {
+         return newGenericThreadFactory(processName, threads, false);
+     }
+
+     /**
+      * @param processName prefix of the thread names
+      * @param isDaemon whether created threads are daemon threads
+      * @return a factory of threads named {@code processName_<index>}, with the index starting at 1
+      */
+     public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) {
+         return new ThreadFactory() {
+             private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+             @Override
+             public Thread newThread(Runnable r) {
+                 Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet()));
+                 thread.setDaemon(isDaemon);
+                 return thread;
+             }
+         };
+     }
+
+     /**
+      * @param processName prefix of the thread names
+      * @param threads number embedded in every thread name
+      * @param isDaemon whether created threads are daemon threads
+      * @return a factory of threads named {@code processName_<threads>_<index>}, with the index starting at 1
+      */
+     public static ThreadFactory newGenericThreadFactory(final String processName, final int threads,
+         final boolean isDaemon) {
+         return new ThreadFactory() {
+             private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+             @Override
+             public Thread newThread(Runnable r) {
+                 Thread thread = new Thread(r, String.format("%s_%d_%d", processName, threads, this.threadIndex.incrementAndGet()));
+                 thread.setDaemon(isDaemon);
+                 return thread;
+             }
+         };
+     }
+
+     /**
+      * Create a new thread
+      * <p>
+      * No uncaught-exception handler is installed, so a failure in {@code runnable} is reported through
+      * the thread group / default handler. An empty handler used to be set here, which hid every such
+      * failure.
+      *
+      * @param name The name of the thread
+      * @param runnable The work for the thread to do
+      * @param daemon Should the thread block JVM stop?
+      * @return The unstarted thread
+      */
+     public static Thread newThread(String name, Runnable runnable, boolean daemon) {
+         Thread thread = new Thread(runnable, name);
+         thread.setDaemon(daemon);
+         return thread;
+     }
+
+     /**
+      * Shutdown passed thread using isAlive and join.
+      *
+      * @param t Thread to stop
+      */
+     public static void shutdownGracefully(final Thread t) {
+         shutdownGracefully(t, 0);
+     }
+
+     /**
+      * Shutdown passed thread using isAlive and join: the thread is interrupted and joined repeatedly
+      * until it has died.
+      * <p>
+      * If the calling thread is itself interrupted, its interrupt status is restored and the method
+      * returns at once. Looping on in that state would make every {@code join} fail immediately and
+      * turn the wait into a busy spin.
+      *
+      * @param millis Join timeout per attempt. Pass 0 if we're to wait forever.
+      * @param t Thread to stop
+      */
+     public static void shutdownGracefully(final Thread t, final long millis) {
+         if (t == null) {
+             return;
+         }
+         while (t.isAlive()) {
+             try {
+                 t.interrupt();
+                 t.join(millis);
+             } catch (InterruptedException e) {
+                 // Preserve interrupt status and stop waiting.
+                 Thread.currentThread().interrupt();
+                 return;
+             }
+         }
+     }
+
+     /**
+      * An implementation of the graceful stop sequence recommended by
+      * {@link ExecutorService}.
+      *
+      * @param executor executor
+      * @param timeout timeout
+      * @param timeUnit timeUnit
+      */
+     public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
+         // Disable new tasks from being submitted.
+         executor.shutdown();
+         try {
+             // Wait a while for existing tasks to terminate.
+             if (!executor.awaitTermination(timeout, timeUnit)) {
+                 executor.shutdownNow();
+                 // Wait a while for tasks to respond to being cancelled.
+                 executor.awaitTermination(timeout, timeUnit);
+             }
+         } catch (InterruptedException ie) {
+             // (Re-)Cancel if current thread also interrupted.
+             executor.shutdownNow();
+             // Preserve interrupt status.
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
diff --git a/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingClient b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingClient
new file mode 100644
index 0000000..b0ca8ec
--- /dev/null
+++ b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingClient
@@ -0,0 +1,2 @@
+rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient
+http2=org.apache.rocketmq.remoting.transport.http2.Http2ClientImpl
\ No newline at end of file
diff --git a/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer
new file mode 100644
index 0000000..5079c88
--- /dev/null
+++ b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer
@@ -0,0 +1,2 @@
+rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer
+http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl
\ No newline at end of file
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
index 0ecfaaa..b41dc37 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
@@ -27,13 +27,13 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
-import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
index 13bb172..50409f2 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java
@@ -25,10 +25,10 @@ import java.io.PrintWriter;
import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.TlsHelper;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
index c3da3e9..2b96abd 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
@@ -20,6 +20,7 @@ import java.util.concurrent.Semaphore;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Spy;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
index 6b5633d..d0a7ed7 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.remoting.netty;
import java.lang.reflect.Field;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
index 2bd41ce..243d7f3 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingCommandTest.java
@@ -18,10 +18,11 @@ package org.apache.rocketmq.remoting.protocol;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.CodecHelper;
+import org.apache.rocketmq.remoting.serialize.SerializeType;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -31,7 +32,7 @@ public class RemotingCommandTest {
public void testMarkProtocolType_JSONProtocolType() {
int source = 261;
SerializeType type = SerializeType.JSON;
- byte[] result = RemotingCommand.markProtocolType(source, type);
+ byte[] result = CodecHelper.markProtocolType(source, type);
assertThat(result).isEqualTo(new byte[] {0, 0, 1, 5});
}
@@ -39,13 +40,13 @@ public class RemotingCommandTest {
public void testMarkProtocolType_ROCKETMQProtocolType() {
int source = 16777215;
SerializeType type = SerializeType.ROCKETMQ;
- byte[] result = RemotingCommand.markProtocolType(source, type);
+ byte[] result = CodecHelper.markProtocolType(source, type);
assertThat(result).isEqualTo(new byte[] {1, -1, -1, -1});
}
@Test
public void testCreateRequestCommand_RegisterBroker() {
- System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
+ System.setProperty(CodecHelper.REMOTING_VERSION_KEY, "2333");
int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER
CommandCustomHeader header = new SampleCommandCustomHeader();
@@ -57,7 +58,7 @@ public class RemotingCommandTest {
@Test
public void testCreateResponseCommand_SuccessWithHeader() {
- System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
+ System.setProperty(CodecHelper.REMOTING_VERSION_KEY, "2333");
int code = RemotingSysResponseCode.SUCCESS;
String remark = "Sample remark";
@@ -70,7 +71,7 @@ public class RemotingCommandTest {
@Test
public void testCreateResponseCommand_SuccessWithoutHeader() {
- System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
+ System.setProperty(CodecHelper.REMOTING_VERSION_KEY, "2333");
int code = RemotingSysResponseCode.SUCCESS;
String remark = "Sample remark";
@@ -83,7 +84,7 @@ public class RemotingCommandTest {
@Test
public void testCreateResponseCommand_FailToCreateCommand() {
- System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
+ System.setProperty(CodecHelper.REMOTING_VERSION_KEY, "2333");
int code = RemotingSysResponseCode.SUCCESS;
String remark = "Sample remark";
@@ -93,7 +94,7 @@ public class RemotingCommandTest {
@Test
public void testCreateResponseCommand_SystemError() {
- System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
+ System.setProperty(CodecHelper.REMOTING_VERSION_KEY, "2333");
RemotingCommand cmd = RemotingCommand.createResponseCommand(SampleCommandCustomHeader.class);
assertThat(cmd.getCode()).isEqualTo(RemotingSysResponseCode.SYSTEM_ERROR);
@@ -102,86 +103,8 @@ public class RemotingCommandTest {
assertThat(cmd.getFlag() & 0x01).isEqualTo(1); //flag bit 0: 1 presents response
}
- @Test
- public void testEncodeAndDecode_EmptyBody() {
- System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
-
- int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER
- CommandCustomHeader header = new SampleCommandCustomHeader();
- RemotingCommand cmd = RemotingCommand.createRequestCommand(code, header);
-
- ByteBuffer buffer = cmd.encode();
-
- //Simulate buffer being read in NettyDecoder
- buffer.getInt();
- byte[] bytes = new byte[buffer.limit() - 4];
- buffer.get(bytes, 0, buffer.limit() - 4);
- buffer = ByteBuffer.wrap(bytes);
-
- RemotingCommand decodedCommand = RemotingCommand.decode(buffer);
-
- assertThat(decodedCommand.getSerializeTypeCurrentRPC()).isEqualTo(SerializeType.JSON);
- assertThat(decodedCommand.getBody()).isNull();
- }
-
- @Test
- public void testEncodeAndDecode_FilledBody() {
- System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
-
- int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER
- CommandCustomHeader header = new SampleCommandCustomHeader();
- RemotingCommand cmd = RemotingCommand.createRequestCommand(code, header);
- cmd.setBody(new byte[] {0, 1, 2, 3, 4});
- ByteBuffer buffer = cmd.encode();
- //Simulate buffer being read in NettyDecoder
- buffer.getInt();
- byte[] bytes = new byte[buffer.limit() - 4];
- buffer.get(bytes, 0, buffer.limit() - 4);
- buffer = ByteBuffer.wrap(bytes);
-
- RemotingCommand decodedCommand = RemotingCommand.decode(buffer);
-
- assertThat(decodedCommand.getSerializeTypeCurrentRPC()).isEqualTo(SerializeType.JSON);
- assertThat(decodedCommand.getBody()).isEqualTo(new byte[] {0, 1, 2, 3, 4});
- }
-
- @Test
- public void testEncodeAndDecode_FilledBodyWithExtFields() throws RemotingCommandException {
- System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
-
- int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER
- CommandCustomHeader header = new ExtFieldsHeader();
- RemotingCommand cmd = RemotingCommand.createRequestCommand(code, header);
-
- cmd.addExtField("key", "value");
-
- ByteBuffer buffer = cmd.encode();
-
- //Simulate buffer being read in NettyDecoder
- buffer.getInt();
- byte[] bytes = new byte[buffer.limit() - 4];
- buffer.get(bytes, 0, buffer.limit() - 4);
- buffer = ByteBuffer.wrap(bytes);
-
- RemotingCommand decodedCommand = RemotingCommand.decode(buffer);
-
- assertThat(decodedCommand.getExtFields().get("stringValue")).isEqualTo("bilibili");
- assertThat(decodedCommand.getExtFields().get("intValue")).isEqualTo("2333");
- assertThat(decodedCommand.getExtFields().get("longValue")).isEqualTo("23333333");
- assertThat(decodedCommand.getExtFields().get("booleanValue")).isEqualTo("true");
- assertThat(decodedCommand.getExtFields().get("doubleValue")).isEqualTo("0.618");
-
- assertThat(decodedCommand.getExtFields().get("key")).isEqualTo("value");
-
- CommandCustomHeader decodedHeader = decodedCommand.decodeCommandCustomHeader(ExtFieldsHeader.class);
- assertThat(((ExtFieldsHeader) decodedHeader).getStringValue()).isEqualTo("bilibili");
- assertThat(((ExtFieldsHeader) decodedHeader).getIntValue()).isEqualTo(2333);
- assertThat(((ExtFieldsHeader) decodedHeader).getLongValue()).isEqualTo(23333333l);
- assertThat(((ExtFieldsHeader) decodedHeader).isBooleanValue()).isEqualTo(true);
- assertThat(((ExtFieldsHeader) decodedHeader).getDoubleValue()).isBetween(0.617, 0.619);
- }
@Test
public void testNotNullField() throws Exception {
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java
index 3e8b7a9..7ad6db7 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RemotingSerializableTest.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.remoting.protocol;
import java.util.Arrays;
import java.util.List;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
index f1db54f..66bdf73 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializableTest.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.remoting.protocol;
import java.util.HashMap;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.RocketMQSerializable;
+import org.apache.rocketmq.remoting.serialize.SerializeType;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
index 7021992..d0c4fcb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.store.schedule;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class DelayOffsetSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
index a9b9ab0..f517e2a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
@@ -28,7 +28,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java
index 4989a9b..eae3cfb 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/monitor/MonitorServiceTest.java
@@ -52,7 +52,7 @@ import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.junit.AfterClass;