You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:31 UTC

[14/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTimeoutException.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTimeoutException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTimeoutException.java
new file mode 100644
index 0000000..b8c1bb0
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTimeoutException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.exception;
+
+/**
+ * Remoting exception raised when waiting for a response on a channel times out.
+ * <p>
+ * Either carries a caller-supplied message, or a message generated from the peer address
+ * and the timeout that elapsed.
+ *
+ * @author shijia.wxr
+ */
+public class RemotingTimeoutException extends RemotingException {
+
+    private static final long serialVersionUID = 4106899185095245979L;
+
+
+    /**
+     * @param message free-form description of the timeout
+     */
+    public RemotingTimeoutException(String message) {
+        super(message);
+    }
+
+
+    /**
+     * Same as the three-argument constructor, without a cause.
+     *
+     * @param addr address of the channel that was being waited on
+     * @param timeoutMillis the timeout, in milliseconds
+     */
+    public RemotingTimeoutException(String addr, long timeoutMillis) {
+        this(addr, timeoutMillis, null);
+    }
+
+
+    /**
+     * @param addr address of the channel that was being waited on
+     * @param timeoutMillis the timeout, in milliseconds
+     * @param cause underlying failure, may be {@code null}
+     */
+    public RemotingTimeoutException(String addr, long timeoutMillis, Throwable cause) {
+        super(describeTimeout(addr, timeoutMillis), cause);
+    }
+
+
+    /**
+     * Builds the standard timeout message for the given channel address and timeout.
+     */
+    private static String describeTimeout(String addr, long timeoutMillis) {
+        final StringBuilder text = new StringBuilder();
+        text.append("wait response on the channel <").append(addr).append("> timeout, ");
+        text.append(timeoutMillis).append("(ms)");
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTooMuchRequestException.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTooMuchRequestException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTooMuchRequestException.java
new file mode 100644
index 0000000..41be8b3
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTooMuchRequestException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.exception;
+
+/**
+ * Remoting exception carrying only a message.
+ * <p>
+ * NOTE(review): presumably raised when request throttling refuses a call (it appears in the
+ * throws clause of the async invoke path) - confirm against the throwing sites.
+ *
+ * @author shijia.wxr
+ */
+public class RemotingTooMuchRequestException extends RemotingException {
+
+    private static final long serialVersionUID = 4326919581254519654L;
+
+    /**
+     * @param message description of why the request was refused
+     */
+    public RemotingTooMuchRequestException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyClientConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyClientConfig.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyClientConfig.java
new file mode 100644
index 0000000..4665b28
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyClientConfig.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class NettyClientConfig {
+    /**
+     * Worker thread number
+     */
+    private int clientWorkerThreads = 4;
+    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
+    private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
+    private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
+    private int connectTimeoutMillis = 3000;
+    private long channelNotActiveInterval = 1000 * 60;
+
+    /**
+     * IdleStateEvent will be triggered when neither read nor write was performed for
+     * the specified period of this time. Specify {@code 0} to disable
+     */
+    private int clientChannelMaxIdleTimeSeconds = 120;
+
+    private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
+    private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
+    private boolean clientPooledByteBufAllocatorEnable = false;
+    private boolean clientCloseSocketIfTimeout = false;
+
+    public boolean isClientCloseSocketIfTimeout() {
+        return clientCloseSocketIfTimeout;
+    }
+
+    public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
+        this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
+    }
+
+    public int getClientWorkerThreads() {
+        return clientWorkerThreads;
+    }
+
+
+    public void setClientWorkerThreads(int clientWorkerThreads) {
+        this.clientWorkerThreads = clientWorkerThreads;
+    }
+
+
+    public int getClientOnewaySemaphoreValue() {
+        return clientOnewaySemaphoreValue;
+    }
+
+
+    public void setClientOnewaySemaphoreValue(int clientOnewaySemaphoreValue) {
+        this.clientOnewaySemaphoreValue = clientOnewaySemaphoreValue;
+    }
+
+
+    public int getConnectTimeoutMillis() {
+        return connectTimeoutMillis;
+    }
+
+
+    public void setConnectTimeoutMillis(int connectTimeoutMillis) {
+        this.connectTimeoutMillis = connectTimeoutMillis;
+    }
+
+
+    public int getClientCallbackExecutorThreads() {
+        return clientCallbackExecutorThreads;
+    }
+
+
+    public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
+        this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+    }
+
+
+    public long getChannelNotActiveInterval() {
+        return channelNotActiveInterval;
+    }
+
+
+    public void setChannelNotActiveInterval(long channelNotActiveInterval) {
+        this.channelNotActiveInterval = channelNotActiveInterval;
+    }
+
+
+    public int getClientAsyncSemaphoreValue() {
+        return clientAsyncSemaphoreValue;
+    }
+
+
+    public void setClientAsyncSemaphoreValue(int clientAsyncSemaphoreValue) {
+        this.clientAsyncSemaphoreValue = clientAsyncSemaphoreValue;
+    }
+
+
+    public int getClientChannelMaxIdleTimeSeconds() {
+        return clientChannelMaxIdleTimeSeconds;
+    }
+
+
+    public void setClientChannelMaxIdleTimeSeconds(int clientChannelMaxIdleTimeSeconds) {
+        this.clientChannelMaxIdleTimeSeconds = clientChannelMaxIdleTimeSeconds;
+    }
+
+
+    public int getClientSocketSndBufSize() {
+        return clientSocketSndBufSize;
+    }
+
+
+    public void setClientSocketSndBufSize(int clientSocketSndBufSize) {
+        this.clientSocketSndBufSize = clientSocketSndBufSize;
+    }
+
+
+    public int getClientSocketRcvBufSize() {
+        return clientSocketRcvBufSize;
+    }
+
+
+    public void setClientSocketRcvBufSize(int clientSocketRcvBufSize) {
+        this.clientSocketRcvBufSize = clientSocketRcvBufSize;
+    }
+
+
+    public boolean isClientPooledByteBufAllocatorEnable() {
+        return clientPooledByteBufAllocatorEnable;
+    }
+
+
+    public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
+        this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyDecoder.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyDecoder.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyDecoder.java
new file mode 100644
index 0000000..9e68533
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyDecoder.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Inbound decoder that cuts the byte stream into length-prefixed frames and turns each frame
+ * into a {@link RemotingCommand}.
+ * <p>
+ * Framing is delegated to {@link LengthFieldBasedFrameDecoder}: a 4-byte length field at offset 0,
+ * no length adjustment, and the 4 length bytes stripped from the frame handed to the decoder.
+ * The maximum frame length is read once from the system property
+ * {@code com.rocketmq.remoting.frameMaxLength} (default 16777216).
+ *
+ * @author shijia.wxr
+ *
+ */
+public class NettyDecoder extends LengthFieldBasedFrameDecoder {
+    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    private static final int FRAME_MAX_LENGTH =
+            Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
+
+
+    public NettyDecoder() {
+        // maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip
+        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
+    }
+
+
+    /**
+     * Extracts the next complete frame, if any, and decodes it.
+     *
+     * @return the decoded command, or {@code null} when no complete frame is available yet or
+     *         when decoding failed (in which case the channel is closed)
+     */
+    @Override
+    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+        ByteBuf extracted = null;
+        try {
+            extracted = (ByteBuf) super.decode(ctx, in);
+            if (extracted != null) {
+                return RemotingCommand.decode(extracted.nioBuffer());
+            }
+        } catch (Exception e) {
+            // A frame that cannot be decoded is treated as fatal for the connection.
+            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
+            RemotingUtil.closeChannel(ctx.channel());
+        } finally {
+            // The extracted frame is always released here, whether decoding succeeded or not.
+            if (extracted != null) {
+                extracted.release();
+            }
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEncoder.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEncoder.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEncoder.java
new file mode 100644
index 0000000..c6c901c
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEncoder.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
+    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    @Override
+    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
+            throws Exception {
+        try {
+            ByteBuffer header = remotingCommand.encodeHeader();
+            out.writeBytes(header);
+            byte[] body = remotingCommand.getBody();
+            if (body != null) {
+                out.writeBytes(body);
+            }
+        } catch (Exception e) {
+            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
+            if (remotingCommand != null) {
+                log.error(remotingCommand.toString());
+            }
+            RemotingUtil.closeChannel(ctx.channel());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEvent.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEvent.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEvent.java
new file mode 100644
index 0000000..14a2071
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEvent.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import io.netty.channel.Channel;
+
+
+/**
+ * Immutable record of something that happened on a channel: the event type, the remote
+ * address as text, and the channel itself.
+ *
+ * @author shijia.wxr
+ */
+public class NettyEvent {
+    private final NettyEventType type;
+    private final String remoteAddr;
+    private final Channel channel;
+
+
+    /**
+     * @param type kind of event
+     * @param remoteAddr remote address associated with the event
+     * @param channel channel the event relates to
+     */
+    public NettyEvent(NettyEventType type, String remoteAddr, Channel channel) {
+        this.channel = channel;
+        this.remoteAddr = remoteAddr;
+        this.type = type;
+    }
+
+
+    public NettyEventType getType() {
+        return this.type;
+    }
+
+
+    public String getRemoteAddr() {
+        return this.remoteAddr;
+    }
+
+
+    public Channel getChannel() {
+        return this.channel;
+    }
+
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("NettyEvent [type=");
+        text.append(this.type);
+        text.append(", remoteAddr=").append(this.remoteAddr);
+        text.append(", channel=").append(this.channel);
+        return text.append("]").toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEventType.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEventType.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEventType.java
new file mode 100644
index 0000000..3113147
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEventType.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+/**
+ * Kinds of event carried by a {@code NettyEvent}.
+ * <p>
+ * NOTE(review): the constants presumably correspond to connect / close / idle / exception
+ * notifications on a channel; the code that produces and consumes them is not visible here -
+ * confirm before relying on that reading.
+ *
+ * @author shijia.wxr
+ *
+ */
+public enum NettyEventType {
+    CONNECT,
+    CLOSE,
+    IDLE,
+    EXCEPTION
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
new file mode 100644
index 0000000..70ae5b5
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.ChannelEventListener;
+import com.alibaba.rocketmq.remoting.InvokeCallback;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.common.Pair;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
+import com.alibaba.rocketmq.remoting.common.ServiceThread;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author shijia.wxr
+ */
+public abstract class NettyRemotingAbstract {
+    private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    // Fair semaphore sized by the constructor's one-way permit count; presumably throttles one-way calls (usage not visible here).
+    protected final Semaphore semaphoreOneway;
+
+    // Fair semaphore sized by the constructor's async permit count; presumably throttles async calls (usage not visible here).
+    protected final Semaphore semaphoreAsync;
+
+    // In-flight requests keyed by opaque; entries are removed on response, on send failure, or by scanResponseTable().
+    protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
+            new ConcurrentHashMap<Integer, ResponseFuture>(256);
+    // Request code -> (processor, executor). NOTE(review): plain HashMap; presumably filled before traffic starts - confirm.
+    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
+            new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
+    protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter(); // target of putNettyEvent()
+    // Fallback pair used by processRequestCommand() when processorTable has no entry for the request code; may be null.
+    protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
+
+
+    public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
+        this.semaphoreOneway = new Semaphore(permitsOneway, true);
+        this.semaphoreAsync = new Semaphore(permitsAsync, true);
+    }
+    /**
+     * Supplies the listener for channel events, as provided by the concrete subclass.
+     * NOTE(review): presumably consumed by the event executer; the consumer is not visible here.
+     */
+    public abstract ChannelEventListener getChannelEventListener();
+
+    /**
+     * Hands the event over to this instance's {@code nettyEventExecuter}.
+     *
+     * @param event the event to enqueue
+     */
+    public void putNettyEvent(final NettyEvent event) {
+        final NettyEventExecuter executer = this.nettyEventExecuter;
+        executer.putNettyEvent(event);
+    }
+
+    /**
+     * Routes a decoded command by its type: requests go to {@code processRequestCommand},
+     * responses to {@code processResponseCommand}. A {@code null} command or any other type
+     * is ignored.
+     *
+     * @param ctx channel context the command arrived on
+     * @param msg the decoded command, may be {@code null}
+     */
+    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+        if (null == msg) {
+            return;
+        }
+
+        switch (msg.getType()) {
+            case RESPONSE_COMMAND:
+                processResponseCommand(ctx, msg);
+                return;
+            case REQUEST_COMMAND:
+                processRequestCommand(ctx, msg);
+                return;
+            default:
+                return;
+        }
+    }
+    /**
+     * Dispatches an incoming request to the processor registered for its request code.
+     * <p>
+     * The processor pair is looked up in {@code processorTable}, falling back to
+     * {@code defaultRequestProcessor}. When neither exists, a REQUEST_CODE_NOT_SUPPORTED response
+     * is written back. Otherwise the work is wrapped in a {@code RequestTask} and submitted to the
+     * pair's executor, unless the processor reports {@code rejectRequest()}, in which case a
+     * SYSTEM_BUSY response is written immediately and nothing is submitted.
+     *
+     * @param ctx channel context the request arrived on; all responses are written through it
+     * @param cmd the request; its opaque is copied onto every response built here
+     */
+    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
+        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
+        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
+        final int opaque = cmd.getOpaque();
+
+        if (pair != null) {
+            Runnable run = new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
+                        if (rpcHook != null) {
+                            rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
+                        }
+                        // The hook, when present, brackets the processor call: before the request, after the response.
+                        final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
+                        if (rpcHook != null) {
+                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
+                        }
+
+                        if (!cmd.isOnewayRPC()) {
+                            if (response != null) {
+                                response.setOpaque(opaque);
+                                response.markResponseType();
+                                try {
+                                    ctx.writeAndFlush(response);
+                                } catch (Throwable e) {
+                                    PLOG.error("process request over, but response failed", e);
+                                    PLOG.error(cmd.toString());
+                                    PLOG.error(response.toString());
+                                }
+                            } else {
+                                // Processor returned no response: nothing is written back here.
+                            }
+                        }
+                    } catch (Throwable e) {
+                        if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException" // this one exception type is not logged
+                                .equals(e.getClass().getCanonicalName())) {
+                            PLOG.error("process request exception", e);
+                            PLOG.error(cmd.toString());
+                        }
+
+                        if (!cmd.isOnewayRPC()) {
+                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
+                                    RemotingHelper.exceptionSimpleDesc(e));
+                            response.setOpaque(opaque);
+                            ctx.writeAndFlush(response);
+                        }
+                    }
+                }
+            };
+            // NOTE(review): unlike the exception and overload paths, this busy response is written even for one-way requests.
+            if (pair.getObject1().rejectRequest()) {
+                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
+                        "[REJECTREQUEST]system busy, start flow control for a while");
+                response.setOpaque(opaque);
+                ctx.writeAndFlush(response);
+                return;
+            }
+
+            try {
+                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
+                pair.getObject2().submit(requestTask);
+            } catch (RejectedExecutionException e) {
+                if ((System.currentTimeMillis() % 10000) == 0) { // crude log throttle: only logs when the clock is an exact multiple of 10s
+                    PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
+                            + ", too many requests and system thread pool busy, RejectedExecutionException " //
+                            + pair.getObject2().toString() //
+                            + " request code: " + cmd.getCode());
+                }
+
+                if (!cmd.isOnewayRPC()) {
+                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
+                            "[OVERLOAD]system busy, start flow control for a while");
+                    response.setOpaque(opaque);
+                    ctx.writeAndFlush(response);
+                }
+            }
+        } else {
+            String error = " request type " + cmd.getCode() + " not supported";
+            final RemotingCommand response =
+                    RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
+            response.setOpaque(opaque);
+            ctx.writeAndFlush(response);
+            PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
+        }
+    }
+    /**
+     * Matches an incoming response to its pending request by opaque and completes that request.
+     * <p>
+     * When the pending {@code ResponseFuture} has an invoke callback, the callback is run on the
+     * executor from {@code getCallbackExecutor()}, or on the calling thread when that executor is
+     * {@code null} or refuses the task. Without a callback the response is passed to
+     * {@code putResponse}. Responses with no matching pending request are only logged.
+     *
+     * @param ctx channel context the response arrived on (used for logging only)
+     * @param cmd the response command
+     */
+    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
+        final int opaque = cmd.getOpaque();
+        final ResponseFuture responseFuture = responseTable.get(opaque);
+        if (responseFuture != null) {
+            responseFuture.setResponseCommand(cmd);
+            // presumably returns the permit held by this request, if any - ResponseFuture is not visible here
+            responseFuture.release();
+            // The request is no longer pending once its response has arrived.
+            responseTable.remove(opaque);
+
+            if (responseFuture.getInvokeCallback() != null) {
+                boolean runInThisThread = false;
+                ExecutorService executor = this.getCallbackExecutor();
+                if (executor != null) {
+                    try {
+                        executor.submit(new Runnable() {
+                            @Override
+                            public void run() {
+                                try {
+                                    responseFuture.executeInvokeCallback();
+                                } catch (Throwable e) {
+                                    PLOG.warn("execute callback in executor exception, and callback throw", e);
+                                }
+                            }
+                        });
+                    } catch (Exception e) {
+                        runInThisThread = true; // submission failed: fall back to running the callback inline below
+                        PLOG.warn("execute callback in executor exception, maybe executor busy", e);
+                    }
+                } else {
+                    runInThisThread = true;
+                }
+
+                if (runInThisThread) {
+                    try {
+                        responseFuture.executeInvokeCallback();
+                    } catch (Throwable e) {
+                        PLOG.warn("executeInvokeCallback Exception", e);
+                    }
+                }
+            } else {
+                responseFuture.putResponse(cmd); // presumably wakes the caller blocked in waitResponse() - confirm in ResponseFuture
+            }
+        } else {
+            PLOG.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+            PLOG.warn(cmd.toString());
+        }
+    }
+    /**
+     * Supplies the hook invoked around request processing in {@code processRequestCommand};
+     * a {@code null} return means no hook is applied.
+     */
+    public abstract RPCHook getRPCHook();
+
+    /**
+     * Supplies the executor on which response callbacks are run.
+     *
+     * @return the callback executor, or {@code null} to have callbacks run on the thread that
+     *         received the response
+     */
+    public abstract ExecutorService getCallbackExecutor();
+
+    /**
+     * Sweeps {@code responseTable} for requests whose begin timestamp plus timeout plus a
+     * one-second grace period has passed. Each such request is released, removed from the table
+     * and logged; once the sweep is over, their callbacks are executed one by one, with any
+     * throwable from a callback logged and swallowed.
+     */
+    public void scanResponseTable() {
+        final List<ResponseFuture> expired = new LinkedList<ResponseFuture>();
+
+        for (Iterator<Entry<Integer, ResponseFuture>> cursor = this.responseTable.entrySet().iterator(); cursor.hasNext();) {
+            final ResponseFuture pending = cursor.next().getValue();
+
+            final long deadline = pending.getBeginTimestamp() + pending.getTimeoutMillis() + 1000;
+            if (deadline > System.currentTimeMillis()) {
+                // Still within its timeout window (plus grace): leave it pending.
+                continue;
+            }
+
+            pending.release();
+            cursor.remove();
+            expired.add(pending);
+            PLOG.warn("remove timeout request, " + pending);
+        }
+
+        // Callbacks run only after the sweep, outside the table iteration.
+        for (ResponseFuture timedOut : expired) {
+            try {
+                timedOut.executeInvokeCallback();
+            } catch (Throwable e) {
+                PLOG.warn("scanResponseTable, operationComplete Exception", e);
+            }
+        }
+    }
+    /**
+     * Sends a request on the channel and blocks until its response arrives or the timeout passes.
+     * <p>
+     * A {@code ResponseFuture} is registered in {@code responseTable} under the request's opaque
+     * before the write, and is always removed again when this method exits.
+     *
+     * @param channel channel to write the request to
+     * @param request the request; its opaque identifies the pending entry
+     * @param timeoutMillis how long to wait for the response, in milliseconds
+     * @return the response command, never {@code null}
+     * @throws RemotingTimeoutException when no response was obtained and the send was marked successful
+     * @throws RemotingSendRequestException when no response was obtained and the send was not marked successful
+     * @throws InterruptedException declared for the blocking wait
+     */
+    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
+            throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
+        final int opaque = request.getOpaque();
+
+        try {
+            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
+            this.responseTable.put(opaque, responseFuture);
+            final SocketAddress addr = channel.remoteAddress();
+            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture f) throws Exception {
+                    if (f.isSuccess()) {
+                        responseFuture.setSendRequestOK(true);
+                        return;
+                    } else {
+                        responseFuture.setSendRequestOK(false);
+                    }
+                    // Write failed: drop the pending entry, record the cause and hand a null response to the future.
+                    responseTable.remove(opaque);
+                    responseFuture.setCause(f.cause());
+                    responseFuture.putResponse(null);
+                    PLOG.warn("send a request command to channel <" + addr + "> failed.");
+                }
+            });
+            // A null result means no response was obtained; the send flag decides which exception is thrown.
+            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
+            if (null == responseCommand) {
+                if (responseFuture.isSendRequestOK()) {
+                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
+                            responseFuture.getCause());
+                } else {
+                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
+                }
+            }
+
+            return responseCommand;
+        } finally {
+            this.responseTable.remove(opaque); // covers every exit path, including timeout and interruption
+        }
+    }
+
+    /**
+     * Sends {@code request} asynchronously. A permit of {@code semaphoreAsync} is acquired first (waiting up
+     * to {@code timeoutMillis}); the pending future is then registered in {@code responseTable} and the
+     * request is written to the channel. If the write is reported as failed, the future is removed from the
+     * table, its callback is executed and the permit is released.
+     *
+     * @param channel        channel the request is written to
+     * @param request        command to send
+     * @param timeoutMillis  permit wait time and response timeout, in milliseconds
+     * @param invokeCallback callback attached to the pending future
+     * @throws InterruptedException            if interrupted while waiting for a permit
+     * @throws RemotingTooMuchRequestException if no permit could be acquired within {@code timeoutMillis}
+     * @throws RemotingTimeoutException        declared, but not thrown by this method body
+     * @throws RemotingSendRequestException    if writing to the channel throws
+     */
+    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
+                                final InvokeCallback invokeCallback)
+            throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        final int opaque = request.getOpaque();
+        if (!this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
+            final String info =
+                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
+                            timeoutMillis, //
+                            this.semaphoreAsync.getQueueLength(), //
+                            this.semaphoreAsync.availablePermits()//
+                    );
+            PLOG.warn(info);
+            throw new RemotingTooMuchRequestException(info);
+        }
+
+        final SemaphoreReleaseOnlyOnce permit = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
+        final ResponseFuture pending = new ResponseFuture(opaque, timeoutMillis, invokeCallback, permit);
+        this.responseTable.put(opaque, pending);
+        try {
+            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture writeResult) throws Exception {
+                    final boolean sent = writeResult.isSuccess();
+                    pending.setSendRequestOK(sent);
+                    if (sent) {
+                        return;
+                    }
+
+                    // The write failed: complete the future with no response and notify the callback now.
+                    pending.putResponse(null);
+                    responseTable.remove(opaque);
+                    try {
+                        pending.executeInvokeCallback();
+                    } catch (Throwable e) {
+                        PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
+                    } finally {
+                        pending.release();
+                    }
+
+                    PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
+                }
+            });
+        } catch (Exception e) {
+            pending.release();
+            PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
+            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
+        }
+    }
+
+    /**
+     * Sends {@code request} as a one-way RPC (no response is awaited). A permit of {@code semaphoreOneway}
+     * is acquired first and released exactly once: when the write completes, or immediately if writing throws.
+     *
+     * @param channel       channel the request is written to
+     * @param request       command to send; it is marked as one-way before sending
+     * @param timeoutMillis maximum time to wait for a permit, in milliseconds
+     * @throws InterruptedException            if interrupted while waiting for a permit
+     * @throws RemotingTooMuchRequestException if no permit is available and {@code timeoutMillis <= 0}
+     * @throws RemotingTimeoutException        if no permit became available within a positive {@code timeoutMillis}
+     * @throws RemotingSendRequestException    if writing to the channel throws
+     */
+    public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
+            throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        request.markOnewayRPC();
+        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+        if (acquired) {
+            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
+            try {
+                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture f) throws Exception {
+                        once.release();
+                        if (!f.isSuccess()) {
+                            // Log the failure cause as well; otherwise the reason for the failed write is lost.
+                            PLOG.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.", f.cause());
+                        }
+                    }
+                });
+            } catch (Exception e) {
+                once.release();
+                // Keep the stack trace in the log, not only in the rethrown exception.
+                PLOG.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.", e);
+                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
+            }
+        } else {
+            if (timeoutMillis <= 0) {
+                throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
+            } else {
+                // The values reported here belong to the oneway semaphore, so label them accordingly.
+                String info = String.format(
+                        "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d", //
+                        timeoutMillis, //
+                        this.semaphoreOneway.getQueueLength(), //
+                        this.semaphoreOneway.availablePermits()//
+                );
+                PLOG.warn(info);
+                throw new RemotingTimeoutException(info);
+            }
+        }
+    }
+
+    /**
+     * Service thread that drains queued {@link NettyEvent}s and forwards each of them to the
+     * {@link ChannelEventListener} obtained from the enclosing {@code NettyRemotingAbstract}.
+     */
+    class NettyEventExecuter extends ServiceThread {
+        private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
+        private final int maxSize = 10000;
+
+        /**
+         * Queues {@code event} for dispatch; once the queue holds more than {@code maxSize} entries the
+         * event is dropped and a warning is logged instead.
+         */
+        public void putNettyEvent(final NettyEvent event) {
+            if (this.eventQueue.size() > maxSize) {
+                PLOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
+                return;
+            }
+            this.eventQueue.add(event);
+        }
+
+        /**
+         * Polls the queue (3000 ms per attempt) until the service is stopped and dispatches every event
+         * to the listener. Exceptions raised while polling or dispatching are logged and the loop continues.
+         */
+        @Override
+        public void run() {
+            PLOG.info(this.getServiceName() + " service started");
+
+            // The listener is looked up once, before the loop starts.
+            final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
+
+            while (!this.isStopped()) {
+                try {
+                    final NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
+                    if (null == event || null == listener) {
+                        continue;
+                    }
+                    this.dispatch(listener, event);
+                } catch (Exception e) {
+                    PLOG.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            PLOG.info(this.getServiceName() + " service end");
+        }
+
+        /** Invokes the listener method that corresponds to the type of {@code event}. */
+        private void dispatch(final ChannelEventListener listener, final NettyEvent event) {
+            switch (event.getType()) {
+                case IDLE:
+                    listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
+                    break;
+                case CLOSE:
+                    listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
+                    break;
+                case CONNECT:
+                    listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
+                    break;
+                case EXCEPTION:
+                    listener.onChannelException(event.getRemoteAddr(), event.getChannel());
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        @Override
+        public String getServiceName() {
+            return NettyEventExecuter.class.getSimpleName();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java
new file mode 100644
index 0000000..68555c5
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -0,0 +1,682 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.ChannelEventListener;
+import com.alibaba.rocketmq.remoting.InvokeCallback;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.RemotingClient;
+import com.alibaba.rocketmq.remoting.common.Pair;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
+    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    private static final long LOCK_TIMEOUT_MILLIS = 3000;
+
+    private final NettyClientConfig nettyClientConfig;
+    private final Bootstrap bootstrap = new Bootstrap();
+    private final EventLoopGroup eventLoopGroupWorker;
+    private final Lock lockChannelTables = new ReentrantLock();
+    private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
+
+    private final Timer timer = new Timer("ClientHouseKeepingService", true);
+
+    private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
+    private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
+    private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
+    private final Lock lockNamesrvChannel = new ReentrantLock();
+
+    private final ExecutorService publicExecutor;
+    private final ChannelEventListener channelEventListener;
+    private DefaultEventExecutorGroup defaultEventExecutorGroup;
+    private RPCHook rpcHook;
+
+    /**
+     * Creates a client without a {@link ChannelEventListener}; delegates to the two-argument constructor
+     * with a {@code null} listener.
+     *
+     * @param nettyClientConfig client configuration
+     */
+    public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
+        this(nettyClientConfig, null);
+    }
+
+    /**
+     * Creates a client, its fixed-size public executor and its single-threaded NIO event loop group.
+     *
+     * @param nettyClientConfig    client configuration; supplies the semaphore sizes and the public
+     *                             executor thread count (4 is used when the configured value is not positive)
+     * @param channelEventListener listener stored for channel events; may be {@code null}
+     */
+    public NettyRemotingClient(final NettyClientConfig nettyClientConfig, //
+                               final ChannelEventListener channelEventListener) {
+        super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
+        this.nettyClientConfig = nettyClientConfig;
+        this.channelEventListener = channelEventListener;
+
+        final int configuredThreads = nettyClientConfig.getClientCallbackExecutorThreads();
+        final int callbackThreads = configuredThreads > 0 ? configuredThreads : 4;
+
+        this.publicExecutor = Executors.newFixedThreadPool(callbackThreads, new ThreadFactory() {
+            private AtomicInteger sequence = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable task) {
+                final int id = this.sequence.incrementAndGet();
+                return new Thread(task, "NettyClientPublicExecutor_" + id);
+            }
+        });
+
+        this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
+            private AtomicInteger sequence = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable task) {
+                final String threadName = String.format("NettyClientSelector_%d", this.sequence.incrementAndGet());
+                return new Thread(task, threadName);
+            }
+        });
+    }
+
+    private static int initValueIndex() {
+        Random r = new Random();
+
+        return Math.abs(r.nextInt() % 999) % 999;
+    }
+
+    /**
+     * Starts the client: creates the worker event executor group, configures the bootstrap (socket options
+     * and channel pipeline), schedules the periodic response-table scan (first run after 3 s, then every
+     * second) and, when a channel event listener was supplied, starts the netty event executer.
+     */
+    @Override
+    public void start() {
+        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
+                nettyClientConfig.getClientWorkerThreads(), //
+                new ThreadFactory() {
+
+                    private AtomicInteger threadIndex = new AtomicInteger(0);
+
+
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
+                    }
+                });
+
+        // The configured Bootstrap is the field itself, so the result of the chain need not be kept.
+        this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
+                .option(ChannelOption.TCP_NODELAY, true)
+                .option(ChannelOption.SO_KEEPALIVE, false)
+                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
+                .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
+                .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(SocketChannel ch) throws Exception {
+                        ch.pipeline().addLast(
+                                defaultEventExecutorGroup,
+                                new NettyEncoder(),
+                                new NettyDecoder(),
+                                new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
+                                new NettyConnetManageHandler(),
+                                new NettyClientHandler());
+                    }
+                });
+
+        this.timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    NettyRemotingClient.this.scanResponseTable();
+                } catch (Throwable e) {
+                    // Catch Throwable, not just Exception: anything escaping a TimerTask terminates the
+                    // Timer thread, which would silently stop all further timeout scans.
+                    log.error("scanResponseTable exception", e);
+                }
+            }
+        }, 1000 * 3, 1000);
+
+        if (this.channelEventListener != null) {
+            this.nettyEventExecuter.start();
+        }
+    }
+
+    /**
+     * Shuts the client down: cancels the housekeeping timer, closes and forgets all known channels, shuts
+     * down the event loop group, the netty event executer and the worker executor group, and finally the
+     * public executor. Failures are logged and not propagated.
+     */
+    @Override
+    public void shutdown() {
+        try {
+            this.timer.cancel();
+
+            for (ChannelWrapper cw : this.channelTables.values()) {
+                this.closeChannel(null, cw.getChannel());
+            }
+
+            this.channelTables.clear();
+
+            this.eventLoopGroupWorker.shutdownGracefully();
+
+            if (this.nettyEventExecuter != null) {
+                this.nettyEventExecuter.shutdown();
+            }
+
+            if (this.defaultEventExecutorGroup != null) {
+                this.defaultEventExecutorGroup.shutdownGracefully();
+            }
+        } catch (Exception e) {
+            log.error("NettyRemotingClient shutdown exception, ", e);
+        }
+
+        if (this.publicExecutor != null) {
+            try {
+                this.publicExecutor.shutdown();
+            } catch (Exception e) {
+                // This is the client; the message previously named NettyRemotingServer by mistake.
+                log.error("NettyRemotingClient shutdown exception, ", e);
+            }
+        }
+    }
+
+    /**
+     * Closes {@code channel} and, if it is still the channel registered for its address, removes that entry
+     * from the channel table. The work is done under {@code lockChannelTables}; if the lock cannot be
+     * obtained within {@code LOCK_TIMEOUT_MILLIS} nothing is closed and a warning is logged.
+     *
+     * @param addr    table key of the channel; when {@code null} the address is derived from the channel
+     * @param channel channel to close; {@code null} is ignored
+     */
+    public void closeChannel(final String addr, final Channel channel) {
+        if (null == channel) {
+            return;
+        }
+
+        final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr;
+
+        try {
+            if (!this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+                return;
+            }
+
+            try {
+                final ChannelWrapper registered = this.channelTables.get(addrRemote);
+
+                log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, registered != null);
+
+                if (null == registered) {
+                    log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+                } else if (registered.getChannel() != channel) {
+                    log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
+                            addrRemote);
+                } else {
+                    this.channelTables.remove(addrRemote);
+                    log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+                }
+
+                // The channel itself is closed regardless of whether a table entry was removed.
+                RemotingUtil.closeChannel(channel);
+            } catch (Exception e) {
+                log.error("closeChannel: close the channel exception", e);
+            } finally {
+                this.lockChannelTables.unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("closeChannel exception", e);
+        }
+    }
+
+    /**
+     * Stores {@code rpcHook} as the hook of this client, replacing any previously stored one.
+     *
+     * @param rpcHook hook to store
+     */
+    @Override
+    public void registerRPCHook(RPCHook rpcHook) {
+        this.rpcHook = rpcHook;
+    }
+
+    /**
+     * Looks {@code channel} up in the channel table by identity and, when found, removes its entry and
+     * closes the channel. A channel that is not in the table is left untouched. The work is done under
+     * {@code lockChannelTables}; if the lock cannot be obtained within {@code LOCK_TIMEOUT_MILLIS} nothing
+     * happens apart from a warning being logged.
+     *
+     * @param channel channel to close; {@code null} is ignored
+     */
+    public void closeChannel(final Channel channel) {
+        if (null == channel) {
+            return;
+        }
+
+        try {
+            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    // channel is non-null here, so an identity match also implies a non-null table channel.
+                    String addrRemote = null;
+                    for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) {
+                        if (entry.getValue().getChannel() == channel) {
+                            addrRemote = entry.getKey();
+                            break;
+                        }
+                    }
+
+                    if (null == addrRemote) {
+                        // No table key is known on this path; log the channel's own remote address
+                        // instead of the always-null key that was printed before.
+                        log.info("eventCloseChannel: the channel[{}] has been removed from the channel table before",
+                                RemotingHelper.parseChannelRemoteAddr(channel));
+                    } else {
+                        this.channelTables.remove(addrRemote);
+                        log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+                        RemotingUtil.closeChannel(channel);
+                    }
+                } catch (Exception e) {
+                    log.error("closeChannel: close the channel exception", e);
+                } finally {
+                    this.lockChannelTables.unlock();
+                }
+            } else {
+                log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+            }
+        } catch (InterruptedException e) {
+            log.error("closeChannel exception", e);
+        }
+    }
+
+    /**
+     * Replaces the stored name server address list when {@code addrs} differs from it (different size, or
+     * an address that the stored list does not contain). The stored list is a shuffled private copy, so
+     * the caller's list is never modified and may be unmodifiable.
+     *
+     * @param addrs new name server addresses; a {@code null} or empty list is ignored
+     */
+    @Override
+    public void updateNameServerAddressList(List<String> addrs) {
+        // Tolerate null the same way as an empty list instead of failing with a NullPointerException.
+        if (null == addrs || addrs.isEmpty()) {
+            return;
+        }
+
+        final List<String> old = this.namesrvAddrList.get();
+        boolean update = false;
+
+        if (null == old || addrs.size() != old.size()) {
+            update = true;
+        } else {
+            for (String addr : addrs) {
+                if (!old.contains(addr)) {
+                    update = true;
+                    break;
+                }
+            }
+        }
+
+        if (update) {
+            // Shuffle a copy: shuffling the argument would reorder the caller's list and would throw
+            // UnsupportedOperationException for an unmodifiable one.
+            final List<String> shuffled = new java.util.ArrayList<String>(addrs);
+            Collections.shuffle(shuffled);
+            this.namesrvAddrList.set(shuffled);
+        }
+    }
+
+    /**
+     * Returns the currently stored name server address list, or {@code null} if none has been set yet.
+     */
+    @Override
+    public List<String> getNameServerAddressList() {
+        return this.namesrvAddrList.get();
+    }
+
+    /**
+     * Synchronously sends {@code request} to {@code addr} and returns the response, invoking the registered
+     * RPC hook (if any) before the request and after the response.
+     *
+     * @param addr          target address; {@code null} selects a name server channel
+     * @param request       command to send
+     * @param timeoutMillis maximum time to wait for the response, in milliseconds
+     * @return the response command
+     * @throws InterruptedException         propagated from channel creation or the synchronous invocation
+     * @throws RemotingConnectException     if no active channel to {@code addr} is available
+     * @throws RemotingSendRequestException if sending failed; the channel is closed before rethrowing
+     * @throws RemotingTimeoutException     if the response timed out; the channel is closed first when the
+     *                                      client is configured to close the socket on timeout
+     */
+    @Override
+    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
+            throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
+        final Channel channel = this.getAndCreateChannel(addr);
+        if (null == channel || !channel.isActive()) {
+            this.closeChannel(addr, channel);
+            throw new RemotingConnectException(addr);
+        }
+
+        try {
+            if (this.rpcHook != null) {
+                this.rpcHook.doBeforeRequest(addr, request);
+            }
+
+            final RemotingCommand reply = this.invokeSyncImpl(channel, request, timeoutMillis);
+
+            if (this.rpcHook != null) {
+                this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, reply);
+            }
+            return reply;
+        } catch (RemotingSendRequestException e) {
+            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
+            this.closeChannel(addr, channel);
+            throw e;
+        } catch (RemotingTimeoutException e) {
+            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
+                this.closeChannel(addr, channel);
+                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
+            }
+            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
+            throw e;
+        }
+    }
+
+    private Channel getAndCreateChannel(final String addr) throws InterruptedException {
+        if (null == addr)
+            return getAndCreateNameserverChannel();
+
+        ChannelWrapper cw = this.channelTables.get(addr);
+        if (cw != null && cw.isOK()) {
+            return cw.getChannel();
+        }
+
+        return this.createChannel(addr);
+    }
+
+    /**
+     * Returns a channel to a name server. The channel of the currently chosen name server is reused while
+     * it is usable; otherwise, under {@code lockNamesrvChannel}, the known name server addresses are tried
+     * round-robin (at most one attempt per address) until a channel can be created.
+     *
+     * @return a name server channel, or {@code null} if none could be obtained or the lock timed out
+     * @throws InterruptedException if interrupted while waiting for the lock
+     */
+    private Channel getAndCreateNameserverChannel() throws InterruptedException {
+        String addr = this.namesrvAddrChoosed.get();
+        if (addr != null) {
+            ChannelWrapper cw = this.channelTables.get(addr);
+            if (cw != null && cw.isOK()) {
+                return cw.getChannel();
+            }
+        }
+
+        final List<String> addrList = this.namesrvAddrList.get();
+        if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+            try {
+                // Re-check under the lock: another thread may have connected in the meantime.
+                addr = this.namesrvAddrChoosed.get();
+                if (addr != null) {
+                    ChannelWrapper cw = this.channelTables.get(addr);
+                    if (cw != null && cw.isOK()) {
+                        return cw.getChannel();
+                    }
+                }
+
+                if (addrList != null && !addrList.isEmpty()) {
+                    for (int i = 0; i < addrList.size(); i++) {
+                        // Clear the sign bit instead of calling Math.abs: Math.abs(Integer.MIN_VALUE) is
+                        // still negative, which would give a negative list index once the counter wraps.
+                        int index = this.namesrvIndex.incrementAndGet() & Integer.MAX_VALUE;
+                        index = index % addrList.size();
+                        String newAddr = addrList.get(index);
+
+                        this.namesrvAddrChoosed.set(newAddr);
+                        Channel channelNew = this.createChannel(newAddr);
+                        if (channelNew != null) {
+                            return channelNew;
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                log.error("getAndCreateNameserverChannel: create name server channel exception", e);
+            } finally {
+                this.lockNamesrvChannel.unlock();
+            }
+        } else {
+            log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+        }
+
+        return null;
+    }
+
+    /**
+     * Returns a usable channel for {@code addr}, starting a new connection when necessary.
+     * A usable channel already in the table is returned directly. Otherwise, under
+     * {@code lockChannelTables}, a connect that is still in progress is kept, a finished but unusable entry
+     * is replaced by a new asynchronous connect, and a missing entry gets a new asynchronous connect.
+     * Finally the method waits up to the configured connect timeout for the connect future.
+     *
+     * @param addr remote address to connect to
+     * @return the connected channel, or {@code null} when the connect failed or timed out, or when no
+     *         wrapper is available for {@code addr}
+     * @throws InterruptedException if interrupted while waiting for the channel table lock
+     */
+    private Channel createChannel(final String addr) throws InterruptedException {
+        // Fast path without the lock.
+        ChannelWrapper cw = this.channelTables.get(addr);
+        if (cw != null && cw.isOK()) {
+            return cw.getChannel();
+        }
+
+
+        if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+            try {
+                boolean createNewConnection = false;
+                // Re-read under the lock; the entry may have changed since the fast path.
+                cw = this.channelTables.get(addr);
+                if (cw != null) {
+
+                    if (cw.isOK()) {
+                        return cw.getChannel();
+                    } else if (!cw.getChannelFuture().isDone()) {
+                        // A connect is still in flight: wait for it below instead of starting another.
+                        createNewConnection = false;
+                    } else {
+                        // The previous connect finished but the channel is not usable: replace it.
+                        this.channelTables.remove(addr);
+                        createNewConnection = true;
+                    }
+                } else {
+                    createNewConnection = true;
+                }
+
+                if (createNewConnection) {
+                    ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
+                    log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
+                    cw = new ChannelWrapper(channelFuture);
+                    this.channelTables.put(addr, cw);
+                }
+            } catch (Exception e) {
+                log.error("createChannel: create channel exception", e);
+            } finally {
+                this.lockChannelTables.unlock();
+            }
+        } else {
+            log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+        }
+
+        // The wait happens outside the lock. After a lock timeout, cw is still the wrapper read on the
+        // fast path (possibly null).
+        if (cw != null) {
+            ChannelFuture channelFuture = cw.getChannelFuture();
+            if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
+                if (cw.isOK()) {
+                    log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
+                    return cw.getChannel();
+                } else {
+                    log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
+                }
+            } else {
+                log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
+                        channelFuture.toString());
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Asynchronously sends {@code request} to {@code addr}, invoking the registered RPC hook (if any)
+     * before the request is sent.
+     *
+     * @param addr           target address; {@code null} selects a name server channel
+     * @param request        command to send
+     * @param timeoutMillis  timeout handed to the asynchronous invocation, in milliseconds
+     * @param invokeCallback callback handed to the asynchronous invocation
+     * @throws RemotingConnectException     if no active channel to {@code addr} is available
+     * @throws RemotingSendRequestException if sending failed; the channel is closed before rethrowing
+     */
+    @Override
+    public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
+            throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
+            RemotingSendRequestException {
+        final Channel channel = this.getAndCreateChannel(addr);
+        if (null == channel || !channel.isActive()) {
+            this.closeChannel(addr, channel);
+            throw new RemotingConnectException(addr);
+        }
+
+        try {
+            if (this.rpcHook != null) {
+                this.rpcHook.doBeforeRequest(addr, request);
+            }
+            this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
+        } catch (RemotingSendRequestException e) {
+            log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
+            this.closeChannel(addr, channel);
+            throw e;
+        }
+    }
+
+    /**
+     * Sends {@code request} to {@code addr} as a one-way RPC, invoking the registered RPC hook (if any)
+     * before the request is sent.
+     *
+     * @param addr          target address; {@code null} selects a name server channel
+     * @param request       command to send
+     * @param timeoutMillis timeout handed to the one-way invocation, in milliseconds
+     * @throws RemotingConnectException     if no active channel to {@code addr} is available
+     * @throws RemotingSendRequestException if sending failed; the channel is closed before rethrowing
+     */
+    @Override
+    public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
+            RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        final Channel channel = this.getAndCreateChannel(addr);
+        if (null == channel || !channel.isActive()) {
+            this.closeChannel(addr, channel);
+            throw new RemotingConnectException(addr);
+        }
+
+        try {
+            if (this.rpcHook != null) {
+                this.rpcHook.doBeforeRequest(addr, request);
+            }
+            this.invokeOnewayImpl(channel, request, timeoutMillis);
+        } catch (RemotingSendRequestException e) {
+            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
+            this.closeChannel(addr, channel);
+            throw e;
+        }
+    }
+
+    /**
+     * Registers {@code processor} for {@code requestCode}.
+     *
+     * @param requestCode request code the processor is stored under in the processor table
+     * @param processor   processor to register
+     * @param executor    executor paired with the processor; when {@code null} the public
+     *                    executor of this client is used instead
+     */
+    @Override
+    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
+        final ExecutorService effectiveExecutor = (executor != null) ? executor : this.publicExecutor;
+
+        this.processorTable.put(requestCode,
+                new Pair<NettyRequestProcessor, ExecutorService>(processor, effectiveExecutor));
+    }
+
+    /**
+     * Reports whether the channel cached for {@code addr} is writable.
+     *
+     * @param addr remote address used as key into the channel table
+     * @return the channel's writability when a wrapper exists for {@code addr} and reports itself
+     *         as OK; {@code true} in every other case (no wrapper, or wrapper not OK)
+     */
+    @Override
+    public boolean isChannelWriteable(String addr) {
+        final ChannelWrapper wrapper = this.channelTables.get(addr);
+        if (wrapper == null || !wrapper.isOK()) {
+            return true;
+        }
+        return wrapper.isWriteable();
+    }
+
+    /**
+     * @return the channel event listener held by this client; may be {@code null}
+     *         (the pipeline handlers null-check it before publishing events)
+     */
+    @Override
+    public ChannelEventListener getChannelEventListener() {
+        return this.channelEventListener;
+    }
+
+    /**
+     * @return the RPC hook held by this client; may be {@code null}
+     *         (the invoke methods null-check it before use)
+     */
+    @Override
+    public RPCHook getRPCHook() {
+        return rpcHook;
+    }
+
+    /**
+     * @return the executor used for callbacks, which is this client's public executor
+     */
+    @Override
+    public ExecutorService getCallbackExecutor() {
+        return publicExecutor;
+    }
+
+    /**
+     * @return the value currently held by {@code namesrvAddrList}
+     */
+    public List<String> getNamesrvAddrList() {
+        return this.namesrvAddrList.get();
+    }
+
+    /**
+     * Bean-style accessor for the RPC hook field.
+     *
+     * @return the RPC hook held by this client; may be {@code null}
+     */
+    public RPCHook getRpcHook() {
+        return this.rpcHook;
+    }
+
+    /**
+     * Thin holder around the {@link ChannelFuture} of a connection, exposing the state checks
+     * the client needs on the underlying channel.
+     */
+    static class ChannelWrapper {
+        private final ChannelFuture channelFuture;
+
+
+        public ChannelWrapper(ChannelFuture channelFuture) {
+            this.channelFuture = channelFuture;
+        }
+
+
+        /**
+         * @return {@code true} when the future has a channel and that channel is active
+         */
+        public boolean isOK() {
+            final Channel ch = this.channelFuture.channel();
+            return ch != null && ch.isActive();
+        }
+
+
+        /**
+         * @return whether the underlying channel is currently writable
+         */
+        public boolean isWriteable() {
+            final Channel ch = this.channelFuture.channel();
+            return ch.isWritable();
+        }
+
+
+        /**
+         * @return the channel of the wrapped future
+         */
+        private Channel getChannel() {
+            return channelFuture.channel();
+        }
+
+
+        /**
+         * @return the wrapped future
+         */
+        public ChannelFuture getChannelFuture() {
+            return this.channelFuture;
+        }
+    }
+
+    /**
+     * Inbound handler that forwards every decoded {@link RemotingCommand} to the enclosing
+     * client's {@code processMessageReceived}.
+     */
+    class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+            NettyRemotingClient.this.processMessageReceived(ctx, msg);
+        }
+    }
+
+    /**
+     * Duplex pipeline handler that logs connection lifecycle operations of the client, closes
+     * the client-side channel bookkeeping where appropriate, and publishes a {@link NettyEvent}
+     * for each of them when a channel event listener is configured.
+     */
+    class NettyConnetManageHandler extends ChannelDuplexHandler {
+        /**
+         * Logs the connect attempt, forwards it down the pipeline and publishes a
+         * {@code CONNECT} event.
+         */
+        @Override
+        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise)
+                throws Exception {
+            final String local = localAddress == null ? "UNKNOW" : localAddress.toString();
+            final String remote = remoteAddress == null ? "UNKNOW" : remoteAddress.toString();
+            log.info("NETTY CLIENT PIPELINE: CONNECT  {} => {}", local, remote);
+            super.connect(ctx, remoteAddress, localAddress, promise);
+
+            if (NettyRemotingClient.this.channelEventListener != null) {
+                // Reuse the null-safe string computed above: calling toString() on a null
+                // remoteAddress here would throw a NullPointerException.
+                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
+            }
+        }
+
+
+        /**
+         * Logs the disconnect, closes the channel through the enclosing client, forwards the
+         * operation and publishes a {@code CLOSE} event.
+         */
+        @Override
+        public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
+            closeChannel(ctx.channel());
+            super.disconnect(ctx, promise);
+
+            if (NettyRemotingClient.this.channelEventListener != null) {
+                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
+            }
+        }
+
+
+        /**
+         * Logs the close, closes the channel through the enclosing client, forwards the
+         * operation and publishes a {@code CLOSE} event.
+         */
+        @Override
+        public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
+            closeChannel(ctx.channel());
+            super.close(ctx, promise);
+
+            if (NettyRemotingClient.this.channelEventListener != null) {
+                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
+            }
+        }
+
+        /**
+         * On an {@code ALL_IDLE} idle-state event, closes the channel and publishes an
+         * {@code IDLE} event. The user event is always passed on to the next handler.
+         */
+        @Override
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+            if (evt instanceof IdleStateEvent) {
+                final IdleStateEvent event = (IdleStateEvent) evt;
+                if (event.state() == IdleState.ALL_IDLE) {
+                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                    log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
+                    closeChannel(ctx.channel());
+                    if (NettyRemotingClient.this.channelEventListener != null) {
+                        NettyRemotingClient.this
+                                .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
+                    }
+                }
+            }
+
+            ctx.fireUserEventTriggered(evt);
+        }
+
+        /**
+         * Logs the failure, closes the channel and publishes an {@code EXCEPTION} event.
+         * The exception is not propagated further along the pipeline.
+         */
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
+            log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
+            closeChannel(ctx.channel());
+            if (NettyRemotingClient.this.channelEventListener != null) {
+                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingServer.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingServer.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingServer.java
new file mode 100644
index 0000000..a14947e
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.ChannelEventListener;
+import com.alibaba.rocketmq.remoting.InvokeCallback;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.RemotingServer;
+import com.alibaba.rocketmq.remoting.common.Pair;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
+    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    private final ServerBootstrap serverBootstrap;
+    private final EventLoopGroup eventLoopGroupSelector;
+    private final EventLoopGroup eventLoopGroupBoss;
+    private final NettyServerConfig nettyServerConfig;
+
+    private final ExecutorService publicExecutor;
+    private final ChannelEventListener channelEventListener;
+
+    private final Timer timer = new Timer("ServerHouseKeepingService", true);
+    private DefaultEventExecutorGroup defaultEventExecutorGroup;
+
+    private RPCHook rpcHook;
+
+
+    private int port = 0;
+
+
+    public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
+        this(nettyServerConfig, null);
+    }
+
+
+    public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
+        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
+        this.serverBootstrap = new ServerBootstrap();
+        this.nettyServerConfig = nettyServerConfig;
+        this.channelEventListener = channelEventListener;
+
+        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
+        if (publicThreadNums <= 0) {
+            publicThreadNums = 4;
+        }
+
+        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
+            private AtomicInteger threadIndex = new AtomicInteger(0);
+
+
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
+            }
+        });
+
+        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
+            private AtomicInteger threadIndex = new AtomicInteger(0);
+
+
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
+            }
+        });
+
+        if (RemotingUtil.isLinuxPlatform() //
+                && nettyServerConfig.isUseEpollNativeSelector()) {
+            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
+                private AtomicInteger threadIndex = new AtomicInteger(0);
+                private int threadTotal = nettyServerConfig.getServerSelectorThreads();
+
+
+                @Override
+                public Thread newThread(Runnable r) {
+                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
+                }
+            });
+        } else {
+            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
+                private AtomicInteger threadIndex = new AtomicInteger(0);
+                private int threadTotal = nettyServerConfig.getServerSelectorThreads();
+
+
+                @Override
+                public Thread newThread(Runnable r) {
+                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
+                }
+            });
+        }
+    }
+
+
+    @Override
+    public void start() {
+        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
+                nettyServerConfig.getServerWorkerThreads(), //
+                new ThreadFactory() {
+
+                    private AtomicInteger threadIndex = new AtomicInteger(0);
+
+
+                    @Override
+                    public Thread newThread(Runnable r) {
+                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
+                    }
+                });
+
+        ServerBootstrap childHandler =
+                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
+                        .option(ChannelOption.SO_BACKLOG, 1024)
+                        .option(ChannelOption.SO_REUSEADDR, true)
+                        .option(ChannelOption.SO_KEEPALIVE, false)
+                        .childOption(ChannelOption.TCP_NODELAY, true)
+                        .option(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
+                        .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
+                        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
+                        .childHandler(new ChannelInitializer<SocketChannel>() {
+                            @Override
+                            public void initChannel(SocketChannel ch) throws Exception {
+                                ch.pipeline().addLast(
+                                        defaultEventExecutorGroup,
+                                        new NettyEncoder(),
+                                        new NettyDecoder(),
+                                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+                                        new NettyConnetManageHandler(),
+                                        new NettyServerHandler());
+                            }
+                        });
+
+        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
+            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        }
+
+        try {
+            ChannelFuture sync = this.serverBootstrap.bind().sync();
+            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
+            this.port = addr.getPort();
+        } catch (InterruptedException e1) {
+            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
+        }
+
+        if (this.channelEventListener != null) {
+            this.nettyEventExecuter.start();
+        }
+
+        this.timer.scheduleAtFixedRate(new TimerTask() {
+
+            @Override
+            public void run() {
+                try {
+                    NettyRemotingServer.this.scanResponseTable();
+                } catch (Exception e) {
+                    log.error("scanResponseTable exception", e);
+                }
+            }
+        }, 1000 * 3, 1000);
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            if (this.timer != null) {
+                this.timer.cancel();
+            }
+
+            this.eventLoopGroupBoss.shutdownGracefully();
+
+            this.eventLoopGroupSelector.shutdownGracefully();
+
+            if (this.nettyEventExecuter != null) {
+                this.nettyEventExecuter.shutdown();
+            }
+
+            if (this.defaultEventExecutorGroup != null) {
+                this.defaultEventExecutorGroup.shutdownGracefully();
+            }
+        } catch (Exception e) {
+            log.error("NettyRemotingServer shutdown exception, ", e);
+        }
+
+        if (this.publicExecutor != null) {
+            try {
+                this.publicExecutor.shutdown();
+            } catch (Exception e) {
+                log.error("NettyRemotingServer shutdown exception, ", e);
+            }
+        }
+    }
+
+    @Override
+    public void registerRPCHook(RPCHook rpcHook) {
+        this.rpcHook = rpcHook;
+    }
+
+    @Override
+    public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
+        ExecutorService executorThis = executor;
+        if (null == executor) {
+            executorThis = this.publicExecutor;
+        }
+
+        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
+        this.processorTable.put(requestCode, pair);
+    }
+
+    @Override
+    public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
+        this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
+    }
+
+    @Override
+    public int localListenPort() {
+        return this.port;
+    }
+
+    @Override
+    public Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(int requestCode) {
+        return processorTable.get(requestCode);
+    }
+
+    @Override
+    public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
+            throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
+        return this.invokeSyncImpl(channel, request, timeoutMillis);
+    }
+
+    @Override
+    public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
+            throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
+    }
+
+    @Override
+    public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException,
+            RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        this.invokeOnewayImpl(channel, request, timeoutMillis);
+    }
+
+    @Override
+    public ChannelEventListener getChannelEventListener() {
+        return channelEventListener;
+    }
+
+    @Override
+    public RPCHook getRPCHook() {
+        return this.rpcHook;
+    }
+
+    @Override
+    public ExecutorService getCallbackExecutor() {
+        return this.publicExecutor;
+    }
+
+    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+            processMessageReceived(ctx, msg);
+        }
+    }
+
+    class NettyConnetManageHandler extends ChannelDuplexHandler {
+        @Override
+        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
+            super.channelRegistered(ctx);
+        }
+
+
+        @Override
+        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
+            super.channelUnregistered(ctx);
+        }
+
+
+        @Override
+        public void channelActive(ChannelHandlerContext ctx) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
+            super.channelActive(ctx);
+
+            if (NettyRemotingServer.this.channelEventListener != null) {
+                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel()));
+            }
+        }
+
+
+        @Override
+        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
+            super.channelInactive(ctx);
+
+            if (NettyRemotingServer.this.channelEventListener != null) {
+                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
+            }
+        }
+
+
+        @Override
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+            if (evt instanceof IdleStateEvent) {
+                IdleStateEvent evnet = (IdleStateEvent) evt;
+                if (evnet.state().equals(IdleState.ALL_IDLE)) {
+                    final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                    log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
+                    RemotingUtil.closeChannel(ctx.channel());
+                    if (NettyRemotingServer.this.channelEventListener != null) {
+                        NettyRemotingServer.this
+                                .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
+                    }
+                }
+            }
+
+            ctx.fireUserEventTriggered(evt);
+        }
+
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+            final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+            log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
+            log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
+
+            if (NettyRemotingServer.this.channelEventListener != null) {
+                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel()));
+            }
+
+            RemotingUtil.closeChannel(ctx.channel());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRequestProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRequestProcessor.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRequestProcessor.java
new file mode 100644
index 0000000..dae7f9e
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRequestProcessor.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+
+
+/**
+ * Common remoting command processor.
+ *
+ * <p>Implementations are registered together with an executor under a request code.
+ *
+ * @author shijia.wxr
+ */
+public interface NettyRequestProcessor {
+
+    /**
+     * Processes one request.
+     *
+     * @param ctx     channel handler context the request is associated with
+     * @param request the request command
+     * @return a {@link RemotingCommand}; presumably the response to send back, and possibly
+     *         {@code null} when there is none (confirm against the caller)
+     * @throws Exception if processing fails
+     */
+    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;
+
+    /**
+     * @return a flag consulted by the caller; presumably {@code true} means the processor is
+     *         currently refusing requests (confirm against the caller)
+     */
+    boolean rejectRequest();
+}