You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/19 07:23:50 UTC

[6/8] incubator-rocketmq git commit: initialize RocketMQ5

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java
new file mode 100644
index 0000000..bbd33ea
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/ChannelHandlerContextWrapperImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.channel;
+
+import org.apache.rocketmq.remoting.api.channel.ChannelHandlerContextWrapper;
+
+/**
+ * {@link ChannelHandlerContextWrapper} implementation that carries a Netty
+ * {@link io.netty.channel.ChannelHandlerContext}.
+ *
+ * <p>Note: the class declares a type parameter that is itself named
+ * {@code ChannelHandlerContext}. No member uses it; the field, the constructor and
+ * the getter all spell out the fully-qualified Netty type. The parameter is kept
+ * so that existing parameterized references to this class keep compiling.
+ *
+ * @param <ChannelHandlerContext> unused type parameter (shadows the simple name of the Netty type)
+ */
+public class ChannelHandlerContextWrapperImpl<ChannelHandlerContext> implements ChannelHandlerContextWrapper {
+
+    /** The wrapped Netty context; assigned exactly once in the constructor. */
+    private final io.netty.channel.ChannelHandlerContext context;
+
+    /**
+     * Wraps the given Netty context.
+     *
+     * @param context the context to expose through {@link #getContext()}; stored as-is
+     */
+    public ChannelHandlerContextWrapperImpl(io.netty.channel.ChannelHandlerContext context) {
+        this.context = context;
+    }
+
+    /**
+     * @return the Netty context passed to the constructor
+     */
+    public io.netty.channel.ChannelHandlerContext getContext() {
+        return context;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java
new file mode 100644
index 0000000..b90afc1
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.channel;
+
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+import org.apache.rocketmq.remoting.api.channel.ChunkRegion;
+
+/**
+ * Adapts a {@link ChunkRegion} to Netty's {@link FileRegion}.
+ *
+ * <p>Every query and the transfer itself are delegated to the wrapped region. The
+ * reference count is maintained by {@link AbstractReferenceCounted}; when this
+ * object is deallocated the wrapped region's {@code release()} is invoked.
+ */
+public class FileRegionImpl extends AbstractReferenceCounted implements FileRegion {
+    /** The region all calls are forwarded to. */
+    private final ChunkRegion chunkRegion;
+
+    /**
+     * @param chunkRegion the region to adapt; stored as-is
+     */
+    public FileRegionImpl(ChunkRegion chunkRegion) {
+        this.chunkRegion = chunkRegion;
+    }
+
+    /** @return {@code chunkRegion.position()} */
+    @Override
+    public long position() {
+        return chunkRegion.position();
+    }
+
+    /**
+     * Misspelled legacy accessor; reports the same value as {@link #transferred()}.
+     *
+     * @return the value of {@link #transferred()}
+     * @deprecated use {@link #transferred()} instead
+     */
+    @Deprecated
+    @Override
+    public long transfered() {
+        // Route through the correctly spelled accessor so both can never diverge.
+        return transferred();
+    }
+
+    /** @return {@code chunkRegion.transferred()} */
+    @Override
+    public long transferred() {
+        return chunkRegion.transferred();
+    }
+
+    /** @return {@code chunkRegion.count()} */
+    @Override
+    public long count() {
+        return chunkRegion.count();
+    }
+
+    /**
+     * Forwards the transfer to the wrapped region.
+     *
+     * @param target the channel to write to
+     * @param position the position passed through to the wrapped region
+     * @return whatever the wrapped region's {@code transferTo} returns
+     * @throws IOException if the wrapped region's transfer fails
+     */
+    @Override
+    public long transferTo(WritableByteChannel target, long position) throws IOException {
+        return chunkRegion.transferTo(target, position);
+    }
+
+    /** Releases the wrapped region; invoked by the reference-counting base class. */
+    @Override
+    protected void deallocate() {
+        chunkRegion.release();
+    }
+
+    /** Increments the reference count and returns {@code this} typed as {@link FileRegion}. */
+    @Override
+    public FileRegion retain() {
+        super.retain();
+        return this;
+    }
+
+    /**
+     * Increases the reference count by {@code increment}.
+     *
+     * @param increment amount to add to the reference count
+     * @return {@code this}
+     */
+    @Override
+    public FileRegion retain(int increment) {
+        super.retain(increment);
+        return this;
+    }
+
+    /** No-op; nothing is recorded. */
+    @Override
+    public FileRegion touch() {
+        return this;
+    }
+
+    /** No-op; the hint is ignored. */
+    @Override
+    public FileRegion touch(Object hint) {
+        return this;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java
new file mode 100644
index 0000000..ba4a969
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.channel;
+
+import io.netty.channel.Channel;
+import java.net.SocketAddress;
+import org.apache.rocketmq.remoting.api.channel.ChunkRegion;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+
+/**
+ * {@link RemotingChannel} backed by a Netty {@link Channel}. Address and state
+ * queries, closing and replying are all forwarded to the wrapped channel, and two
+ * instances are equal exactly when their wrapped channels are equal.
+ */
+public class NettyChannelImpl implements RemotingChannel {
+    /** The Netty channel every call is forwarded to. */
+    private final Channel channel;
+
+    /**
+     * @param channel the Netty channel to wrap; stored as-is
+     */
+    public NettyChannelImpl(Channel channel) {
+        this.channel = channel;
+    }
+
+    /** @return the wrapped channel's local address */
+    @Override
+    public SocketAddress localAddress() {
+        return this.channel.localAddress();
+    }
+
+    /** @return the wrapped channel's remote address */
+    @Override
+    public SocketAddress remoteAddress() {
+        return this.channel.remoteAddress();
+    }
+
+    /** @return the wrapped channel's writability flag */
+    @Override
+    public boolean isWritable() {
+        return this.channel.isWritable();
+    }
+
+    /** @return the wrapped channel's active flag */
+    @Override
+    public boolean isActive() {
+        return this.channel.isActive();
+    }
+
+    /** Requests a close of the wrapped channel; the returned future is not inspected. */
+    @Override
+    public void close() {
+        this.channel.close();
+    }
+
+    /**
+     * Writes and flushes the command on the wrapped channel; the returned future is not inspected.
+     *
+     * @param command the command handed to {@code writeAndFlush}
+     */
+    @Override
+    public void reply(final RemotingCommand command) {
+        this.channel.writeAndFlush(command);
+    }
+
+    /**
+     * Writes and flushes the region on the wrapped channel; the returned future is not inspected.
+     *
+     * @param fileRegion the region handed to {@code writeAndFlush}
+     */
+    @Override
+    public void reply(final ChunkRegion fileRegion) {
+        this.channel.writeAndFlush(fileRegion);
+    }
+
+    /** @return the wrapped Netty channel */
+    public Channel getChannel() {
+        return this.channel;
+    }
+
+    /**
+     * Equality is defined by exact class and by the wrapped channel.
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null) {
+            return false;
+        }
+        if (o.getClass() != getClass()) {
+            return false;
+        }
+
+        final NettyChannelImpl other = (NettyChannelImpl) o;
+        if (this.channel == null) {
+            return other.channel == null;
+        }
+        return this.channel.equals(other.channel);
+    }
+
+    /** @return the wrapped channel's hash code, or {@code 0} when there is no channel */
+    @Override
+    public int hashCode() {
+        if (this.channel == null) {
+            return 0;
+        }
+        return this.channel.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("NettyChannelImpl [channel=");
+        text.append(this.channel);
+        text.append("]");
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
new file mode 100644
index 0000000..44d4fd9
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.command;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Map.Entry;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.api.exception.RemoteCodecException;
+
+/**
+ * Static helpers that turn a {@link RemotingCommand} into its wire header and
+ * rebuild a command from a received buffer.
+ *
+ * <p>Wire layout written by {@link #encodeHeader}: protocol type (1 byte), total
+ * length (int), request id (int), serializer type (1 byte), traffic type (1 byte),
+ * code (short length + bytes), remark (short length + bytes), property count
+ * (short) followed by each property as (short length + bytes), parameter length
+ * (int). Strings are encoded with {@link #REMOTING_CHARSET}.
+ */
+public class CodecHelper {
+    //ProtocolType + TotalLength + RequestId + SerializeType + TrafficType + CodeLength + RemarkLength + PropertiesSize + ParameterLength
+    public final static int MIN_PROTOCOL_LEN = 1 + 4 + 4 + 1 + 1 + 2 + 2 + 2 + 4;
+    /** Separates key and value inside one encoded property. */
+    public final static char PROPERTY_SEPARATOR = '\n';
+    public final static Charset REMOTING_CHARSET = Charset.forName("UTF-8");
+
+    public final static int CODE_MAX_LEN = 512;
+    public final static int PARAMETER_MAX_LEN = 33554432;
+    public final static int BODY_MAX_LEN = 33554432;
+    public final static int PACKET_MAX_LEN = 33554432;
+
+    /**
+     * Builds the header of a command. Parameter bytes and extra payload are not
+     * written here; only their lengths are accounted for.
+     *
+     * <p>A {@code null} op code or remark is encoded as an empty string and a
+     * {@code null} property map as "no properties". Every length that is written
+     * as a {@code short} is checked first, so an over-long value fails loudly
+     * instead of being truncated into a corrupt frame.
+     *
+     * @param command the command whose header fields are encoded
+     * @param parameterLength number of parameter bytes that will follow the header
+     * @param extraPayload number of extra payload bytes that will follow the parameter
+     * @return a buffer, flipped for reading, holding the header
+     * @throws RemoteCodecException if the code, the remark, a single property or
+     *         the number of properties does not fit into a {@code short}
+     */
+    public static ByteBuffer encodeHeader(final RemotingCommand command, final int parameterLength,
+        final int extraPayload) {
+        byte[] code = toBytes(command.opCode());
+        int codeLength = checkShortRange("Code", code.length);
+
+        byte[] remark = toBytes(command.remark());
+        int remarkLength = checkShortRange("Remark", remark.length);
+
+        byte[][] props = null;
+        int propsLength = 0;
+        StringBuilder sb = new StringBuilder();
+        if (command.properties() != null && !command.properties().isEmpty()) {
+            props = new byte[checkShortRange("Properties count", command.properties().size())][];
+            int i = 0;
+            for (Entry<String, String> next : command.properties().entrySet()) {
+                // Each property travels as "key<SEPARATOR>value".
+                sb.setLength(0);
+                sb.append(next.getKey());
+                sb.append(PROPERTY_SEPARATOR);
+                sb.append(next.getValue());
+
+                props[i] = sb.toString().getBytes(REMOTING_CHARSET);
+                checkShortRange("Property", props[i].length);
+
+                // 2 bytes for the length prefix of this property.
+                propsLength += 2;
+                propsLength += props[i].length;
+                i++;
+            }
+        }
+
+        // The total length excludes the protocol type byte and the total-length int itself.
+        int totalLength = MIN_PROTOCOL_LEN - 1 - 4
+            + codeLength
+            + remarkLength
+            + propsLength
+            + parameterLength
+            + extraPayload;
+
+        // Only the header is materialized here, so the two trailing sections are subtracted again.
+        int headerLength = 1 + 4 + totalLength - parameterLength - extraPayload;
+
+        ByteBuffer buf = ByteBuffer.allocate(headerLength);
+        buf.put(command.protocolType());
+        buf.putInt(totalLength);
+        buf.putInt(command.requestID());
+        buf.put(command.serializerType());
+        buf.put((byte) command.trafficType().ordinal());
+
+        buf.putShort((short) codeLength);
+        if (codeLength > 0) {
+            buf.put(code);
+        }
+        buf.putShort((short) remarkLength);
+        if (remarkLength > 0) {
+            buf.put(remark);
+        }
+        if (props != null) {
+            buf.putShort((short) props.length);
+            for (byte[] prop : props) {
+                buf.putShort((short) prop.length);
+                buf.put(prop);
+            }
+        } else {
+            buf.putShort((short) 0);
+        }
+
+        buf.putInt(parameterLength);
+
+        buf.flip();
+
+        return buf;
+    }
+
+    /**
+     * Rebuilds a command from a buffer.
+     *
+     * <p>Reading starts with the request id, so the protocol type and the total
+     * length are not consumed here (presumably the caller has already read them;
+     * confirm against the frame decoder). Whatever remains after the parameter
+     * section, up to the buffer's limit, becomes the extra payload.
+     *
+     * @param byteBuffer the buffer to read from
+     * @return the decoded command
+     * @throws RemoteCodecException if the code length is not within
+     *         {@code 1..CODE_MAX_LEN}, or if the parameter or payload size is
+     *         negative or above its maximum
+     */
+    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
+        RemotingCommandImpl cmd = new RemotingCommandImpl();
+        int totalLength = byteBuffer.limit();
+        cmd.requestID(byteBuffer.getInt());
+        cmd.serializerType(byteBuffer.get());
+        cmd.trafficType(TrafficType.parse(byteBuffer.get()));
+
+        {
+            // Code: mandatory; a zero or negative length is rejected together with an over-long one.
+            short size = byteBuffer.getShort();
+            if (size > 0 && size <= CODE_MAX_LEN) {
+                byte[] bytes = new byte[size];
+                byteBuffer.get(bytes);
+                String str = new String(bytes, REMOTING_CHARSET);
+                cmd.opCode(str);
+            } else {
+                throw new RemoteCodecException(String.format("Code length: %d over max limit: %d", size, CODE_MAX_LEN));
+            }
+        }
+
+        {
+            // Remark: optional; a non-positive length leaves the command's remark untouched.
+            short size = byteBuffer.getShort();
+            if (size > 0) {
+                byte[] bytes = new byte[size];
+                byteBuffer.get(bytes);
+                String str = new String(bytes, REMOTING_CHARSET);
+                cmd.remark(str);
+            }
+        }
+
+        {
+            // Properties: count, then one length-prefixed "key<SEPARATOR>value" entry each.
+            short size = byteBuffer.getShort();
+            if (size > 0) {
+                for (int i = 0; i < size; i++) {
+                    short length = byteBuffer.getShort();
+                    if (length > 0) {
+                        byte[] bytes = new byte[length];
+                        byteBuffer.get(bytes);
+                        String str = new String(bytes, REMOTING_CHARSET);
+                        int index = str.indexOf(PROPERTY_SEPARATOR);
+                        // Entries without a separator, or with an empty key, are dropped.
+                        if (index > 0) {
+                            String key = str.substring(0, index);
+                            String value = str.substring(index + 1);
+                            cmd.property(key, value);
+                        }
+                    }
+                }
+            }
+        }
+
+        {
+            // Parameter: zero means "absent"; anything else must be within the limit.
+            int size = byteBuffer.getInt();
+            if (size > 0 && size <= PARAMETER_MAX_LEN) {
+                byte[] bytes = new byte[size];
+                byteBuffer.get(bytes);
+                cmd.parameterBytes(bytes);
+            } else if (size != 0) {
+                throw new RemoteCodecException(String.format("Parameter size: %d over max limit: %d", size, PARAMETER_MAX_LEN));
+            }
+        }
+
+        {
+            // Extra payload: everything between the current position and the buffer limit.
+            int size = totalLength - byteBuffer.position();
+            if (size > 0 && size <= BODY_MAX_LEN) {
+                byte[] bytes = new byte[size];
+                byteBuffer.get(bytes);
+                cmd.extraPayload(bytes);
+            } else if (size != 0) {
+                throw new RemoteCodecException(String.format("Body size: %d over max limit: %d", size, BODY_MAX_LEN));
+            }
+        }
+
+        return cmd;
+    }
+
+    /** Encodes {@code value} with {@link #REMOTING_CHARSET}; {@code null} becomes an empty array. */
+    private static byte[] toBytes(final String value) {
+        return value == null ? new byte[0] : value.getBytes(REMOTING_CHARSET);
+    }
+
+    /** Returns {@code length} unchanged, or throws when it cannot be written as a {@code short}. */
+    private static int checkShortRange(final String field, final int length) {
+        if (length > Short.MAX_VALUE) {
+            throw new RemoteCodecException(
+                String.format("%s length: %d over max limit: %d", field, length, Short.MAX_VALUE));
+        }
+        return length;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
new file mode 100644
index 0000000..f5d2126
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandFactoryImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.command;
+
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
+import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta;
+
+/**
+ * {@link RemotingCommandFactory} that produces {@link RemotingCommandImpl}
+ * instances. Requests take their protocol and serializer type from the
+ * factory's {@link RemotingCommandFactoryMeta}; responses copy them, together
+ * with the request id, from the command being answered.
+ */
+public class RemotingCommandFactoryImpl implements RemotingCommandFactory {
+    /** Source of the protocol type and serializer type stamped on new requests. */
+    private RemotingCommandFactoryMeta meta;
+
+    /** Creates a factory backed by a default-constructed {@link RemotingCommandFactoryMeta}. */
+    public RemotingCommandFactoryImpl() {
+        this(new RemotingCommandFactoryMeta());
+    }
+
+    /**
+     * @param remotingCommandFactoryMeta the settings applied to every created request
+     */
+    public RemotingCommandFactoryImpl(final RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
+        meta = remotingCommandFactoryMeta;
+    }
+
+    /**
+     * @return a new command carrying this factory's protocol and serializer type
+     */
+    @Override
+    public RemotingCommand createRequest() {
+        final RemotingCommand created = new RemotingCommandImpl();
+        created.protocolType(meta.getProtocolType());
+        created.serializerType(meta.getSerializeType());
+        return created;
+    }
+
+    /**
+     * @param command the command being answered
+     * @return a new command with the same request id, protocol type and serializer
+     *         type as {@code command}, and traffic type {@link TrafficType#RESPONSE}
+     */
+    @Override
+    public RemotingCommand createResponse(final RemotingCommand command) {
+        final RemotingCommand answer = new RemotingCommandImpl();
+        answer.requestID(command.requestID());
+        answer.protocolType(command.protocolType());
+        answer.serializerType(command.serializerType());
+        answer.trafficType(TrafficType.RESPONSE);
+        return answer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
new file mode 100644
index 0000000..bcf2338
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.command;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+import org.apache.rocketmq.remoting.common.TypePresentation;
+
+/**
+ * Plain mutable {@link RemotingCommand} implementation.
+ *
+ * <p>Each instance draws its initial request id from {@link #REQUEST_ID_GENERATOR}
+ * when it is constructed. The parameter exists in two forms: the raw bytes
+ * ({@link #parameterBytes()}) and a decoded object ({@link #parameter()}); the
+ * {@code parameter(SerializerFactory, ...)} overloads decode the bytes on first
+ * use and cache the result. {@code equals}, {@code hashCode} and
+ * {@code toString} are reflection based and therefore cover every field.
+ */
+public class RemotingCommandImpl implements RemotingCommand {
+    public final static RequestIdGenerator REQUEST_ID_GENERATOR = RequestIdGenerator.inst;
+
+    private byte protocolType;
+    private byte serializeType;
+
+    private volatile int requestId = REQUEST_ID_GENERATOR.incrementAndGet();
+    private TrafficType trafficType = TrafficType.REQUEST_SYNC;
+    private String code = CommandFlag.SUCCESS.flag();
+    private String remark = "";
+    private Map<String, String> properties = new HashMap<String, String>();
+    /** Decoded parameter object; {@code null} until set or decoded. */
+    private Object parameter;
+    private byte[] extraPayload;
+
+    /** Raw parameter bytes. */
+    private byte[] parameterByte;
+
+    protected RemotingCommandImpl() {
+    }
+
+    @Override
+    public byte protocolType() {
+        return this.protocolType;
+    }
+
+    @Override
+    public void protocolType(byte value) {
+        this.protocolType = value;
+    }
+
+    @Override
+    public int requestID() {
+        return requestId;
+    }
+
+    @Override
+    public void requestID(int value) {
+        this.requestId = value;
+    }
+
+    @Override
+    public byte serializerType() {
+        return this.serializeType;
+    }
+
+    @Override
+    public void serializerType(byte value) {
+        this.serializeType = value;
+    }
+
+    @Override
+    public TrafficType trafficType() {
+        return this.trafficType;
+    }
+
+    @Override
+    public void trafficType(TrafficType value) {
+        this.trafficType = value;
+    }
+
+    @Override
+    public String opCode() {
+        return this.code;
+    }
+
+    @Override
+    public void opCode(String value) {
+        this.code = value;
+    }
+
+    @Override
+    public String remark() {
+        return this.remark;
+    }
+
+    @Override
+    public void remark(String value) {
+        this.remark = value;
+    }
+
+    /** @return the live property map (not a copy) */
+    @Override
+    public Map<String, String> properties() {
+        return this.properties;
+    }
+
+    /** Replaces the property map with {@code value}; the reference is stored as-is. */
+    @Override
+    public void properties(Map<String, String> value) {
+        this.properties = value;
+    }
+
+    @Override
+    public String property(String key) {
+        return this.properties.get(key);
+    }
+
+    @Override
+    public void property(String key, String value) {
+        this.properties.put(key, value);
+    }
+
+    @Override
+    public Object parameter() {
+        return this.parameter;
+    }
+
+    @Override
+    public void parameter(Object value) {
+        this.parameter = value;
+    }
+
+    @Override
+    public byte[] parameterBytes() {
+        return this.getParameterByte();
+    }
+
+    public byte[] getParameterByte() {
+        return parameterByte;
+    }
+
+    public void setParameterByte(byte[] parameterByte) {
+        this.parameterByte = parameterByte;
+    }
+
+    @Override
+    public void parameterBytes(byte[] value) {
+        this.setParameterByte(value);
+    }
+
+    @Override
+    public byte[] extraPayload() {
+        return this.extraPayload;
+    }
+
+    @Override
+    public void extraPayload(byte[] value) {
+        this.extraPayload = value;
+    }
+
+    /**
+     * Returns the decoded parameter, decoding the raw bytes on first use.
+     *
+     * @param serializerFactory used to look up the serializer for {@link #serializerType()}
+     * @param c the class to decode into
+     * @return the cached parameter if one is present, otherwise the freshly decoded (and now cached) value
+     */
+    @Override
+    public <T> T parameter(SerializerFactory serializerFactory, Class<T> c) {
+        if (this.parameter() != null) {
+            // Unchecked by design: the cache holds whatever was stored or decoded earlier, and
+            // erasure prevents a check here. A mismatch surfaces as ClassCastException at the caller.
+            @SuppressWarnings("unchecked")
+            final T cached = (T) this.parameter();
+            return cached;
+        }
+        final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), c);
+        this.parameter(decode);
+        return decode;
+    }
+
+    /**
+     * Same as {@link #parameter(SerializerFactory, Class)} but the target type is
+     * described by a {@link TypePresentation}.
+     *
+     * @param serializerFactory used to look up the serializer for {@link #serializerType()}
+     * @param typePresentation the type to decode into
+     * @return the cached parameter if one is present, otherwise the freshly decoded (and now cached) value
+     */
+    @Override
+    public <T> T parameter(SerializerFactory serializerFactory, TypePresentation<T> typePresentation) {
+        if (this.parameter() != null) {
+            // Unchecked by design; see parameter(SerializerFactory, Class).
+            @SuppressWarnings("unchecked")
+            final T cached = (T) this.parameter();
+            return cached;
+        }
+        final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), typePresentation);
+        this.parameter(decode);
+        return decode;
+    }
+
+    /**
+     * Same as {@link #parameter(SerializerFactory, Class)} but the target type is
+     * given as a reflective {@link Type}.
+     *
+     * @param serializerFactory used to look up the serializer for {@link #serializerType()}
+     * @param type the type to decode into
+     * @return the cached parameter if one is present, otherwise the freshly decoded (and now cached) value
+     */
+    @Override
+    public <T> T parameter(SerializerFactory serializerFactory, Type type) {
+        if (this.parameter() != null) {
+            // Unchecked by design; see parameter(SerializerFactory, Class).
+            @SuppressWarnings("unchecked")
+            final T cached = (T) this.parameter();
+            return cached;
+        }
+        final T decode = serializerFactory.get(this.serializerType()).decode(this.parameterBytes(), type);
+        this.parameter(decode);
+        return decode;
+    }
+
+    @Override
+    public int hashCode() {
+        return HashCodeBuilder.reflectionHashCode(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return EqualsBuilder.reflectionEquals(this, o);
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RequestIdGenerator.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RequestIdGenerator.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RequestIdGenerator.java
new file mode 100644
index 0000000..9b85c95
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RequestIdGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.command;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Process-wide source of request ids, backed by a single {@link AtomicInteger}.
+ * Obtain it through {@link #inst}; the constructor is private.
+ */
+public class RequestIdGenerator {
+    /** The only instance. Final so that it cannot be swapped out at runtime. */
+    public static final RequestIdGenerator inst = new RequestIdGenerator();
+
+    /** Counter behind {@link #incrementAndGet()}; starts at 0, so the first id handed out is 1. */
+    private final AtomicInteger generator = new AtomicInteger(0);
+
+    private RequestIdGenerator() {
+
+    }
+
+    /**
+     * Atomically advances the counter.
+     *
+     * @return the counter value after the increment
+     */
+    public int incrementAndGet() {
+        return generator.incrementAndGet();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java
new file mode 100644
index 0000000..ec9cece
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.channel.Channel;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ * Immutable value describing something that happened on a Netty {@link Channel}:
+ * the kind of event, the channel concerned and, optionally, a cause.
+ */
+public class NettyChannelEvent {
+    private final Channel channel;
+    private final NettyChannelEventType type;
+    /** {@code null} when the event was created without a cause. */
+    private final Throwable cause;
+
+    /**
+     * Creates an event that has no cause.
+     *
+     * @param type the kind of event
+     * @param channel the channel the event refers to
+     */
+    public NettyChannelEvent(NettyChannelEventType type, Channel channel) {
+        this.channel = channel;
+        this.type = type;
+        this.cause = null;
+    }
+
+    /**
+     * Creates an event with an explicit cause.
+     *
+     * @param type the kind of event
+     * @param channel the channel the event refers to
+     * @param cause the throwable associated with the event; may be {@code null}
+     */
+    public NettyChannelEvent(NettyChannelEventType type, Channel channel, Throwable cause) {
+        this.channel = channel;
+        this.type = type;
+        this.cause = cause;
+    }
+
+    /** @return the kind of event */
+    public NettyChannelEventType getType() {
+        return this.type;
+    }
+
+    /** @return the channel the event refers to */
+    public Channel getChannel() {
+        return this.channel;
+    }
+
+    /** @return the associated throwable, or {@code null} if none was supplied */
+    public Throwable getCause() {
+        return this.cause;
+    }
+
+    /** @return a multi-line, reflection-generated dump of all fields */
+    @Override
+    public String toString() {
+        final String dump = ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
+        return dump;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
new file mode 100644
index 0000000..1bf2277
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEventType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+/**
+ * Kinds of event a {@code NettyChannelEvent} can carry.
+ *
+ * <p>NOTE(review): the constant names suggest channel lifecycle notifications
+ * (became active / inactive, went idle, raised an exception); confirm against the
+ * handlers that publish these events. The declaration order is kept as-is.
+ */
+public enum NettyChannelEventType {
+    ACTIVE, INACTIVE, IDLE, EXCEPTION;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
new file mode 100644
index 0000000..1af62cb
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingEndPoint;
+import org.apache.rocketmq.remoting.api.RemotingService;
+import org.apache.rocketmq.remoting.api.RequestProcessor;
+import org.apache.rocketmq.remoting.api.channel.ChannelEventListener;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory;
+import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
+import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext;
+import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
+import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
+import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
+import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
+import org.apache.rocketmq.remoting.api.protocol.ProtocolFactory;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.api.serializable.SerializerFactory;
+import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup;
+import org.apache.rocketmq.remoting.common.Pair;
+import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta;
+import org.apache.rocketmq.remoting.common.ResponseResult;
+import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.external.ThreadUtils;
+import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
+import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
+import org.apache.rocketmq.remoting.impl.protocol.ProtocolFactoryImpl;
+import org.apache.rocketmq.remoting.impl.protocol.serializer.SerializerFactoryImpl;
+import org.apache.rocketmq.remoting.internal.UIDGenerator;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class NettyRemotingAbstract implements RemotingService {
+    protected static final Logger LOG = LoggerFactory.getLogger(NettyRemotingAbstract.class);
+    protected final ProtocolFactory protocolFactory = new ProtocolFactoryImpl();
+    protected final SerializerFactory serializerFactory = new SerializerFactoryImpl();
+    protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor");
+    private final Semaphore semaphoreOneway;
+    private final Semaphore semaphoreAsync;
+    private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256);
+    private final Map<String, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<String, Pair<RequestProcessor, ExecutorService>>();
+    private final AtomicLong count = new AtomicLong(0);
+    private final RemotingCommandFactory remotingCommandFactory;
+    private final String remotingInstanceId = UIDGenerator.instance().createUID();
+
+    private final ExecutorService publicExecutor;
+    protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true);
+    private InterceptorGroup interceptorGroup = new InterceptorGroup();
+    private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup();
+
+    /**
+     * Creates the remoting base using a default-constructed {@link RemotingCommandFactoryMeta}.
+     *
+     * @param clientConfig configuration supplying the semaphore permits and callback thread count
+     */
+    NettyRemotingAbstract(RemotingConfig clientConfig) {
+        this(clientConfig, new RemotingCommandFactoryMeta());
+    }
+
+    /**
+     * Creates the remoting base: the fair semaphores limiting in-flight oneway and async
+     * invocations, the shared callback executor and the command factory.
+     *
+     * @param clientConfig configuration supplying the semaphore permits and callback thread count
+     * @param remotingCommandFactoryMeta metadata handed to the {@link RemotingCommandFactoryImpl}
+     */
+    NettyRemotingAbstract(RemotingConfig clientConfig, RemotingCommandFactoryMeta remotingCommandFactoryMeta) {
+        // Both semaphores are created in fair mode (second argument true).
+        this.semaphoreOneway = new Semaphore(clientConfig.getClientOnewayInvokeSemaphore(), true);
+        this.semaphoreAsync = new Semaphore(clientConfig.getClientAsyncInvokeSemaphore(), true);
+        // Same value is passed for both thread-count arguments; the work queue is bounded to 10000 tasks.
+        this.publicExecutor = ThreadUtils.newThreadPoolExecutor(clientConfig.getClientAsyncCallbackExecutorThreads(), clientConfig.getClientAsyncCallbackExecutorThreads(), 60,
+            TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10000), "PublicExecutor", true);
+        this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta);
+    }
+
+    /**
+     * @return the serializer factory owned by this remoting instance
+     */
+    public SerializerFactory getSerializerFactory() {
+        return serializerFactory;
+    }
+
+    /**
+     * Hands a channel event to the {@link ChannelEventExecutor} queue for later dispatch.
+     *
+     * @param event the channel event to enqueue
+     */
+    protected void putNettyEvent(final NettyChannelEvent event) {
+        this.channelEventExecutor.putNettyEvent(event);
+    }
+
+    protected void startUpHouseKeepingService() {
+        this.houseKeepingService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                scanResponseTable();
+            }
+        }, 3000, 1000, TimeUnit.MICROSECONDS);
+    }
+
+    /**
+     * Starts the channel event executor thread, but only when at least one channel
+     * event listener has been registered at the time of the call.
+     */
+    @Override
+    public void start() {
+        if (this.channelEventListenerGroup.size() > 0) {
+            this.channelEventExecutor.start();
+        }
+    }
+
+    /**
+     * Shuts down the shared callback executor (waiting up to 2000 ms) and the channel
+     * event executor thread.
+     */
+    // NOTE(review): houseKeepingService is not shut down here - confirm subclasses take care of it.
+    @Override
+    public void stop() {
+        ThreadUtils.shutdownGracefully(publicExecutor, 2000, TimeUnit.MILLISECONDS);
+        ThreadUtils.shutdownGracefully(channelEventExecutor);
+    }
+
+    /**
+     * Routes an inbound command by traffic type: responses go to the response path,
+     * every request flavour goes to the request path. A {@code null} command is ignored
+     * and an unknown traffic type is only logged.
+     *
+     * @param ctx     the handler context the command arrived on
+     * @param command the decoded command, may be {@code null}
+     * @throws Exception declared for subclasses/handlers; not thrown directly here
+     */
+    protected void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand command) throws Exception {
+        if (command == null) {
+            return;
+        }
+
+        switch (command.trafficType()) {
+            case RESPONSE:
+                processResponseCommand(ctx, command);
+                break;
+            case REQUEST_SYNC:
+            case REQUEST_ASYNC:
+            case REQUEST_ONEWAY:
+                processRequestCommand(ctx, command);
+                break;
+            default:
+                LOG.warn("Not supported The traffic type {} !", command.trafficType());
+                break;
+        }
+    }
+
+    /**
+     * Dispatches an inbound request to the processor registered for its op code, running
+     * it on the executor paired with that processor.
+     *
+     * <p>When no processor is registered, or the executor rejects the task, an error
+     * response is written back for every traffic type except oneway.
+     *
+     * @param ctx the handler context the request arrived on
+     * @param cmd the request command
+     */
+    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
+        final Pair<RequestProcessor, ExecutorService> processorExecutorPair = this.processorTables.get(cmd.opCode());
+
+        // Without this guard an unknown op code would fail with a NullPointerException below
+        // and the caller would only notice through its own timeout.
+        if (processorExecutorPair == null) {
+            LOG.warn("Request {} from {} has no registered processor !", cmd, extractRemoteAddress(ctx.channel()));
+
+            if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
+                RemotingCommand response = remotingCommandFactory.createResponse(cmd);
+                response.opCode(RemotingCommand.CommandFlag.ERROR.flag());
+                response.remark("REQUEST_CODE_NOT_SUPPORTED");
+                writeAndFlush(ctx.channel(), response);
+            }
+            return;
+        }
+
+        RemotingChannel channel = new NettyChannelImpl(ctx.channel());
+
+        Runnable run = buildProcessorTask(ctx, cmd, processorExecutorPair, channel);
+
+        try {
+            processorExecutorPair.getRight().submit(run);
+        } catch (RejectedExecutionException e) {
+            // Crude log sampling: only logs when the current millisecond is a multiple of 10000.
+            if ((System.currentTimeMillis() % 10000) == 0) {
+                LOG.warn(String.format("Request %s from %s Rejected by server executor %s !", cmd,
+                    extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString()));
+            }
+
+            if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
+                interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE,
+                    extractRemoteAddress(ctx.channel()), cmd, e, "FLOW_CONTROL"));
+
+                RemotingCommand response = remotingCommandFactory.createResponse(cmd);
+                response.opCode(RemotingCommand.CommandFlag.ERROR.flag());
+                response.remark("SYSTEM_BUSY");
+                writeAndFlush(ctx.channel(), response);
+            }
+        }
+    }
+
+    /**
+     * Builds the task that runs one request through the interceptors and its processor.
+     *
+     * <p>The task calls {@code beforeRequest}, the processor, {@code afterResponseReceived}
+     * and then {@code handleResponse}; any {@link Throwable} raised along the way is logged
+     * and passed to {@code handleException}.
+     *
+     * @param ctx                   the handler context the request arrived on
+     * @param cmd                   the request command
+     * @param processorExecutorPair the processor (left) and its executor (right)
+     * @param channel               the channel wrapper handed to the processor
+     * @return the runnable to submit to the processor's executor
+     */
+    @NotNull
+    private Runnable buildProcessorTask(final ChannelHandlerContext ctx, final RemotingCommand cmd,
+        final Pair<RequestProcessor, ExecutorService> processorExecutorPair, final RemotingChannel channel) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.RESPONSE,
+                        extractRemoteAddress(ctx.channel()), cmd));
+
+                    RemotingCommand response = processorExecutorPair.getLeft().processRequest(channel, cmd);
+
+                    interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.RESPONSE,
+                        extractRemoteAddress(ctx.channel()), cmd, response));
+
+                    handleResponse(response, cmd, ctx);
+                } catch (Throwable e) {
+                    LOG.error(String.format("Process request %s error !", cmd.toString()), e);
+
+                    handleException(e, cmd, ctx);
+                }
+            }
+        };
+    }
+
+    /**
+     * Reports a request-processing failure to the interceptors and writes an error
+     * response back to the peer. Oneway requests get neither.
+     *
+     * @param e   the failure raised while processing the request
+     * @param cmd the request command that failed
+     * @param ctx the handler context the request arrived on
+     */
+    private void handleException(Throwable e, RemotingCommand cmd, ChannelHandlerContext ctx) {
+        if (cmd.trafficType() == TrafficType.REQUEST_ONEWAY) {
+            return;
+        }
+
+        // A misbehaving interceptor must not prevent the error response from being sent,
+        // otherwise the caller only finds out through its own timeout.
+        try {
+            interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, ""));
+        } catch (Throwable t) {
+            LOG.warn("onException ", t);
+        }
+
+        RemotingCommand response = remotingCommandFactory.createResponse(cmd);
+        response.opCode(RemotingCommand.CommandFlag.ERROR.flag());
+        response.remark(serializeException(cmd.serializerType(), e));
+        response.property("Exception", e.getClass().getName());
+        // Write through the channel, like the success path does, so the response traverses
+        // the same outbound handlers regardless of this handler's pipeline position.
+        writeAndFlush(ctx.channel(), response);
+    }
+
+    /**
+     * Encodes an exception with the serializer registered for the given type and returns
+     * the {@code toString()} of the encoded value.
+     *
+     * @param serializeType serializer type id used to look up the serializer
+     * @param exception     the exception to encode
+     * @return string form of the encoded exception
+     */
+    // NOTE(review): if encode() returns a buffer or byte array, toString() yields its
+    // identity/description rather than the encoded content - confirm against Serializer.
+    private String serializeException(byte serializeType, Throwable exception) {
+        final Serializer serialization = getSerializerFactory().get(serializeType);
+        return serialization.encode(exception).toString();
+    }
+
+    private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) {
+        if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) {
+            if (response != null) {
+                try {
+                    writeAndFlush(ctx.channel(), response);
+                } catch (Throwable e) {
+                    LOG.error(String.format("Process request %s success, but transfer response %s failed !",
+                        cmd.toString(), response.toString()), e);
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Matches an inbound response to its pending request and completes it.
+     *
+     * <p>For requests carrying an async handler, the callback is submitted to the callback
+     * executor and falls back to the calling thread when there is no executor or the
+     * submission is rejected. For requests without a handler the response is handed to the
+     * waiting caller via {@code putResponse}. Responses with no pending entry are only logged.
+     *
+     * @param ctx the handler context the response arrived on
+     * @param cmd the response command
+     */
+    private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
+        final ResponseResult responseResult = ackTables.get(cmd.requestID());
+        if (responseResult != null) {
+            responseResult.setResponseCommand(cmd);
+            responseResult.release();
+
+            long time = System.currentTimeMillis();
+            // NOTE(review): get() followed by remove() is not atomic - two concurrent responses
+            // with the same request id could both reach this point; confirm whether that matters.
+            ackTables.remove(cmd.requestID());
+            // Sampled statistics: one log line per 5000 handled responses.
+            if (count.incrementAndGet() % 5000 == 0)
+                LOG.warn("REQUEST ID:{}, cost time:{}, ackTables.size:{}", cmd.requestID(), time - responseResult.getBeginTimestamp(),
+                    ackTables.size());
+            if (responseResult.getAsyncHandler() != null) {
+                // Set when the callback has to run on the current thread instead of the executor.
+                boolean sameThread = false;
+                ExecutorService executor = this.getCallbackExecutor();
+                if (executor != null) {
+                    try {
+                        executor.submit(new Runnable() {
+                            @Override
+                            public void run() {
+                                try {
+                                    responseResult.executeCallbackArrived(responseResult.getResponseCommand());
+                                } catch (Throwable e) {
+                                    LOG.warn("Execute callback error !", e);
+                                }
+                            }
+                        });
+                    } catch (RejectedExecutionException e) {
+                        sameThread = true;
+                        LOG.warn("Execute submit error !", e);
+                    }
+                } else {
+                    sameThread = true;
+                }
+
+                if (sameThread) {
+                    try {
+                        responseResult.executeCallbackArrived(responseResult.getResponseCommand());
+                    } catch (Throwable e) {
+                        LOG.warn("Execute callback in response thread error !", e);
+                    }
+                }
+            } else {
+                responseResult.putResponse(cmd);
+            }
+        } else {
+            LOG.warn("request {} from {} has not matched response !", cmd, extractRemoteAddress(ctx.channel()));
+        }
+    }
+
+    /**
+     * Writes and flushes a message on the channel and attaches a completion listener.
+     *
+     * @param channel  the channel to write to
+     * @param msg      the message to send
+     * @param listener notified when the write completes
+     */
+    private void writeAndFlush(final Channel channel, final Object msg, final ChannelFutureListener listener) {
+        channel.writeAndFlush(msg).addListener(listener);
+    }
+
+    /**
+     * Writes and flushes a message on the channel without observing the outcome.
+     *
+     * @param channel the channel to write to
+     * @param msg     the message to send
+     */
+    private void writeAndFlush(final Channel channel, final Object msg) {
+        channel.writeAndFlush(msg);
+    }
+
+    /**
+     * @return the executor used to run async response callbacks (the shared public executor)
+     */
+    public ExecutorService getCallbackExecutor() {
+        return this.publicExecutor;
+    }
+
+    /**
+     * Intended to expire timed-out entries of the pending-response table. The whole
+     * implementation is currently commented out, so this method does nothing.
+     */
+    // NOTE(review): while this stays disabled, entries whose response never arrives are only
+    // removed by the send-failure and sync-invoke paths - confirm this is intentional.
+    void scanResponseTable() {
+        /*
+        Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<Integer, ResponseResult> next = iterator.next();
+            ResponseResult result = next.getValue();
+
+            if ((result.getBeginTimestamp() + result.getTimeoutMillis()) <= System.currentTimeMillis()) {
+                iterator.remove();
+                try {
+                    long timeoutMillis = result.getTimeoutMillis();
+                    long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp();
+                    result.onTimeout(timeoutMillis, costTimeMillis);
+                    LOG.error("scan response table command {} failed", result.getRequestId());
+                } catch (Throwable e) {
+                    LOG.warn("Error occurred when execute timeout callback !", e);
+                } finally {
+                    result.release();
+                    LOG.warn("Removed timeout request {} ", result);
+                }
+            }
+        }
+        */
+    }
+
+    /**
+     * Sends a request synchronously, calling the interceptors before the request is sent
+     * and after the response has been obtained.
+     *
+     * @param channel       the channel to send on
+     * @param request       the request; its traffic type is set to {@code REQUEST_SYNC}
+     * @param timeoutMillis how long to wait for the response
+     * @return the response command as returned by {@code invoke0}
+     */
+    public RemotingCommand invokeWithInterceptor(final Channel channel, final RemotingCommand request,
+        long timeoutMillis) {
+        request.trafficType(TrafficType.REQUEST_SYNC);
+
+        final String remoteAddr = extractRemoteAddress(channel);
+
+        //FIXME try catch here
+        this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
+
+        RemotingCommand responseCommand = this.invoke0(remoteAddr, channel, request, timeoutMillis);
+
+        // Not reached when invoke0 throws; the exception propagates to the caller.
+        this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST,
+            extractRemoteAddress(channel), request, responseCommand));
+
+        return responseCommand;
+    }
+
+    /**
+     * Registers a pending-response entry, writes the request and blocks until a response
+     * arrives or the wait ends. The entry is always removed from the table on exit.
+     *
+     * @param remoteAddr    remote address recorded on the pending entry
+     * @param channel       the channel to send on
+     * @param request       the request command
+     * @param timeoutMillis how long to wait for the response
+     * @return the response, or {@code null} when the wait ended without a response and the
+     *         request was not marked as sent successfully
+     * @throws RemoteTimeoutException when no response arrived although the send succeeded
+     */
+    private RemotingCommand invoke0(final String remoteAddr, final Channel channel, final RemotingCommand request,
+        final long timeoutMillis) {
+        try {
+            final int opaque = request.requestID();
+            final ResponseResult responseResult = new ResponseResult(opaque, timeoutMillis);
+            responseResult.setRequestCommand(request);
+            //FIXME one interceptor for all case ?
+            responseResult.setInterceptorGroup(this.interceptorGroup);
+            responseResult.setRemoteAddr(remoteAddr);
+
+            this.ackTables.put(opaque, responseResult);
+
+            ChannelFutureListener listener = new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture f) throws Exception {
+                    if (f.isSuccess()) {
+                        responseResult.setSendRequestOK(true);
+                        return;
+                    } else {
+                        responseResult.setSendRequestOK(false);
+
+                        // Send failed: drop the entry and hand a null response to the waiting caller.
+                        ackTables.remove(opaque);
+                        responseResult.setCause(f.cause());
+                        responseResult.putResponse(null);
+
+                        LOG.warn("Send request command to {} failed !", remoteAddr);
+                    }
+                }
+            };
+
+            this.writeAndFlush(channel, request, listener);
+
+            RemotingCommand responseCommand = responseResult.waitResponse(timeoutMillis);
+
+            if (null == responseCommand) {
+                if (responseResult.isSendRequestOK()) {
+                    throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseResult.getCause());
+                }
+                // NOTE(review): with the branch below disabled, a failed send returns null to the
+                // caller instead of raising an exception - confirm callers handle a null response.
+                /*
+                else {
+                    throw new RemoteAccessException(extractRemoteAddress(channel), responseResult.getCause());
+                }*/
+            }
+
+            return responseCommand;
+        } finally {
+            this.ackTables.remove(request.requestID());
+        }
+    }
+
+    /**
+     * Sends a request asynchronously, calling the {@code beforeRequest} interceptors first.
+     *
+     * <p>If the calling thread is interrupted while waiting for an async permit, the
+     * interceptors' {@code onException} is invoked and the thread's interrupt status is
+     * restored so that callers can still observe the interruption.
+     *
+     * @param channel        the channel to send on
+     * @param request        the request; its traffic type is set to {@code REQUEST_ASYNC}
+     * @param invokeCallback handler attached to the pending entry for this request
+     * @param timeoutMillis  timeout for acquiring a permit and for the request itself
+     */
+    public void invokeAsyncWithInterceptor(final Channel channel, final RemotingCommand request,
+        final AsyncHandler invokeCallback, long timeoutMillis) {
+        request.trafficType(TrafficType.REQUEST_ASYNC);
+
+        final String remoteAddr = extractRemoteAddress(channel);
+
+        this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
+
+        try {
+            this.invokeAsync0(remoteAddr, channel, request, timeoutMillis, invokeCallback);
+        } catch (InterruptedException exception) {
+            try {
+                this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, exception, "REMOTING_EXCEPTION"));
+            } catch (Throwable e) {
+                LOG.warn("onException ", e);
+            } finally {
+                // The exception is consumed here, so re-assert the flag instead of losing it.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Acquires an async permit, registers a pending-response entry and writes the request.
+     *
+     * <p>When the write fails, either asynchronously or by throwing, the pending entry is
+     * removed from the table and released.
+     *
+     * @param remoteAddr     remote address recorded on the pending entry
+     * @param channel        the channel to send on
+     * @param request        the request command
+     * @param timeoutMillis  timeout for acquiring a permit and for the request itself
+     * @param invokeCallback handler attached to the pending entry
+     * @throws InterruptedException   if interrupted while waiting for a permit
+     * @throws RemoteTimeoutException if no permit became available within the timeout
+     */
+    private void invokeAsync0(final String remoteAddr, final Channel channel, final RemotingCommand request,
+        final long timeoutMillis, final AsyncHandler invokeCallback) throws InterruptedException {
+        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+        if (acquired) {
+            final int requestID = request.requestID();
+
+            SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
+
+            final ResponseResult responseResult = new ResponseResult(requestID, timeoutMillis, invokeCallback, once);
+            responseResult.setRequestCommand(request);
+            responseResult.setInterceptorGroup(this.interceptorGroup);
+            responseResult.setRemoteAddr(remoteAddr);
+
+            this.ackTables.put(requestID, responseResult);
+            try {
+                ChannelFutureListener listener = new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture f) throws Exception {
+                        responseResult.setSendRequestOK(f.isSuccess());
+                        if (f.isSuccess()) {
+                            return;
+                        }
+
+                        responseResult.putResponse(null);
+                        ackTables.remove(requestID);
+                        try {
+                            responseResult.executeRequestSendFailed();
+                        } catch (Throwable e) {
+                            LOG.warn("Execute callback error !", e);
+                        } finally {
+                            responseResult.release();
+                        }
+
+                        LOG.warn("Send request command to channel {} failed.", remoteAddr);
+                    }
+                };
+
+                this.writeAndFlush(channel, request, listener);
+            } catch (Exception e) {
+                // The request never left, so no response can ever clear this entry: remove it
+                // here to keep the pending table from growing.
+                this.ackTables.remove(requestID);
+                responseResult.release();
+                LOG.error("Send request command to channel " + channel + " error !", e);
+            }
+        } else {
+            String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d",
+                timeoutMillis, request.toString(), semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits());
+            LOG.error(info);
+            throw new RemoteTimeoutException(info);
+        }
+    }
+
+    /**
+     * Sends a oneway request (no response expected), calling the {@code beforeRequest}
+     * interceptors first.
+     *
+     * <p>If the calling thread is interrupted while waiting for a oneway permit, the
+     * interceptors' {@code onException} is invoked and the thread's interrupt status is
+     * restored so that callers can still observe the interruption.
+     *
+     * @param channel       the channel to send on
+     * @param request       the request; its traffic type is set to {@code REQUEST_ONEWAY}
+     * @param timeoutMillis timeout for acquiring a permit
+     */
+    public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request, long timeoutMillis) {
+        request.trafficType(TrafficType.REQUEST_ONEWAY);
+
+        this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request));
+
+        try {
+            this.invokeOneway0(channel, request, timeoutMillis);
+        } catch (InterruptedException exception) {
+            try {
+                this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, exception, "REMOTING_EXCEPTION"));
+            } catch (Throwable e) {
+                LOG.warn("onException ", e);
+            } finally {
+                // The exception is consumed here, so re-assert the flag instead of losing it.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Acquires a oneway permit and writes the request; the permit is released once the
+     * write completes or when writing throws.
+     *
+     * @param channel       the channel to send on
+     * @param request       the request command
+     * @param timeoutMillis timeout for acquiring a permit
+     * @throws InterruptedException   if interrupted while waiting for a permit
+     * @throws RemoteTimeoutException if no permit became available within the timeout
+     */
+    private void invokeOneway0(final Channel channel, final RemotingCommand request,
+        final long timeoutMillis) throws InterruptedException {
+        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+        if (acquired) {
+            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
+            try {
+                final SocketAddress socketAddress = channel.remoteAddress();
+
+                ChannelFutureListener listener = new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture f) throws Exception {
+                        once.release();
+                        if (!f.isSuccess()) {
+                            LOG.warn("Send request command to channel {} failed !", socketAddress);
+                        }
+                    }
+                };
+
+                this.writeAndFlush(channel, request, listener);
+            } catch (Exception e) {
+                once.release();
+                LOG.error("Send request command to channel " + channel + " error !", e);
+            }
+        } else {
+            // Report the oneway semaphore that actually timed out, not the async one.
+            String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d",
+                timeoutMillis, request.toString(), this.semaphoreOneway.getQueueLength(), this.semaphoreOneway.availablePermits());
+            LOG.error(info);
+            throw new RemoteTimeoutException(info);
+        }
+    }
+
+    /**
+     * @return the unique id generated for this remoting instance at construction time
+     */
+    public String getRemotingInstanceId() {
+        return remotingInstanceId;
+    }
+
+    /**
+     * @return the protocol factory owned by this remoting instance
+     */
+    @Override
+    public ProtocolFactory protocolFactory() {
+        return this.protocolFactory;
+    }
+
+    /**
+     * @return the serializer factory owned by this remoting instance
+     */
+    @Override
+    public SerializerFactory serializerFactory() {
+        return this.serializerFactory;
+    }
+
+    /**
+     * @return the command factory created for this remoting instance
+     */
+    @Override
+    public RemotingCommandFactory commandFactory() {
+        return this.remotingCommandFactory;
+    }
+
+    /**
+     * Registers a processor and its executor for a request code. An existing registration
+     * for the same code is kept and the new one is ignored.
+     *
+     * @param requestCode the request code to handle
+     * @param processor   the processor invoked for that code
+     * @param executor    the executor the processor runs on
+     */
+    // NOTE(review): containsKey followed by put is not atomic; two concurrent registrations of
+    // the same code could both pass the check - confirm registration is single-threaded.
+    @Override
+    public void registerRequestProcessor(String requestCode, RequestProcessor processor, ExecutorService executor) {
+        Pair<RequestProcessor, ExecutorService> pair = new Pair<RequestProcessor, ExecutorService>(processor, executor);
+        if (!this.processorTables.containsKey(requestCode)) {
+            this.processorTables.put(requestCode, pair);
+        }
+    }
+
+    /**
+     * Registers a processor for a request code, running it on the shared public executor.
+     *
+     * @param requestCode the request code to handle
+     * @param processor   the processor invoked for that code
+     */
+    @Override
+    public void registerRequestProcessor(String requestCode, RequestProcessor processor) {
+        this.registerRequestProcessor(requestCode, processor, publicExecutor);
+    }
+
+    /**
+     * Removes the processor registered for the given request code, if any.
+     *
+     * @param requestCode the request code to unregister
+     */
+    @Override
+    public void unregisterRequestProcessor(String requestCode) {
+        this.processorTables.remove(requestCode);
+    }
+
+    /**
+     * @return the unique id of this remoting instance; same value as {@code getRemotingInstanceId()}
+     */
+    @Override
+    public String remotingInstanceId() {
+        return this.getRemotingInstanceId();
+    }
+
+    /**
+     * Adds an interceptor to this instance's interceptor group.
+     *
+     * @param interceptor the interceptor to register
+     */
+    @Override
+    public void registerInterceptor(Interceptor interceptor) {
+        this.interceptorGroup.registerInterceptor(interceptor);
+    }
+
+    /**
+     * Adds a listener to this instance's channel event listener group.
+     *
+     * @param listener the channel event listener to register
+     */
+    @Override
+    public void registerChannelEventListener(ChannelEventListener listener) {
+        this.channelEventListenerGroup.registerChannelEventListener(listener);
+    }
+
+    /**
+     * Looks up the processor and executor registered for a request code.
+     *
+     * @param requestCode the request code to look up
+     * @return the registered pair, or {@code null} when nothing is registered for the code
+     */
+    @Override
+    public Pair<RequestProcessor, ExecutorService> processor(String requestCode) {
+        return processorTables.get(requestCode);
+    }
+
+    protected String extractRemoteAddress(Channel channel) {
+        return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress();
+    }
+
+    class ChannelEventExecutor extends Thread {
+        private final static int MAX_SIZE = 10000;
+        private final LinkedBlockingQueue<NettyChannelEvent> eventQueue = new LinkedBlockingQueue<NettyChannelEvent>();
+        private String name;
+
+        public ChannelEventExecutor(String nettyEventExector) {
+            super(nettyEventExector);
+            this.name = nettyEventExector;
+        }
+        //private final AtomicBoolean isStopped = new AtomicBoolean(true);
+
+        public void putNettyEvent(final NettyChannelEvent event) {
+            if (this.eventQueue.size() <= MAX_SIZE) {
+                this.eventQueue.add(event);
+            } else {
+                LOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
+            }
+        }
+
+        @Override
+        public void run() {
+            LOG.info(this.name + " service started");
+
+            ChannelEventListenerGroup listener = NettyRemotingAbstract.this.channelEventListenerGroup;
+
+            while (true) {
+                try {
+                    NettyChannelEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
+                    if (event != null && listener != null) {
+                        RemotingChannel channel = new NettyChannelImpl(event.getChannel());
+
+                        LOG.warn("Channel Event, {}", event);
+
+                        switch (event.getType()) {
+                            case IDLE:
+                                listener.onChannelIdle(channel);
+                                break;
+                            case INACTIVE:
+                                listener.onChannelClose(channel);
+                                break;
+                            case ACTIVE:
+                                listener.onChannelConnect(channel);
+                                break;
+                            case EXCEPTION:
+                                listener.onChannelException(channel);
+                                break;
+                            default:
+                                break;
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.error("error", e);
+                    break;
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Inbound handler that forwards every decoded {@link RemotingCommand} to
+     * {@code processMessageReceived} of the enclosing remoting instance.
+     */
+    protected class EventDispatcher extends SimpleChannelInboundHandler<RemotingCommand> {
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+            processMessageReceived(ctx, msg);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
new file mode 100644
index 0000000..7481574
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.net.ssl.SSLException;
+import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingClient;
+import org.apache.rocketmq.remoting.api.command.RemotingCommand;
+import org.apache.rocketmq.remoting.api.command.TrafficType;
+import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException;
+import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+import org.apache.rocketmq.remoting.common.RemotingCommandFactoryMeta;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.external.ThreadUtils;
+import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
+import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
+import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler;
+import org.apache.rocketmq.remoting.impl.netty.handler.Http2Handler;
+import org.apache.rocketmq.remoting.internal.JvmUtils;
+
+public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
+    private static final long LOCK_TIMEOUT_MILLIS = 3000;
+    private final Bootstrap clientBootstrap = new Bootstrap();
+    private final EventLoopGroup ioGroup;
+    private final Class<? extends SocketChannel> socketChannelClass;
+
+    private final RemotingConfig clientConfig;
+
+    private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
+    private final Lock lockChannelTables = new ReentrantLock();
+    private EventExecutorGroup workerGroup;
+    private SslContext sslContext;
+
+    NettyRemotingClient(final RemotingConfig clientConfig) {
+        super(clientConfig, new RemotingCommandFactoryMeta(clientConfig.getProtocolName(), clientConfig.getSerializerName()));
+        this.clientConfig = clientConfig;
+
+        if (JvmUtils.isLinux() && this.clientConfig.isClientNativeEpollEnable()) {
+            this.ioGroup = new EpollEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
+                clientConfig.getClientWorkerThreads()));
+            socketChannelClass = EpollSocketChannel.class;
+        } else {
+            this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientNioIoThreads",
+                clientConfig.getClientWorkerThreads()));
+            socketChannelClass = NioSocketChannel.class;
+        }
+
+        this.workerGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
+            ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
+
+        if (Protocol.HTTP2.equals(clientConfig.getProtocolName())) {
+            buildSslContext();
+        }
+    }
+
+    private void applyOptions(Bootstrap bootstrap) {
+        if (null != clientConfig) {
+            if (clientConfig.getTcpSoLinger() > 0) {
+                bootstrap.option(ChannelOption.SO_LINGER, clientConfig.getTcpSoLinger());
+            }
+
+            if (clientConfig.getTcpSoSndBufSize() > 0) {
+                bootstrap.option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSoSndBufSize());
+            }
+            if (clientConfig.getTcpSoRcvBufSize() > 0) {
+                bootstrap.option(ChannelOption.SO_RCVBUF, clientConfig.getTcpSoRcvBufSize());
+            }
+
+            bootstrap.option(ChannelOption.SO_REUSEADDR, clientConfig.isTcpSoReuseAddress()).
+                option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()).
+                option(ChannelOption.TCP_NODELAY, clientConfig.isTcpSoNoDelay()).
+                option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeout()).
+                option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(clientConfig.getWriteBufLowWaterMark(),
+                    clientConfig.getWriteBufHighWaterMark()));
+        }
+    }
+
+    /**
+     * Starts the client: runs the superclass start-up, configures the bootstrap (event loop group,
+     * channel class, pipeline initializer and socket options) and starts the house-keeping service.
+     * <p>
+     * No connection is opened here; connections are created by {@code createChannel}.
+     */
+    @Override
+    public void start() {
+        super.start();
+
+        final ChannelInitializer<SocketChannel> pipelineInitializer = new ChannelInitializer<SocketChannel>() {
+            @Override
+            public void initChannel(SocketChannel ch) throws Exception {
+                final boolean http2 = Protocol.HTTP2.equals(clientConfig.getProtocolName());
+                if (http2) {
+                    // sslContext is only assigned by buildSslContext(); if that failed it is still null here.
+                    ch.pipeline().addFirst(sslContext.newHandler(ch.alloc()), Http2Handler.newHandler(false));
+                }
+
+                final IdleStateHandler idleStateHandler = new IdleStateHandler(
+                    clientConfig.getConnectionChannelReaderIdleSeconds(),
+                    clientConfig.getConnectionChannelWriterIdleSeconds(),
+                    clientConfig.getConnectionChannelIdleSeconds());
+
+                // All of these handlers are registered with workerGroup as their executor.
+                ch.pipeline().addLast(workerGroup, new Decoder(), new Encoder(), idleStateHandler,
+                    new ClientConnectionHandler(), new EventDispatcher(), new ExceptionHandler());
+            }
+        };
+
+        this.clientBootstrap.group(this.ioGroup).channel(socketChannelClass).handler(pipelineInitializer);
+
+        applyOptions(clientBootstrap);
+
+        startUpHouseKeepingService();
+    }
+
+    /**
+     * Builds the client-side {@link SslContext} used for the HTTP/2 pipeline and stores it in
+     * {@code sslContext}.
+     * <p>
+     * OpenSSL is used when it is available, otherwise the JDK provider. A failure is logged and leaves
+     * {@code sslContext} unset ({@code null}); this method never throws {@link SSLException}.
+     */
+    private void buildSslContext() {
+        SslProvider provider = OpenSsl.isAvailable() ? SslProvider.OPENSSL : SslProvider.JDK;
+        try {
+            sslContext = SslContextBuilder.forClient()
+                .sslProvider(provider)
+                    /* NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
+                     * Please refer to the HTTP/2 specification for cipher requirements. */
+                .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
+                // SECURITY: InsecureTrustManagerFactory accepts every server certificate without verification.
+                // Review before production use; a real trust store should be configured instead.
+                .trustManager(InsecureTrustManagerFactory.INSTANCE)
+                .build();
+        } catch (SSLException e) {
+            // Report through the class logger instead of stderr so the failure is visible in the logs.
+            LOG.error("Failed to build the client SslContext with provider " + provider, e);
+        }
+    }
+
+    /**
+     * Stops the client: shuts down the house-keeping service, closes every channel in the channel table,
+     * clears the table, shuts down the I/O group, the channel event executor and the worker group, and
+     * finally runs the superclass shutdown.
+     */
+    @Override
+    public void stop() {
+        ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS);
+
+        // Close by table key: the key is the exact string the entry was stored under, so the lookup in
+        // closeChannel cannot miss because a re-derived remote address is formatted differently.
+        for (Map.Entry<String, ChannelWrapper> entry : this.channelTables.entrySet()) {
+            this.closeChannel(entry.getKey(), entry.getValue().getChannel());
+        }
+
+        this.channelTables.clear();
+
+        this.ioGroup.shutdownGracefully();
+
+        ThreadUtils.shutdownGracefully(channelEventExecutor);
+
+        this.workerGroup.shutdownGracefully();
+
+        super.stop();
+    }
+
+    private void closeChannel(final String addr, final Channel channel) {
+        if (null == channel)
+            return;
+
+        final String addrRemote = null == addr ? extractRemoteAddress(channel) : addr;
+        try {
+            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    boolean removeItemFromTable = true;
+                    ChannelWrapper prevCW = this.channelTables.get(addrRemote);
+                    //Workaround for null
+                    if (null == prevCW) {
+                        return;
+                    }
+
+                    LOG.info("Begin to close the remote address {} channel {}", addrRemote, prevCW);
+
+                    if (prevCW.getChannel() != channel) {
+                        LOG.info("Channel {} has been closed,this is a new channel.", prevCW.getChannel(), channel);
+                        removeItemFromTable = false;
+                    }
+
+                    if (removeItemFromTable) {
+                        this.channelTables.remove(addrRemote);
+                        LOG.info("Channel {} has been removed !", addrRemote);
+                    }
+
+                    channel.close().addListener(new ChannelFutureListener() {
+                        @Override
+                        public void operationComplete(ChannelFuture future) throws Exception {
+                            LOG.warn("Close channel {} {}", channel, future.isSuccess());
+                        }
+                    });
+                } catch (Exception e) {
+                    LOG.error("Close channel error !", e);
+                } finally {
+                    this.lockChannelTables.unlock();
+                }
+            } else {
+                LOG.warn("Can not lock channel table in {} ms", LOCK_TIMEOUT_MILLIS);
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Close channel error !", e);
+        }
+    }
+
+    private Channel createIfAbsent(final String addr) {
+        ChannelWrapper cw = this.channelTables.get(addr);
+        if (cw != null && cw.isActive()) {
+            return cw.getChannel();
+        }
+        return this.createChannel(addr);
+    }
+
+    //FIXME need test to verify
+    private Channel createChannel(final String addr) {
+        ChannelWrapper cw = null;
+        try {
+            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    boolean createNewConnection;
+                    cw = this.channelTables.get(addr);
+                    if (cw != null) {
+                        if (cw.isActive()) {
+                            return cw.getChannel();
+                        } else if (!cw.getChannelFuture().isDone()) {
+                            createNewConnection = false;
+                        } else {
+                            this.channelTables.remove(addr);
+                            createNewConnection = true;
+                        }
+                    } else {
+                        createNewConnection = true;
+                    }
+
+                    if (createNewConnection) {
+                        String[] s = addr.split(":");
+                        SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.valueOf(s[1]));
+                        ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress);
+                        LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
+                        cw = new ChannelWrapper(channelFuture);
+                        this.channelTables.put(addr, cw);
+                    }
+                } catch (Exception e) {
+                    LOG.error("createChannel: create channel exception", e);
+                } finally {
+                    this.lockChannelTables.unlock();
+                }
+            } else {
+                LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        if (cw != null) {
+            ChannelFuture channelFuture = cw.getChannelFuture();
+            if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) {
+                if (cw.isActive()) {
+                    LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
+                    return cw.getChannel();
+                } else {
+                    LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause());
+                    this.closeChannel(addr, cw.getChannel());
+                }
+            } else {
+                LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(),
+                    channelFuture.toString());
+                this.closeChannel(addr, cw.getChannel());
+            }
+        }
+        return null;
+    }
+
+    private void closeChannel(final Channel channel) {
+        if (null == channel)
+            return;
+
+        try {
+            if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    boolean removeItemFromTable = true;
+                    ChannelWrapper prevCW = null;
+                    String addrRemote = null;
+
+                    for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) {
+                        ChannelWrapper prev = entry.getValue();
+                        if (prev.getChannel() != null) {
+                            if (prev.getChannel() == channel) {
+                                prevCW = prev;
+                                addrRemote = entry.getKey();
+                                break;
+                            }
+                        }
+                    }
+
+                    if (null == prevCW) {
+                        LOG.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+                        removeItemFromTable = false;
+                    }
+
+                    if (removeItemFromTable) {
+                        this.channelTables.remove(addrRemote);
+                        LOG.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+                        //RemotingHelper.closeChannel(channel);
+                    }
+                } catch (Exception e) {
+                    LOG.error("closeChannel: close the channel exception", e);
+                } finally {
+                    this.lockChannelTables.unlock();
+                }
+            } else {
+                LOG.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+            }
+        } catch (InterruptedException e) {
+            LOG.error("closeChannel exception", e);
+        }
+    }
+
+    /**
+     * Sends {@code request} to {@code address} synchronously and returns the response.
+     * <p>
+     * The request is marked {@code TrafficType.REQUEST_SYNC} before a channel is looked up or created.
+     *
+     * @param address       remote address, also used as the channel table key
+     * @param request       command to send
+     * @param timeoutMillis timeout passed on to {@code invokeWithInterceptor}
+     * @return the result of {@code invokeWithInterceptor}
+     * @throws RemoteConnectFailureException when no active channel to {@code address} is available
+     * @throws RemoteTimeoutException        rethrown from {@code invokeWithInterceptor}; the channel is
+     *                                       closed first when {@code isClientCloseSocketIfTimeout()} is set
+     */
+    @Override
+    public RemotingCommand invoke(final String address, final RemotingCommand request, final long timeoutMillis) {
+        request.trafficType(TrafficType.REQUEST_SYNC);
+
+        final Channel channel = this.createIfAbsent(address);
+        final boolean usable = channel != null && channel.isActive();
+        if (!usable) {
+            this.closeChannel(address, channel);
+            throw new RemoteConnectFailureException(address);
+        }
+
+        try {
+            return this.invokeWithInterceptor(channel, request, timeoutMillis);
+        } catch (RemoteTimeoutException timeout) {
+            if (this.clientConfig.isClientCloseSocketIfTimeout()) {
+                LOG.warn("invoke: timeout, so close the socket {} ms, {}", timeoutMillis, address);
+                this.closeChannel(address, channel);
+            }
+
+            LOG.warn("invoke: wait response timeout<{}ms> exception, so close the channel[{}]", timeoutMillis, address);
+            throw timeout;
+        }
+    }
+
+    /**
+     * Sends {@code request} to {@code address} asynchronously.
+     * <p>
+     * When no active channel to {@code address} is available, the request is not sent: the channel (if
+     * any) is closed, a warning is logged, and this method does not invoke {@code asyncHandler}.
+     *
+     * @param address       remote address, also used as the channel table key
+     * @param request       command to send
+     * @param asyncHandler  handler passed on to {@code invokeAsyncWithInterceptor}
+     * @param timeoutMillis timeout passed on to {@code invokeAsyncWithInterceptor}
+     */
+    @Override
+    public void invokeAsync(final String address, final RemotingCommand request, final AsyncHandler asyncHandler,
+        final long timeoutMillis) {
+
+        final Channel channel = this.createIfAbsent(address);
+        if (channel != null && channel.isActive()) {
+            // We support Netty's channel-level backpressure thereby respecting slow receivers on the other side.
+            if (!channel.isWritable()) {
+                // Note: It's up to the layer above a transport to decide whether or not to requeue a canceled write.
+                LOG.warn("Channel statistics - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", channel.bytesBeforeUnwritable(), channel.bytesBeforeWritable());
+            }
+            this.invokeAsyncWithInterceptor(channel, request, asyncHandler, timeoutMillis);
+        } else {
+            this.closeChannel(address, channel);
+            // Make the dropped request visible; previously this path returned without any trace.
+            LOG.warn("invokeAsync: no active channel to {}, the request is dropped", address);
+        }
+    }
+
+    /**
+     * Sends {@code request} to {@code address} without expecting a response.
+     * <p>
+     * When no active channel to {@code address} is available, the request is not sent: the channel (if
+     * any) is closed and a warning is logged.
+     *
+     * @param address       remote address, also used as the channel table key
+     * @param request       command to send
+     * @param timeoutMillis timeout passed on to {@code invokeOnewayWithInterceptor}
+     */
+    @Override
+    public void invokeOneWay(final String address, final RemotingCommand request, final long timeoutMillis) {
+        final Channel channel = this.createIfAbsent(address);
+        if (channel != null && channel.isActive()) {
+            if (!channel.isWritable()) {
+                // The channel is over its write-buffer limit; the request is still sent, only a warning is logged.
+                LOG.warn("Channel statistics - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", channel.bytesBeforeUnwritable(), channel.bytesBeforeWritable());
+            }
+            this.invokeOnewayWithInterceptor(channel, request, timeoutMillis);
+        } else {
+            this.closeChannel(address, channel);
+            // Make the dropped request visible; previously this path returned without any trace.
+            LOG.warn("invokeOneWay: no active channel to {}, the request is dropped", address);
+        }
+    }
+
+    /**
+     * Channel table value: wraps the {@link ChannelFuture} of a connect attempt and exposes the channel
+     * behind it.
+     * <p>
+     * Declared {@code static} because it uses nothing from the enclosing client instance, so instances do
+     * not carry a hidden reference to it.
+     */
+    private static final class ChannelWrapper {
+        private final ChannelFuture channelFuture;
+
+        /**
+         * @param channelFuture future of the connect attempt this wrapper represents
+         */
+        ChannelWrapper(ChannelFuture channelFuture) {
+            this.channelFuture = channelFuture;
+        }
+
+        /**
+         * @return {@code true} when the future has a channel and that channel is active
+         */
+        boolean isActive() {
+            return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
+        }
+
+        /**
+         * @return whether the underlying channel reports itself writable
+         */
+        boolean isWriteable() {
+            return this.channelFuture.channel().isWritable();
+        }
+
+        /**
+         * @return the channel associated with the wrapped future
+         */
+        private Channel getChannel() {
+            return this.channelFuture.channel();
+        }
+
+        /**
+         * @return the wrapped connect future
+         */
+        ChannelFuture getChannelFuture() {
+            return channelFuture;
+        }
+    }
+
+    private class ClientConnectionHandler extends ChannelDuplexHandler {
+
+        @Override
+        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+            LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(),
+                ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable());
+        }
+
+        @Override
+        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
+            ChannelPromise promise)
+            throws Exception {
+            LOG.info("Connected from {} to {}.", localAddress, remoteAddress);
+            super.connect(ctx, remoteAddress, localAddress, promise);
+
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.ACTIVE, ctx.channel()));
+        }
+
+        @Override
+        public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+            LOG.info("Remote address {} disconnect channel {}.", ctx.channel().remoteAddress(), ctx.channel());
+
+            closeChannel(ctx.channel());
+
+            super.disconnect(ctx, promise);
+
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+        }
+
+        @Override
+        public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+            LOG.info("Remote address {} close channel {}.", ctx.channel().remoteAddress(), ctx.channel());
+
+            closeChannel(ctx.channel());
+
+            super.close(ctx, promise);
+
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.INACTIVE, ctx.channel()));
+        }
+
+        @Override
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+            if (evt instanceof IdleStateEvent) {
+                IdleStateEvent event = (IdleStateEvent) evt;
+                if (event.state().equals(IdleState.ALL_IDLE)) {
+                    LOG.info("Close channel {} because of idle event {} ", ctx.channel(), event);
+                    closeChannel(ctx.channel());
+                    putNettyEvent(new NettyChannelEvent(NettyChannelEventType.IDLE, ctx.channel()));
+                }
+            }
+
+            ctx.fireUserEventTriggered(evt);
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+            LOG.info("Close channel {} because of error {} ", ctx.channel(), cause);
+
+            closeChannel(ctx.channel());
+            putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel()));
+        }
+    }
+}