You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2019/06/28 11:19:56 UTC

[rocketmq-remoting] 21/39: Pull client channel management logic to ClientChannelManager

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git

commit c2e49dbc201113113de02d9412ef948b9b3b3207
Author: yukon <yu...@apache.org>
AuthorDate: Tue May 28 10:45:37 2019 +0800

    Pull client channel management logic to ClientChannelManager
---
 .../remoting/impl/netty/ClientChannelManager.java  | 241 +++++++++++++++++++++
 .../remoting/impl/netty/NettyRemotingAbstract.java |  32 ++-
 .../remoting/impl/netty/NettyRemotingClient.java   | 233 ++------------------
 .../remoting/impl/netty/NettyRemotingServer.java   |   6 -
 .../rocketmq/remoting/internal/RemotingUtil.java   |  27 +++
 5 files changed, 299 insertions(+), 240 deletions(-)

diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
new file mode 100644
index 0000000..2f59d24
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.remoting.internal.RemotingUtil.extractRemoteAddress;
+
+public class ClientChannelManager {
+    protected static final Logger LOG = LoggerFactory.getLogger(ClientChannelManager.class);
+
+    private static final long LOCK_TIMEOUT_MILLIS = 3000;
+    private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>();
+    private final Lock lockChannelTables = new ReentrantLock();
+    private final Bootstrap clientBootstrap;
+    private final RemotingConfig clientConfig;
+
+    /** Creates a manager that connects through {@code bootstrap} and reads the connect-await timeout from {@code config}. */
+    ClientChannelManager(final Bootstrap bootstrap, final RemotingConfig config) {
+        this.clientConfig = config;
+        this.clientBootstrap = bootstrap;
+    }
+
+    /** Closes every tracked channel (passing a null address to closeChannel), then empties the table. */
+    void clear() {
+        for (Map.Entry<String, ChannelWrapper> tracked : this.channelTables.entrySet()) {
+            this.closeChannel(null, tracked.getValue().getChannel());
+        }
+        this.channelTables.clear();
+    }
+
+    Channel createIfAbsent(final String addr) {
+        ChannelWrapper cw = this.channelTables.get(addr);
+        if (cw != null && cw.isActive()) {
+            return cw.getChannel();
+        }
+        return this.createChannel(addr);
+    }
+
+    /**
+     * Returns an active channel to {@code addr}, connecting when the table has no usable entry.
+     * The table is modified only under the lock; the connect future is awaited after release.
+     *
+     * @param addr remote address in {@code host:port} form
+     * @return the active channel, or {@code null} if the lock wait timed out or was interrupted,
+     *     or the connection failed or did not finish within the configured await timeout
+     */
+    private Channel createChannel(final String addr) {
+        ChannelWrapper cw = null;
+        try {
+            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    cw = this.channelTables.get(addr);
+                    if (cw != null && cw.isActive()) {
+                        return cw.getChannel();
+                    }
+                    // Keep waiting on a connect that is still in flight; replace a finished, inactive one.
+                    if (cw == null || cw.getChannelFuture().isDone()) {
+                        this.channelTables.remove(addr);
+                        String[] s = addr.split(":");
+                        SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.parseInt(s[1]));
+                        ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress);
+                        LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
+                        cw = new ChannelWrapper(channelFuture);
+                        this.channelTables.put(addr, cw);
+                    }
+                } catch (Exception e) {
+                    LOG.error("createChannel: create channel exception", e);
+                } finally {
+                    this.lockChannelTables.unlock();
+                }
+            } else {
+                LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt status for the caller and log through SLF4J instead of stderr.
+            Thread.currentThread().interrupt();
+            LOG.warn("createChannel: interrupted while waiting for the channel table lock", e);
+        }
+        if (cw != null) {
+            ChannelFuture channelFuture = cw.getChannelFuture();
+            if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) {
+                if (cw.isActive()) {
+                    LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
+                    return cw.getChannel();
+                } else {
+                    LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause());
+                    this.closeChannel(addr, cw.getChannel());
+                }
+            } else {
+                LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(),
+                    channelFuture.toString());
+                this.closeChannel(addr, cw.getChannel());
+            }
+        }
+        return null;
+    }
+
+    void closeChannel(final String addr, final Channel channel) {
+        if (null == channel)
+            return;
+
+        final String addrRemote = null == addr ? extractRemoteAddress(channel) : addr;
+        try {
+            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    boolean removeItemFromTable = true;
+                    ChannelWrapper prevCW = this.channelTables.get(addrRemote);
+                    //Workaround for null
+                    if (null == prevCW) {
+                        return;
+                    }
+
+                    LOG.info("Begin to close the remote address {} channel {}", addrRemote, prevCW);
+
+                    if (prevCW.getChannel() != channel) {
+                        LOG.info("Channel {} has been closed,this is a new channel.", prevCW.getChannel(), channel);
+                        removeItemFromTable = false;
+                    }
+
+                    if (removeItemFromTable) {
+                        this.channelTables.remove(addrRemote);
+                        LOG.info("Channel {} has been removed !", addrRemote);
+                    }
+
+                    channel.close().addListener(new ChannelFutureListener() {
+                        @Override
+                        public void operationComplete(ChannelFuture future) throws Exception {
+                            LOG.warn("Close channel {} {}", channel, future.isSuccess());
+                        }
+                    });
+                } catch (Exception e) {
+                    LOG.error("Close channel error !", e);
+                } finally {
+                    this.lockChannelTables.unlock();
+                }
+            } else {
+                LOG.warn("Can not lock channel table in {} ms", LOCK_TIMEOUT_MILLIS);
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Close channel error !", e);
+        }
+    }
+
+    void closeChannel(final Channel channel) {
+        if (null == channel)
+            return;
+
+        try {
+            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    boolean removeItemFromTable = true;
+                    ChannelWrapper prevCW = null;
+                    String addrRemote = null;
+
+                    for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) {
+                        ChannelWrapper prev = entry.getValue();
+                        if (prev.getChannel() != null) {
+                            if (prev.getChannel() == channel) {
+                                prevCW = prev;
+                                addrRemote = entry.getKey();
+                                break;
+                            }
+                        }
+                    }
+
+                    if (null == prevCW) {
+                        LOG.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+                        removeItemFromTable = false;
+                    }
+
+                    if (removeItemFromTable) {
+                        this.channelTables.remove(addrRemote);
+                        LOG.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+                        //RemotingHelper.closeChannel(channel);
+                    }
+                } catch (Exception e) {
+                    LOG.error("closeChannel: close the channel exception", e);
+                } finally {
+                    this.lockChannelTables.unlock();
+                }
+            } else {
+                LOG.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+            }
+        } catch (InterruptedException e) {
+            LOG.error("closeChannel exception", e);
+        }
+    }
+
+    /** Holds the connect future for one address; static because it never uses the enclosing manager. */
+    private static class ChannelWrapper {
+        private final ChannelFuture channelFuture;
+        ChannelWrapper(ChannelFuture channelFuture) {
+            this.channelFuture = channelFuture;
+        }
+
+        boolean isActive() {
+            return getChannel() != null && getChannel().isActive();
+        }
+
+        boolean isWriteable() {
+            return getChannel().isWritable();
+        }
+
+        private Channel getChannel() {
+            return this.channelFuture.channel();
+        }
+
+        ChannelFuture getChannelFuture() {
+            return channelFuture;
+        }
+    }
+}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index 38059a8..920a922 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -22,7 +22,6 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.List;
@@ -58,6 +57,7 @@ import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
 import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
 import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode;
+import org.apache.rocketmq.remoting.internal.RemotingUtil;
 import org.apache.rocketmq.remoting.internal.UIDGenerator;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
@@ -88,7 +88,9 @@ public abstract class NettyRemotingAbstract implements RemotingService {
     }
 
     protected void putNettyEvent(final NettyChannelEvent event) {
-        this.channelEventExecutor.putNettyEvent(event);
+        if (channelEventListenerGroup != null && channelEventListenerGroup.size() != 0) {
+            this.channelEventExecutor.putNettyEvent(event);
+        }
     }
 
     protected void startUpHouseKeepingService() {
@@ -172,7 +174,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
             processorExecutorPair.getRight().submit(run);
         } catch (RejectedExecutionException e) {
             LOG.warn(String.format("Request %s from %s Rejected by server executor %s !", cmd,
-                extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString()));
+                RemotingUtil.extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString()));
 
             if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
                 RemotingCommand response = remotingCommandFactory.createResponse(cmd);
@@ -190,7 +192,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
             responseFuture.release();
 
             this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST,
-                extractRemoteAddress(ctx.channel()), responseFuture.getRequestCommand(), response));
+                RemotingUtil.extractRemoteAddress(ctx.channel()), responseFuture.getRequestCommand(), response));
 
             if (responseFuture.getAsyncHandler() != null) {
                 executeAsyncHandler(responseFuture);
@@ -199,7 +201,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
                 responseFuture.release();
             }
         } else {
-            LOG.warn("request {} from {} has not matched response !", response, extractRemoteAddress(ctx.channel()));
+            LOG.warn("request {} from {} has not matched response !", response, RemotingUtil.extractRemoteAddress(ctx.channel()));
         }
     }
 
@@ -211,12 +213,12 @@ public abstract class NettyRemotingAbstract implements RemotingService {
             public void run() {
                 try {
                     interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.RESPONSE,
-                        extractRemoteAddress(ctx.channel()), cmd));
+                        RemotingUtil.extractRemoteAddress(ctx.channel()), cmd));
 
                     RemotingCommand response = processorExecutorPair.getLeft().processRequest(channel, cmd);
 
                     interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.RESPONSE,
-                        extractRemoteAddress(ctx.channel()), cmd, response));
+                        RemotingUtil.extractRemoteAddress(ctx.channel()), cmd, response));
 
                     handleResponse(response, cmd, ctx);
                 } catch (Throwable e) {
@@ -228,10 +230,6 @@ public abstract class NettyRemotingAbstract implements RemotingService {
         };
     }
 
-    protected String extractRemoteAddress(Channel channel) {
-        return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress();
-    }
-
     private void writeAndFlush(final Channel channel, final Object msg) {
         channel.writeAndFlush(msg);
     }
@@ -321,14 +319,14 @@ public abstract class NettyRemotingAbstract implements RemotingService {
         long timeoutMillis) {
         request.trafficType(TrafficType.REQUEST_SYNC);
 
-        final String remoteAddr = extractRemoteAddress(channel);
+        final String remoteAddr = RemotingUtil.extractRemoteAddress(channel);
 
         this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
 
         RemotingCommand responseCommand = this.invoke0(remoteAddr, channel, request, timeoutMillis);
 
         this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST,
-            extractRemoteAddress(channel), request, responseCommand));
+            RemotingUtil.extractRemoteAddress(channel), request, responseCommand));
 
         return responseCommand;
     }
@@ -367,9 +365,9 @@ public abstract class NettyRemotingAbstract implements RemotingService {
 
             if (null == responseCommand) {
                 if (responseFuture.isSendRequestOK()) {
-                    throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause());
+                    throw new RemoteTimeoutException(RemotingUtil.extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause());
                 } else {
-                    throw new RemoteAccessException(extractRemoteAddress(channel), responseFuture.getCause());
+                    throw new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), responseFuture.getCause());
                 }
             }
 
@@ -387,7 +385,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
         final AsyncHandler invokeCallback, long timeoutMillis) {
         request.trafficType(TrafficType.REQUEST_ASYNC);
 
-        final String remoteAddr = extractRemoteAddress(channel);
+        final String remoteAddr = RemotingUtil.extractRemoteAddress(channel);
 
         this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
 
@@ -436,7 +434,7 @@ public abstract class NettyRemotingAbstract implements RemotingService {
     public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request) {
         request.trafficType(TrafficType.REQUEST_ONEWAY);
 
-        this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request));
+        this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, RemotingUtil.extractRemoteAddress(channel), request));
         this.invokeOneway0(channel, request);
     }
 
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
index c263f22..ce30aa2 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -20,8 +20,6 @@ package org.apache.rocketmq.remoting.impl.netty;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
@@ -38,13 +36,8 @@ import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import io.netty.util.concurrent.EventExecutorGroup;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
 import org.apache.rocketmq.remoting.api.RemotingClient;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
@@ -59,16 +52,14 @@ import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler;
 import org.apache.rocketmq.remoting.internal.JvmUtils;
 
 public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
-    private static final long LOCK_TIMEOUT_MILLIS = 3000;
     private final Bootstrap clientBootstrap = new Bootstrap();
     private final EventLoopGroup ioGroup;
     private final Class<? extends SocketChannel> socketChannelClass;
 
     private final RemotingConfig clientConfig;
 
-    private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
-    private final Lock lockChannelTables = new ReentrantLock();
     private EventExecutorGroup workerGroup;
+    private ClientChannelManager clientChannelManager;
 
     public NettyRemotingClient(final RemotingConfig clientConfig) {
         super(clientConfig);
@@ -84,6 +75,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             socketChannelClass = NioSocketChannel.class;
         }
 
+        this.clientChannelManager = new ClientChannelManager(clientBootstrap, clientConfig);
+
         this.workerGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
             ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
     }
@@ -117,11 +110,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         try {
             ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
 
-            for (ChannelWrapper cw : this.channelTables.values()) {
-                this.closeChannel(null, cw.getChannel());
-            }
-
-            this.channelTables.clear();
+            clientChannelManager.clear();
 
             this.ioGroup.shutdownGracefully();
 
@@ -157,102 +146,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         }
     }
 
-    private void closeChannel(final String addr, final Channel channel) {
-        if (null == channel)
-            return;
-
-        final String addrRemote = null == addr ? extractRemoteAddress(channel) : addr;
-        try {
-            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    boolean removeItemFromTable = true;
-                    ChannelWrapper prevCW = this.channelTables.get(addrRemote);
-                    //Workaround for null
-                    if (null == prevCW) {
-                        return;
-                    }
-
-                    LOG.info("Begin to close the remote address {} channel {}", addrRemote, prevCW);
-
-                    if (prevCW.getChannel() != channel) {
-                        LOG.info("Channel {} has been closed,this is a new channel.", prevCW.getChannel(), channel);
-                        removeItemFromTable = false;
-                    }
-
-                    if (removeItemFromTable) {
-                        this.channelTables.remove(addrRemote);
-                        LOG.info("Channel {} has been removed !", addrRemote);
-                    }
-
-                    channel.close().addListener(new ChannelFutureListener() {
-                        @Override
-                        public void operationComplete(ChannelFuture future) throws Exception {
-                            LOG.warn("Close channel {} {}", channel, future.isSuccess());
-                        }
-                    });
-                } catch (Exception e) {
-                    LOG.error("Close channel error !", e);
-                } finally {
-                    this.lockChannelTables.unlock();
-                }
-            } else {
-                LOG.warn("Can not lock channel table in {} ms", LOCK_TIMEOUT_MILLIS);
-            }
-        } catch (InterruptedException e) {
-            LOG.error("Close channel error !", e);
-        }
-    }
-
-    private void closeChannel(final Channel channel) {
-        if (null == channel)
-            return;
-
-        try {
-            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    boolean removeItemFromTable = true;
-                    ChannelWrapper prevCW = null;
-                    String addrRemote = null;
-
-                    for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) {
-                        ChannelWrapper prev = entry.getValue();
-                        if (prev.getChannel() != null) {
-                            if (prev.getChannel() == channel) {
-                                prevCW = prev;
-                                addrRemote = entry.getKey();
-                                break;
-                            }
-                        }
-                    }
-
-                    if (null == prevCW) {
-                        LOG.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
-                        removeItemFromTable = false;
-                    }
-
-                    if (removeItemFromTable) {
-                        this.channelTables.remove(addrRemote);
-                        LOG.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
-                        //RemotingHelper.closeChannel(channel);
-                    }
-                } catch (Exception e) {
-                    LOG.error("closeChannel: close the channel exception", e);
-                } finally {
-                    this.lockChannelTables.unlock();
-                }
-            } else {
-                LOG.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
-            }
-        } catch (InterruptedException e) {
-            LOG.error("closeChannel exception", e);
-        }
-    }
-
     @Override
     public RemotingCommand invoke(final String address, final RemotingCommand request, final long timeoutMillis) {
         request.trafficType(TrafficType.REQUEST_SYNC);
 
-        Channel channel = this.createIfAbsent(address);
+        Channel channel = this.clientChannelManager.createIfAbsent(address);
         if (channel != null && channel.isActive()) {
             try {
                 return this.invokeWithInterceptor(channel, request, timeoutMillis);
@@ -260,18 +158,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             } catch (RemoteTimeoutException e) {
                 if (this.clientConfig.isClientCloseSocketIfTimeout()) {
                     LOG.warn("invoke: timeout, so close the socket {} ms, {}", timeoutMillis, address);
-                    this.closeChannel(address, channel);
+                    this.clientChannelManager.closeChannel(address, channel);
                 }
 
                 LOG.warn("invoke: wait response timeout<{}ms> exception, so close the channel[{}]", timeoutMillis, address);
                 throw e;
             } finally {
                 if (this.clientConfig.isClientShortConnectionEnable()) {
-                    this.closeChannel(address, channel);
+                    this.clientChannelManager.closeChannel(address, channel);
                 }
             }
         } else {
-            this.closeChannel(address, channel);
+            this.clientChannelManager.closeChannel(address, channel);
             throw new RemoteConnectFailureException(address);
         }
 
@@ -281,113 +179,21 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     public void invokeAsync(final String address, final RemotingCommand request, final AsyncHandler asyncHandler,
         final long timeoutMillis) {
 
-        final Channel channel = this.createIfAbsent(address);
+        final Channel channel = this.clientChannelManager.createIfAbsent(address);
         if (channel != null && channel.isActive()) {
             this.invokeAsyncWithInterceptor(channel, request, asyncHandler, timeoutMillis);
         } else {
-            this.closeChannel(address, channel);
+            this.clientChannelManager.closeChannel(address, channel);
         }
     }
 
     @Override
     public void invokeOneWay(final String address, final RemotingCommand request) {
-        final Channel channel = this.createIfAbsent(address);
+        final Channel channel = this.clientChannelManager.createIfAbsent(address);
         if (channel != null && channel.isActive()) {
             this.invokeOnewayWithInterceptor(channel, request);
         } else {
-            this.closeChannel(address, channel);
-        }
-    }
-
-    private Channel createIfAbsent(final String addr) {
-        ChannelWrapper cw = this.channelTables.get(addr);
-        if (cw != null && cw.isActive()) {
-            return cw.getChannel();
-        }
-        return this.createChannel(addr);
-    }
-
-    //FIXME need test to verify
-    private Channel createChannel(final String addr) {
-        ChannelWrapper cw = null;
-        try {
-            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    boolean createNewConnection;
-                    cw = this.channelTables.get(addr);
-                    if (cw != null) {
-                        if (cw.isActive()) {
-                            return cw.getChannel();
-                        } else if (!cw.getChannelFuture().isDone()) {
-                            createNewConnection = false;
-                        } else {
-                            this.channelTables.remove(addr);
-                            createNewConnection = true;
-                        }
-                    } else {
-                        createNewConnection = true;
-                    }
-
-                    if (createNewConnection) {
-                        String[] s = addr.split(":");
-                        SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.valueOf(s[1]));
-                        ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress);
-                        LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
-                        cw = new ChannelWrapper(channelFuture);
-                        this.channelTables.put(addr, cw);
-                    }
-                } catch (Exception e) {
-                    LOG.error("createChannel: create channel exception", e);
-                } finally {
-                    this.lockChannelTables.unlock();
-                }
-            } else {
-                LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
-            }
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-
-        if (cw != null) {
-            ChannelFuture channelFuture = cw.getChannelFuture();
-            if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) {
-                if (cw.isActive()) {
-                    LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
-                    return cw.getChannel();
-                } else {
-                    LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause());
-                    this.closeChannel(addr, cw.getChannel());
-                }
-            } else {
-                LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(),
-                    channelFuture.toString());
-                this.closeChannel(addr, cw.getChannel());
-            }
-        }
-        return null;
-    }
-
-    private class ChannelWrapper {
-        private final ChannelFuture channelFuture;
-
-        ChannelWrapper(ChannelFuture channelFuture) {
-            this.channelFuture = channelFuture;
-        }
-
-        boolean isActive() {
-            return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
-        }
-
-        boolean isWriteable() {
-            return this.channelFuture.channel().isWritable();
-        }
-
-        private Channel getChannel() {
-            return this.channelFuture.channel();
-        }
-
-        ChannelFuture getChannelFuture() {
-            return channelFuture;
+            this.clientChannelManager.closeChannel(address, channel);
         }
     }
 
@@ -407,7 +213,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
             LOG.info("Remote address {} disconnect channel {}.", ctx.channel().remoteAddress(), ctx.channel());
 
-            closeChannel(ctx.channel());
+            NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
 
             super.disconnect(ctx, promise);
 
@@ -418,7 +224,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
             LOG.info("Remote address {} close channel {}.", ctx.channel().remoteAddress(), ctx.channel());
 
-            closeChannel(ctx.channel());
+            NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
 
             super.close(ctx, promise);
 
@@ -431,7 +237,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                 IdleStateEvent event = (IdleStateEvent) evt;
                 if (event.state().equals(IdleState.ALL_IDLE)) {
                     LOG.info("Close channel {} because of idle event {} ", ctx.channel(), event);
-                    closeChannel(ctx.channel());
+                    NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
                     putNettyEvent(new NettyChannelEvent(NettyChannelEventType.IDLE, ctx.channel()));
                 }
             }
@@ -440,16 +246,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         }
 
         @Override
-        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
-            LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(),
-                ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable());
-        }
-
-        @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
             LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
-
-            closeChannel(ctx.channel());
+            NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel());
             putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel()));
         }
     }
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
index 55ce2d2..f1e9360 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java
@@ -233,12 +233,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         }
 
         @Override
-        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
-            LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(),
-                ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable());
-        }
-
-        @Override
         public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
             putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause));
 
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java
new file mode 100644
index 0000000..89e4bff
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+import io.netty.channel.Channel;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+
+/**
+ * Static helpers for working with Netty channels.
+ */
+public class RemotingUtil {
+    /**
+     * Returns the textual IP address of the remote peer of {@code channel}.
+     *
+     * <p>An empty string is returned, rather than an exception being thrown, when
+     * no address can be determined: the channel is {@code null}, it reports no
+     * remote address (Netty documents {@code null} for a channel that is not
+     * connected), or the remote address is not an {@link InetSocketAddress}.
+     * For an unresolved socket address its host string is returned.
+     *
+     * @param channel the channel whose peer address is wanted; may be {@code null}
+     * @return the peer's IP address, the unresolved host string, or {@code ""}
+     */
+    public static String extractRemoteAddress(Channel channel) {
+        if (channel == null) {
+            return "";
+        }
+
+        // Read the address once so the type check and the cast see the same value.
+        final SocketAddress remote = channel.remoteAddress();
+        if (!(remote instanceof InetSocketAddress)) {
+            return "";
+        }
+
+        final InetSocketAddress inetRemote = (InetSocketAddress) remote;
+        if (inetRemote.isUnresolved()) {
+            // getAddress() is null for an unresolved address, so use the host string instead.
+            return inetRemote.getHostString();
+        }
+        return inetRemote.getAddress().getHostAddress();
+    }
+}