You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2019/06/28 11:19:56 UTC
[rocketmq-remoting] 21/39: Pull client channel management logic to
ClientChannelManager
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit c2e49dbc201113113de02d9412ef948b9b3b3207
Author: yukon <yu...@apache.org>
AuthorDate: Tue May 28 10:45:37 2019 +0800
Pull client channel management logic to ClientChannelManager
---
.../remoting/impl/netty/ClientChannelManager.java | 241 +++++++++++++++++++++
.../remoting/impl/netty/NettyRemotingAbstract.java | 32 ++-
.../remoting/impl/netty/NettyRemotingClient.java | 233 ++------------------
.../remoting/impl/netty/NettyRemotingServer.java | 6 -
.../rocketmq/remoting/internal/RemotingUtil.java | 27 +++
5 files changed, 299 insertions(+), 240 deletions(-)
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
new file mode 100644
index 0000000..2f59d24
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.remoting.internal.RemotingUtil.extractRemoteAddress;
+
+public class ClientChannelManager {
+ protected static final Logger LOG = LoggerFactory.getLogger(ClientChannelManager.class);
+
+ private static final long LOCK_TIMEOUT_MILLIS = 3000;
+ private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>();
+ private final Lock lockChannelTables = new ReentrantLock();
+ private final Bootstrap clientBootstrap;
+ private final RemotingConfig clientConfig;
+
+ ClientChannelManager(final Bootstrap bootstrap,
+ final RemotingConfig config) {
+ clientBootstrap = bootstrap;
+ clientConfig = config;
+ }
+
+ void clear() {
+ for (ChannelWrapper cw : this.channelTables.values()) {
+ this.closeChannel(null, cw.getChannel());
+ }
+
+ this.channelTables.clear();
+ }
+
+ Channel createIfAbsent(final String addr) {
+ ChannelWrapper cw = this.channelTables.get(addr);
+ if (cw != null && cw.isActive()) {
+ return cw.getChannel();
+ }
+ return this.createChannel(addr);
+ }
+
+ private Channel createChannel(final String addr) {
+ ChannelWrapper cw = null;
+ try {
+ if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ boolean createNewConnection;
+ cw = this.channelTables.get(addr);
+ if (cw != null) {
+ if (cw.isActive()) {
+ return cw.getChannel();
+ } else if (!cw.getChannelFuture().isDone()) {
+ createNewConnection = false;
+ } else {
+ this.channelTables.remove(addr);
+ createNewConnection = true;
+ }
+ } else {
+ createNewConnection = true;
+ }
+
+ if (createNewConnection) {
+ String[] s = addr.split(":");
+ SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.valueOf(s[1]));
+ ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress);
+ LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
+ cw = new ChannelWrapper(channelFuture);
+ this.channelTables.put(addr, cw);
+ }
+ } catch (Exception e) {
+ LOG.error("createChannel: create channel exception", e);
+ } finally {
+ this.lockChannelTables.unlock();
+ }
+ } else {
+ LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ if (cw != null) {
+ ChannelFuture channelFuture = cw.getChannelFuture();
+ if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) {
+ if (cw.isActive()) {
+ LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
+ return cw.getChannel();
+ } else {
+ LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause());
+ this.closeChannel(addr, cw.getChannel());
+ }
+ } else {
+ LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(),
+ channelFuture.toString());
+ this.closeChannel(addr, cw.getChannel());
+ }
+ }
+ return null;
+ }
+
+ void closeChannel(final String addr, final Channel channel) {
+ if (null == channel)
+ return;
+
+ final String addrRemote = null == addr ? extractRemoteAddress(channel) : addr;
+ try {
+ if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ boolean removeItemFromTable = true;
+ ChannelWrapper prevCW = this.channelTables.get(addrRemote);
+ //Workaround for null
+ if (null == prevCW) {
+ return;
+ }
+
+ LOG.info("Begin to close the remote address {} channel {}", addrRemote, prevCW);
+
+ if (prevCW.getChannel() != channel) {
+ LOG.info("Channel {} has been closed,this is a new channel.", prevCW.getChannel(), channel);
+ removeItemFromTable = false;
+ }
+
+ if (removeItemFromTable) {
+ this.channelTables.remove(addrRemote);
+ LOG.info("Channel {} has been removed !", addrRemote);
+ }
+
+ channel.close().addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ LOG.warn("Close channel {} {}", channel, future.isSuccess());
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Close channel error !", e);
+ } finally {
+ this.lockChannelTables.unlock();
+ }
+ } else {
+ LOG.warn("Can not lock channel table in {} ms", LOCK_TIMEOUT_MILLIS);
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Close channel error !", e);
+ }
+ }
+
+ void closeChannel(final Channel channel) {
+ if (null == channel)
+ return;
+
+ try {
+ if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ boolean removeItemFromTable = true;
+ ChannelWrapper prevCW = null;
+ String addrRemote = null;
+
+ for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) {
+ ChannelWrapper prev = entry.getValue();
+ if (prev.getChannel() != null) {
+ if (prev.getChannel() == channel) {
+ prevCW = prev;
+ addrRemote = entry.getKey();
+ break;
+ }
+ }
+ }
+
+ if (null == prevCW) {
+ LOG.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+ removeItemFromTable = false;
+ }
+
+ if (removeItemFromTable) {
+ this.channelTables.remove(addrRemote);
+ LOG.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+ //RemotingHelper.closeChannel(channel);
+ }
+ } catch (Exception e) {
+ LOG.error("closeChannel: close the channel exception", e);
+ } finally {
+ this.lockChannelTables.unlock();
+ }
+ } else {
+ LOG.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+ }
+ } catch (InterruptedException e) {
+ LOG.error("closeChannel exception", e);
+ }
+ }
+
+ private class ChannelWrapper {
+ private final ChannelFuture channelFuture;
+
+ ChannelWrapper(ChannelFuture channelFuture) {
+ this.channelFuture = channelFuture;
+ }
+
+ boolean isActive() {
+ return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
+ }
+
+ boolean isWriteable() {
+ return this.channelFuture.channel().isWritable();
+ }
+
+ private Channel getChannel() {
+ return this.channelFuture.channel();
+ }
+
+ ChannelFuture getChannelFuture() {
+ return channelFuture;
+ }
+ }
+}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 38059a8..920a922 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -22,7 +22,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -58,6 +57,7 @@ import org.apache.rocketmq.remoting.external.ThreadUtils;
import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode;
+import org.apache.rocketmq.remoting.internal.RemotingUtil;
import org.apache.rocketmq.remoting.internal.UIDGenerator;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
@@ -88,7 +88,9 @@ public abstract class NettyRemotingAbstract implements RemotingService {
}
protected void putNettyEvent(final NettyChannelEvent event) {
- this.channelEventExecutor.putNettyEvent(event);
+ if (channelEventListenerGroup != null && channelEventListenerGroup.size() != 0) {
+ this.channelEventExecutor.putNettyEvent(event);
+ }
}
protected void startUpHouseKeepingService() {
@@ -172,7 +174,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
processorExecutorPair.getRight().submit(run);
} catch (RejectedExecutionException e) {
LOG.warn(String.format("Request %s from %s Rejected by server executor %s !", cmd,
- extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString()));
+ RemotingUtil.extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString()));
if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
RemotingCommand response = remotingCommandFactory.createResponse(cmd);
@@ -190,7 +192,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
responseFuture.release();
this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST,
- extractRemoteAddress(ctx.channel()), responseFuture.getRequestCommand(), response));
+ RemotingUtil.extractRemoteAddress(ctx.channel()), responseFuture.getRequestCommand(), response));
if (responseFuture.getAsyncHandler() != null) {
executeAsyncHandler(responseFuture);
@@ -199,7 +201,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
responseFuture.release();
}
} else {
- LOG.warn("request {} from {} has not matched response !", response, extractRemoteAddress(ctx.channel()));
+ LOG.warn("request {} from {} has not matched response !", response, RemotingUtil.extractRemoteAddress(ctx.channel()));
}
}
@@ -211,12 +213,12 @@ public abstract class NettyRemotingAbstract implements RemotingService {
public void run() {
try {
interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.RESPONSE,
- extractRemoteAddress(ctx.channel()), cmd));
+ RemotingUtil.extractRemoteAddress(ctx.channel()), cmd));
RemotingCommand response = processorExecutorPair.getLeft().processRequest(channel, cmd);
interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.RESPONSE,
- extractRemoteAddress(ctx.channel()), cmd, response));
+ RemotingUtil.extractRemoteAddress(ctx.channel()), cmd, response));
handleResponse(response, cmd, ctx);
} catch (Throwable e) {
@@ -228,10 +230,6 @@ public abstract class NettyRemotingAbstract implements RemotingService {
};
}
- protected String extractRemoteAddress(Channel channel) {
- return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress();
- }
-
private void writeAndFlush(final Channel channel, final Object msg) {
channel.writeAndFlush(msg);
}
@@ -321,14 +319,14 @@ public abstract class NettyRemotingAbstract implements RemotingService {
long timeoutMillis) {
request.trafficType(TrafficType.REQUEST_SYNC);
- final String remoteAddr = extractRemoteAddress(channel);
+ final String remoteAddr = RemotingUtil.extractRemoteAddress(channel);
this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
RemotingCommand responseCommand = this.invoke0(remoteAddr, channel, request, timeoutMillis);
this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST,
- extractRemoteAddress(channel), request, responseCommand));
+ RemotingUtil.extractRemoteAddress(channel), request, responseCommand));
return responseCommand;
}
@@ -367,9 +365,9 @@ public abstract class NettyRemotingAbstract implements RemotingService {
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
- throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause());
+ throw new RemoteTimeoutException(RemotingUtil.extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause());
} else {
- throw new RemoteAccessException(extractRemoteAddress(channel), responseFuture.getCause());
+ throw new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), responseFuture.getCause());
}
}
@@ -387,7 +385,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
final AsyncHandler invokeCallback, long timeoutMillis) {
request.trafficType(TrafficType.REQUEST_ASYNC);
- final String remoteAddr = extractRemoteAddress(channel);
+ final String remoteAddr = RemotingUtil.extractRemoteAddress(channel);
this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
@@ -436,7 +434,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request) {
request.trafficType(TrafficType.REQUEST_ONEWAY);
- this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request));
+ this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, RemotingUtil.extractRemoteAddress(channel), request));
this.invokeOneway0(channel, request);
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index c263f22..ce30aa2 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -20,8 +20,6 @@ package org.apache.rocketmq.remoting.impl.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -38,13 +36,8 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.remoting.api.AsyncHandler;
import org.apache.rocketmq.remoting.api.RemotingClient;
import org.apache.rocketmq.remoting.api.command.RemotingCommand;
@@ -59,16 +52,14 @@ import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler;
import org.apache.rocketmq.remoting.internal.JvmUtils;
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
- private static final long LOCK_TIMEOUT_MILLIS = 3000;
private final Bootstrap clientBootstrap = new Bootstrap();
private final EventLoopGroup ioGroup;
private final Class<? extends SocketChannel> socketChannelClass;
private final RemotingConfig clientConfig;
- private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
- private final Lock lockChannelTables = new ReentrantLock();
private EventExecutorGroup workerGroup;
+ private ClientChannelManager clientChannelManager;
public NettyRemotingClient(final RemotingConfig clientConfig) {
super(clientConfig);
@@ -84,6 +75,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
socketChannelClass = NioSocketChannel.class;
}
+ this.clientChannelManager = new ClientChannelManager(clientBootstrap, clientConfig);
+
this.workerGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
}
@@ -117,11 +110,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
try {
ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
- for (ChannelWrapper cw : this.channelTables.values()) {
- this.closeChannel(null, cw.getChannel());
- }
-
- this.channelTables.clear();
+ clientChannelManager.clear();
this.ioGroup.shutdownGracefully();
@@ -157,102 +146,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
}
- private void closeChannel(final String addr, final Channel channel) {
- if (null == channel)
- return;
-
- final String addrRemote = null == addr ? extractRemoteAddress(channel) : addr;
- try {
- if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- boolean removeItemFromTable = true;
- ChannelWrapper prevCW = this.channelTables.get(addrRemote);
- //Workaround for null
- if (null == prevCW) {
- return;
- }
-
- LOG.info("Begin to close the remote address {} channel {}", addrRemote, prevCW);
-
- if (prevCW.getChannel() != channel) {
- LOG.info("Channel {} has been closed,this is a new channel.", prevCW.getChannel(), channel);
- removeItemFromTable = false;
- }
-
- if (removeItemFromTable) {
- this.channelTables.remove(addrRemote);
- LOG.info("Channel {} has been removed !", addrRemote);
- }
-
- channel.close().addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- LOG.warn("Close channel {} {}", channel, future.isSuccess());
- }
- });
- } catch (Exception e) {
- LOG.error("Close channel error !", e);
- } finally {
- this.lockChannelTables.unlock();
- }
- } else {
- LOG.warn("Can not lock channel table in {} ms", LOCK_TIMEOUT_MILLIS);
- }
- } catch (InterruptedException e) {
- LOG.error("Close channel error !", e);
- }
- }
-
- private void closeChannel(final Channel channel) {
- if (null == channel)
- return;
-
- try {
- if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- boolean removeItemFromTable = true;
- ChannelWrapper prevCW = null;
- String addrRemote = null;
-
- for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) {
- ChannelWrapper prev = entry.getValue();
- if (prev.getChannel() != null) {
- if (prev.getChannel() == channel) {
- prevCW = prev;
- addrRemote = entry.getKey();
- break;
- }
- }
- }
-
- if (null == prevCW) {
- LOG.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
- removeItemFromTable = false;
- }
-
- if (removeItemFromTable) {
- this.channelTables.remove(addrRemote);
- LOG.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
- //RemotingHelper.closeChannel(channel);
- }
- } catch (Exception e) {
- LOG.error("closeChannel: close the channel exception", e);
- } finally {
- this.lockChannelTables.unlock();
- }
- } else {
- LOG.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
- }
- } catch (InterruptedException e) {
- LOG.error("closeChannel exception", e);
- }
- }
-
@Override
public RemotingCommand invoke(final String address, final RemotingCommand request, final long timeoutMillis) {
request.trafficType(TrafficType.REQUEST_SYNC);
- Channel channel = this.createIfAbsent(address);
+ Channel channel = this.clientChannelManager.createIfAbsent(address);
if (channel != null && channel.isActive()) {
try {
return this.invokeWithInterceptor(channel, request, timeoutMillis);
@@ -260,18 +158,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
} catch (RemoteTimeoutException e) {
if (this.clientConfig.isClientCloseSocketIfTimeout()) {
LOG.warn("invoke: timeout, so close the socket {} ms, {}", timeoutMillis, address);
- this.closeChannel(address, channel);
+ this.clientChannelManager.closeChannel(address, channel);
}
LOG.warn("invoke: wait response timeout<{}ms> exception, so close the channel[{}]", timeoutMillis, address);
throw e;
} finally {
if (this.clientConfig.isClientShortConnectionEnable()) {
- this.closeChannel(address, channel);
+ this.clientChannelManager.closeChannel(address, channel);
}
}
} else {
- this.closeChannel(address, channel);
+ this.clientChannelManager.closeChannel(address, channel);
throw new RemoteConnectFailureException(address);
}
@@ -281,113 +179,21 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
public void invokeAsync(final String address, final RemotingCommand request, final AsyncHandler asyncHandler,
final long timeoutMillis) {
- final Channel channel = this.createIfAbsent(address);
+ final Channel channel = this.clientChannelManager.createIfAbsent(address);
if (channel != null && channel.isActive()) {
this.invokeAsyncWithInterceptor(channel, request, asyncHandler, timeoutMillis);
} else {
- this.closeChannel(address, channel);
+ this.clientChannelManager.closeChannel(address, channel);
}
}
@Override
public void invokeOneWay(final String address, final RemotingCommand request) {
- final Channel channel = this.createIfAbsent(address);
+ final Channel channel = this.clientChannelManager.createIfAbsent(address);
if (channel != null && channel.isActive()) {
this.invokeOnewayWithInterceptor(channel, request);
} else {
- this.closeChannel(address, channel);
- }
- }
-
- private Channel createIfAbsent(final String addr) {
- ChannelWrapper cw = this.channelTables.get(addr);
- if (cw != null && cw.isActive()) {
- return cw.getChannel();
- }
- return this.createChannel(addr);
- }
-
- //FIXME need test to verify
- private Channel createChannel(final String addr) {
- ChannelWrapper cw = null;
- try {
- if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- boolean createNewConnection;
- cw = this.channelTables.get(addr);
- if (cw != null) {
- if (cw.isActive()) {
- return cw.getChannel();
- } else if (!cw.getChannelFuture().isDone()) {
- createNewConnection = false;
- } else {
- this.channelTables.remove(addr);
- createNewConnection = true;
- }
- } else {
- createNewConnection = true;
- }
-
- if (createNewConnection) {
- String[] s = addr.split(":");
- SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.valueOf(s[1]));
- ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress);
- LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
- cw = new ChannelWrapper(channelFuture);
- this.channelTables.put(addr, cw);
- }
- } catch (Exception e) {
- LOG.error("createChannel: create channel exception", e);
- } finally {
- this.lockChannelTables.unlock();
- }
- } else {
- LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- if (cw != null) {
- ChannelFuture channelFuture = cw.getChannelFuture();
- if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) {
- if (cw.isActive()) {
- LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
- return cw.getChannel();
- } else {
- LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause());
- this.closeChannel(addr, cw.getChannel());
- }
- } else {
- LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(),
- channelFuture.toString());
- this.closeChannel(addr, cw.getChannel());
- }
- }
- return null;
- }
-
- private class ChannelWrapper {
- private final ChannelFuture channelFuture;
-
- ChannelWrapper(ChannelFuture channelFuture) {
- this.channelFuture = channelFuture;
- }
-
- boolean isActive() {
- return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
- }
-
- boolean isWriteable() {
- return this.channelFuture.channel().isWritable();
- }
-
- private Channel getChannel() {
- return this.channelFuture.channel();
- }
-
- ChannelFuture getChannelFuture() {
- return channelFuture;
+ this.clientChannelManager.closeChannel(address, channel);
}
}
@@ -407,7 +213,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
LOG.info("Remote address {} disconnect channel {}.", ctx.channel().remoteAddress(), ctx.channel());
- closeChannel(ctx.channel());
+ NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
super.disconnect(ctx, promise);
@@ -418,7 +224,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
LOG.info("Remote address {} close channel {}.", ctx.channel().remoteAddress(), ctx.channel());
- closeChannel(ctx.channel());
+ NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
super.close(ctx, promise);
@@ -431,7 +237,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
LOG.info("Close channel {} because of idle event {} ", ctx.channel(), event);
- closeChannel(ctx.channel());
+ NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
putNettyEvent(new NettyChannelEvent(NettyChannelEventType.IDLE, ctx.channel()));
}
}
@@ -440,16 +246,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
@Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
- LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(),
- ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable());
- }
-
- @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
-
- closeChannel(ctx.channel());
+ NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel()));
}
}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index 55ce2d2..f1e9360 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -233,12 +233,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
@Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
- LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(),
- ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable());
- }
-
- @Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause));
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java
new file mode 100644
index 0000000..89e4bff
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+import io.netty.channel.Channel;
+import java.net.InetSocketAddress;
+
+public class RemotingUtil {
+ public static String extractRemoteAddress(Channel channel) {
+ return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress();
+ }
+}