You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:30 UTC

[13/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyServerConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyServerConfig.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyServerConfig.java
new file mode 100644
index 0000000..7922206
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyServerConfig.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+/**
+ * Mutable settings holder for the Netty remoting server.
+ *
+ * <p>Each setting is a private field with a plain getter/setter pair. The two socket buffer
+ * sizes take their initial values from {@link NettySystemConfig}; all other defaults are the
+ * literals written on the fields below. A copy can be obtained through {@link #clone()}.
+ *
+ * @author shijia.wxr
+ */
+public class NettyServerConfig implements Cloneable {
+    private int listenPort = 8888;
+    private int serverWorkerThreads = 8;
+    private int serverCallbackExecutorThreads = 0;
+    private int serverSelectorThreads = 3;
+    private int serverOnewaySemaphoreValue = 256;
+    private int serverAsyncSemaphoreValue = 64;
+    private int serverChannelMaxIdleTimeSeconds = 120;
+
+    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
+    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
+    private boolean serverPooledByteBufAllocatorEnable = true;
+
+    /**
+     * Switch named for the native epoll selector; {@code false} by default. How the flag is
+     * consumed is not visible in this class.
+     *
+     * <p>NOTE(review): the comment originally attached to this field contained only glibc build
+     * steps -- {@code make}, {@code make install} and
+     * {@code ../glibc-2.10.1/configure --prefix=/usr --with-headers=/usr/include
+     * --host=x86_64-linux-gnu --build=x86_64-pc-linux-gnu --without-gd} -- presumably host
+     * prerequisites for enabling this option; confirm before relying on them.
+     */
+    private boolean useEpollNativeSelector = false;
+
+    /** @return the configured listen port (8888 unless changed) */
+    public int getListenPort() {
+        return this.listenPort;
+    }
+
+    /** @param listenPort new listen port value */
+    public void setListenPort(int listenPort) {
+        this.listenPort = listenPort;
+    }
+
+    /** @return the configured worker thread count (8 unless changed) */
+    public int getServerWorkerThreads() {
+        return this.serverWorkerThreads;
+    }
+
+    /** @param serverWorkerThreads new worker thread count */
+    public void setServerWorkerThreads(int serverWorkerThreads) {
+        this.serverWorkerThreads = serverWorkerThreads;
+    }
+
+    /** @return the configured callback executor thread count (0 unless changed) */
+    public int getServerCallbackExecutorThreads() {
+        return this.serverCallbackExecutorThreads;
+    }
+
+    /** @param serverCallbackExecutorThreads new callback executor thread count */
+    public void setServerCallbackExecutorThreads(int serverCallbackExecutorThreads) {
+        this.serverCallbackExecutorThreads = serverCallbackExecutorThreads;
+    }
+
+    /** @return the configured selector thread count (3 unless changed) */
+    public int getServerSelectorThreads() {
+        return this.serverSelectorThreads;
+    }
+
+    /** @param serverSelectorThreads new selector thread count */
+    public void setServerSelectorThreads(int serverSelectorThreads) {
+        this.serverSelectorThreads = serverSelectorThreads;
+    }
+
+    /** @return the configured one-way semaphore value (256 unless changed) */
+    public int getServerOnewaySemaphoreValue() {
+        return this.serverOnewaySemaphoreValue;
+    }
+
+    /** @param serverOnewaySemaphoreValue new one-way semaphore value */
+    public void setServerOnewaySemaphoreValue(int serverOnewaySemaphoreValue) {
+        this.serverOnewaySemaphoreValue = serverOnewaySemaphoreValue;
+    }
+
+    /** @return the configured async semaphore value (64 unless changed) */
+    public int getServerAsyncSemaphoreValue() {
+        return this.serverAsyncSemaphoreValue;
+    }
+
+    /** @param serverAsyncSemaphoreValue new async semaphore value */
+    public void setServerAsyncSemaphoreValue(int serverAsyncSemaphoreValue) {
+        this.serverAsyncSemaphoreValue = serverAsyncSemaphoreValue;
+    }
+
+    /** @return the configured maximum channel idle time in seconds (120 unless changed) */
+    public int getServerChannelMaxIdleTimeSeconds() {
+        return this.serverChannelMaxIdleTimeSeconds;
+    }
+
+    /** @param serverChannelMaxIdleTimeSeconds new maximum channel idle time, in seconds */
+    public void setServerChannelMaxIdleTimeSeconds(int serverChannelMaxIdleTimeSeconds) {
+        this.serverChannelMaxIdleTimeSeconds = serverChannelMaxIdleTimeSeconds;
+    }
+
+    /** @return the configured socket send buffer size */
+    public int getServerSocketSndBufSize() {
+        return this.serverSocketSndBufSize;
+    }
+
+    /** @param serverSocketSndBufSize new socket send buffer size */
+    public void setServerSocketSndBufSize(int serverSocketSndBufSize) {
+        this.serverSocketSndBufSize = serverSocketSndBufSize;
+    }
+
+    /** @return the configured socket receive buffer size */
+    public int getServerSocketRcvBufSize() {
+        return this.serverSocketRcvBufSize;
+    }
+
+    /** @param serverSocketRcvBufSize new socket receive buffer size */
+    public void setServerSocketRcvBufSize(int serverSocketRcvBufSize) {
+        this.serverSocketRcvBufSize = serverSocketRcvBufSize;
+    }
+
+    /** @return the pooled-ByteBuf-allocator flag ({@code true} unless changed) */
+    public boolean isServerPooledByteBufAllocatorEnable() {
+        return this.serverPooledByteBufAllocatorEnable;
+    }
+
+    /** @param serverPooledByteBufAllocatorEnable new value of the pooled-ByteBuf-allocator flag */
+    public void setServerPooledByteBufAllocatorEnable(boolean serverPooledByteBufAllocatorEnable) {
+        this.serverPooledByteBufAllocatorEnable = serverPooledByteBufAllocatorEnable;
+    }
+
+    /** @return the epoll-native-selector flag ({@code false} unless changed) */
+    public boolean isUseEpollNativeSelector() {
+        return this.useEpollNativeSelector;
+    }
+
+    /** @param useEpollNativeSelector new value of the epoll-native-selector flag */
+    public void setUseEpollNativeSelector(boolean useEpollNativeSelector) {
+        this.useEpollNativeSelector = useEpollNativeSelector;
+    }
+
+    /**
+     * Returns the field-by-field copy produced by {@link Object#clone()}.
+     *
+     * @return the copy, typed as {@code Object}
+     * @throws CloneNotSupportedException as declared by {@link Object#clone()}
+     */
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettySystemConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettySystemConfig.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettySystemConfig.java
new file mode 100644
index 0000000..41589ce
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettySystemConfig.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting.netty;
+
+public class NettySystemConfig {
+    public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
+            "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
+    public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE = //
+            "com.rocketmq.remoting.socket.sndbuf.size";
+    public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE = //
+            "com.rocketmq.remoting.socket.rcvbuf.size";
+    public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE = //
+            "com.rocketmq.remoting.clientAsyncSemaphoreValue";
+    public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = //
+            "com.rocketmq.remoting.clientOnewaySemaphoreValue";
+    public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
+            Boolean
+                    .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
+    public static int socketSndbufSize = //
+            Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
+    public static int socketRcvbufSize = //
+            Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
+    public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
+            Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
+    public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = //
+            Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/RequestTask.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/RequestTask.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/RequestTask.java
new file mode 100644
index 0000000..e02ae48
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/RequestTask.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting.netty;
+
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.Channel;
+
+/**
+ * Wraps a {@link Runnable} together with the channel and request it belongs to, so the work can
+ * be cancelled before it starts ({@link #setStopRun(boolean)}) and a response can be written
+ * back on the same channel ({@link #returnResponse(int, String)}).
+ *
+ * <p>Equality is defined by creation timestamp, stop flag, channel and the request's opaque id;
+ * {@link #hashCode()} is derived from exactly those values so the two stay consistent.
+ */
+public class RequestTask implements Runnable {
+    private final Runnable runnable;
+    private final long createTimestamp = System.currentTimeMillis();
+    private final Channel channel;
+    private final RemotingCommand request;
+    private boolean stopRun = false;
+
+    /**
+     * @param runnable the work executed by {@link #run()} unless the task has been stopped
+     * @param channel  channel used by {@link #returnResponse(int, String)}
+     * @param request  request whose opaque id is copied onto responses
+     */
+    public RequestTask(final Runnable runnable, final Channel channel, final RemotingCommand request) {
+        this.runnable = runnable;
+        this.channel = channel;
+        this.request = request;
+    }
+
+    /**
+     * Hashes the same values that {@link #equals(Object)} compares. The wrapped runnable and the
+     * request's own hash are intentionally left out: equals ignores them, so including them could
+     * give equal tasks different hash codes.
+     *
+     * <p>The stop flag is part of both equals and hashCode, so the hash changes when
+     * {@link #setStopRun(boolean)} flips it.
+     */
+    @Override
+    public int hashCode() {
+        final long created = getCreateTimestamp();
+        int result = (int) (created ^ (created >>> 32));
+        result = 31 * result + (channel != null ? channel.hashCode() : 0);
+        result = 31 * result + (request != null ? request.getOpaque() : 0);
+        result = 31 * result + (isStopRun() ? 1 : 0);
+        return result;
+    }
+
+    /**
+     * Two tasks are equal when their creation timestamp, stop flag and channel are equal and
+     * their requests are either both {@code null} or carry the same opaque id.
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (!(o instanceof RequestTask)) return false;
+
+        final RequestTask that = (RequestTask) o;
+
+        if (getCreateTimestamp() != that.getCreateTimestamp()) return false;
+        if (isStopRun() != that.isStopRun()) return false;
+        if (channel != null ? !channel.equals(that.channel) : that.channel != null) return false;
+        // Guard both sides: previously a null request on the other task caused a NullPointerException.
+        if (request == null || that.request == null) {
+            return request == that.request;
+        }
+        return request.getOpaque() == that.request.getOpaque();
+    }
+
+    /** @return the wall-clock time, in milliseconds, captured when this task was constructed */
+    public long getCreateTimestamp() {
+        return createTimestamp;
+    }
+
+    /** @return {@code true} if {@link #run()} will skip the wrapped runnable */
+    public boolean isStopRun() {
+        return stopRun;
+    }
+
+    /** @param stopRun {@code true} to make {@link #run()} a no-op */
+    public void setStopRun(final boolean stopRun) {
+        this.stopRun = stopRun;
+    }
+
+    /** Runs the wrapped runnable unless the task has been marked as stopped. */
+    @Override
+    public void run() {
+        if (!this.stopRun) this.runnable.run();
+    }
+
+    /**
+     * Creates a response command with the given code and remark, copies the request's opaque id
+     * onto it, and writes it to the channel.
+     *
+     * @param code   response code
+     * @param remark response remark text
+     */
+    public void returnResponse(int code, String remark) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(code, remark);
+        response.setOpaque(request.getOpaque());
+        this.channel.writeAndFlush(response);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/ResponseFuture.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/ResponseFuture.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/ResponseFuture.java
new file mode 100644
index 0000000..b6185f1
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/ResponseFuture.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.InvokeCallback;
+import com.alibaba.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ResponseFuture {
+    private final int opaque;
+    private final long timeoutMillis;
+    private final InvokeCallback invokeCallback;
+    private final long beginTimestamp = System.currentTimeMillis();
+    private final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+    private final SemaphoreReleaseOnlyOnce once;
+
+    private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
+    private volatile RemotingCommand responseCommand;
+    private volatile boolean sendRequestOK = true;
+    private volatile Throwable cause;
+
+
+    public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
+                          SemaphoreReleaseOnlyOnce once) {
+        this.opaque = opaque;
+        this.timeoutMillis = timeoutMillis;
+        this.invokeCallback = invokeCallback;
+        this.once = once;
+    }
+
+
+    public void executeInvokeCallback() {
+        if (invokeCallback != null) {
+            if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
+                invokeCallback.operationComplete(this);
+            }
+        }
+    }
+
+
+    public void release() {
+        if (this.once != null) {
+            this.once.release();
+        }
+    }
+
+
+    public boolean isTimeout() {
+        long diff = System.currentTimeMillis() - this.beginTimestamp;
+        return diff > this.timeoutMillis;
+    }
+
+
+    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
+        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+        return this.responseCommand;
+    }
+
+
+    public void putResponse(final RemotingCommand responseCommand) {
+        this.responseCommand = responseCommand;
+        this.countDownLatch.countDown();
+    }
+
+
+    public long getBeginTimestamp() {
+        return beginTimestamp;
+    }
+
+
+    public boolean isSendRequestOK() {
+        return sendRequestOK;
+    }
+
+
+    public void setSendRequestOK(boolean sendRequestOK) {
+        this.sendRequestOK = sendRequestOK;
+    }
+
+
+    public long getTimeoutMillis() {
+        return timeoutMillis;
+    }
+
+
+    public InvokeCallback getInvokeCallback() {
+        return invokeCallback;
+    }
+
+
+    public Throwable getCause() {
+        return cause;
+    }
+
+
+    public void setCause(Throwable cause) {
+        this.cause = cause;
+    }
+
+
+    public RemotingCommand getResponseCommand() {
+        return responseCommand;
+    }
+
+
+    public void setResponseCommand(RemotingCommand responseCommand) {
+        this.responseCommand = responseCommand;
+    }
+
+
+    public int getOpaque() {
+        return opaque;
+    }
+
+
+    @Override
+    public String toString() {
+        return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK
+                + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis
+                + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp
+                + ", countDownLatch=" + countDownLatch + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java
new file mode 100644
index 0000000..9f4adbe
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting.protocol;
+
+/**
+ * Language identifiers, each paired with a one-byte code.
+ */
+public enum LanguageCode {
+    JAVA((byte) 0),
+    CPP((byte) 1),
+    DOTNET((byte) 2),
+    PYTHON((byte) 3),
+    DELPHI((byte) 4),
+    ERLANG((byte) 5),
+    RUBY((byte) 6),
+    OTHER((byte) 7),
+    HTTP((byte) 8);
+
+    private final byte code;
+
+    LanguageCode(byte code) {
+        this.code = code;
+    }
+
+    /**
+     * Looks up the constant carrying the given byte code.
+     *
+     * @param code byte code to look up
+     * @return the matching constant, or {@code null} when no constant uses that code
+     */
+    public static LanguageCode valueOf(byte code) {
+        final LanguageCode[] candidates = LanguageCode.values();
+        for (int i = 0; i < candidates.length; i++) {
+            if (candidates[i].getCode() == code) {
+                return candidates[i];
+            }
+        }
+        return null;
+    }
+
+    /** @return the byte code of this constant */
+    public byte getCode() {
+        return this.code;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java
new file mode 100644
index 0000000..a09dd3b
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java
@@ -0,0 +1,569 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.protocol;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RemotingCommand {
+    public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
+    public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
+    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
+    private static final int RPC_ONEWAY = 1; // 0, RPC
+
+    private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
+            new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
+    private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
+    // Misplaced note belonging to RPC_TYPE: a value of 1 in that flag bit means RESPONSE_COMMAND.
+    private static final Map<Field, Annotation> NOT_NULL_ANNOTATION_CACHE = new HashMap<Field, Annotation>();
+    // Misplaced note belonging to RPC_ONEWAY: a value of 1 means Oneway.
+
+    private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
+    private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
+    private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
+    private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
+    private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
+    private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
+    private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
+    private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
+    private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
+    public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
+    private static volatile int configVersion = -1;
+    private static AtomicInteger requestId = new AtomicInteger(0);
+
+    private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
+
+    static {
+        // Pick the serialization type once at class load: the system property wins, the
+        // environment variable is the fallback, and a blank value keeps the JSON default.
+        final String configured = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));
+        final boolean hasValue = !isBlank(configured);
+        if (hasValue) {
+            try {
+                serializeTypeConfigInThisServer = SerializeType.valueOf(configured);
+            } catch (IllegalArgumentException invalid) {
+                // An unknown name is a configuration error; fail class initialisation loudly.
+                throw new RuntimeException("parser specified protocol error. protocol=" + configured, invalid);
+            }
+        }
+    }
+
+    /**
+     * Command code, as passed to the create*Command factory methods.
+     */
+    private int code;
+    private LanguageCode language = LanguageCode.JAVA;
+    private int version = 0;
+    private int opaque = requestId.getAndIncrement();
+    private int flag = 0;
+    private String remark;
+    private HashMap<String, String> extFields;
+    private transient CommandCustomHeader customHeader;
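+    /**
+     * Serialization type used for this command; starts as the process-wide setting and is
+     * overwritten by headerDecode with the type read from the incoming header.
+     */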
+    private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
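+    /**
+     * Raw body bytes; decode() fills this from the bytes that follow the header, or leaves it
+     * null when there are none.
+     */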
+    private transient byte[] body;
+
+
+    /**
+     * Creates an empty command. The static factory methods of this class call this constructor
+     * and then populate the instance.
+     */
+    protected RemotingCommand() {
+        super();
+    }
+
+    /**
+     * Builds a command carrying the given code and custom header, and stamps it with the
+     * configured remoting version when one is available.
+     *
+     * @param code         command code
+     * @param customHeader custom header to attach; stored as given
+     * @return the new command
+     */
+    public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
+        final RemotingCommand request = new RemotingCommand();
+        request.setCode(code);
+        request.customHeader = customHeader;
+        setCmdVersion(request);
+        return request;
+    }
+
+    private static void setCmdVersion(RemotingCommand cmd) {
+        if (configVersion >= 0) {
+            cmd.setVersion(configVersion);
+        } else {
+            String v = System.getProperty(REMOTING_VERSION_KEY);
+            if (v != null) {
+                int value = Integer.parseInt(v);
+                cmd.setVersion(value);
+                configVersion = value;
+            }
+        }
+    }
+
+    /**
+     * Builds a response command pre-filled with {@code SYSTEM_ERROR} and the remark
+     * "not set any response code", plus a fresh header of the given class.
+     *
+     * @param classHeader header class to instantiate; may be {@code null} for no header
+     * @return the new response, or {@code null} if the header class cannot be instantiated
+     */
+    public static RemotingCommand createResponseCommand(Class<? extends CommandCustomHeader> classHeader) {
+        return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
+    }
+
+    /**
+     * Builds a response command with the given code and remark, stamps the remoting version,
+     * and optionally attaches a newly instantiated custom header.
+     *
+     * @param code        response code
+     * @param remark      response remark text
+     * @param classHeader header class to instantiate through its no-arg constructor, or
+     *                    {@code null} to create the response without a custom header
+     * @return the new response, or {@code null} if {@code classHeader} cannot be instantiated
+     */
+    public static RemotingCommand createResponseCommand(int code, String remark, Class<? extends CommandCustomHeader> classHeader) {
+        final RemotingCommand response = new RemotingCommand();
+        response.markResponseType();
+        response.setCode(code);
+        response.setRemark(remark);
+        setCmdVersion(response);
+
+        if (classHeader == null) {
+            return response;
+        }
+
+        try {
+            response.customHeader = classHeader.newInstance();
+        } catch (InstantiationException e) {
+            return null;
+        } catch (IllegalAccessException e) {
+            return null;
+        }
+        return response;
+    }
+
+    /** Marks this command as a response by setting bit {@code RPC_TYPE} of the flag field. */
+    public void markResponseType() {
+        this.flag |= (1 << RPC_TYPE);
+    }
+
+    /**
+     * Builds a response command with the given code and remark and no custom header.
+     *
+     * @param code   response code
+     * @param remark response remark text
+     * @return the new response
+     */
+    public static RemotingCommand createResponseCommand(int code, String remark) {
+        final Class<? extends CommandCustomHeader> noHeader = null;
+        return createResponseCommand(code, remark, noHeader);
+    }
+
+    /**
+     * Decodes a command from a byte array by wrapping it in a {@link ByteBuffer}.
+     *
+     * @param array bytes to decode, starting with the 4-byte header-length field
+     * @return the decoded command
+     */
+    public static RemotingCommand decode(final byte[] array) {
+        return decode(ByteBuffer.wrap(array));
+    }
+
+    /**
+     * Decodes a command from a buffer laid out as: a 4-byte int whose low 24 bits are the header
+     * length and whose top byte is the serialization type, then the header bytes, then the body.
+     *
+     * <p>The body length is computed from {@code byteBuffer.limit()}, so the buffer is expected
+     * to hold exactly one command starting at index 0.
+     *
+     * @param byteBuffer buffer positioned at the header-length field
+     * @return the decoded command; its body is {@code null} when no bytes follow the header
+     */
+    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
+        final int totalLength = byteBuffer.limit();
+        final int packedHeaderLength = byteBuffer.getInt();
+        final int headerLength = getHeaderLength(packedHeaderLength);
+
+        final byte[] headerBytes = new byte[headerLength];
+        byteBuffer.get(headerBytes);
+
+        final RemotingCommand decoded = headerDecode(headerBytes, getProtocolType(packedHeaderLength));
+
+        // Whatever remains after the 4-byte length field and the header is the body.
+        final int bodyLength = totalLength - 4 - headerLength;
+        if (bodyLength > 0) {
+            final byte[] bodyBytes = new byte[bodyLength];
+            byteBuffer.get(bodyBytes);
+            decoded.body = bodyBytes;
+        } else {
+            decoded.body = null;
+        }
+
+        return decoded;
+    }
+
+    /**
+     * Extracts the header length from the packed header-length field.
+     *
+     * @param length packed value read from the wire
+     * @return the low 24 bits of {@code length}
+     */
+    public static int getHeaderLength(int length) {
+        final int lowTwentyFourBits = 0x00FFFFFF;
+        return length & lowTwentyFourBits;
+    }
+
+    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
+        switch (type) {
+            case JSON:
+                RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
+                resultJson.setSerializeTypeCurrentRPC(type);
+                return resultJson;
+            case ROCKETMQ:
+                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
+                resultRMQ.setSerializeTypeCurrentRPC(type);
+                return resultRMQ;
+            default:
+                break;
+        }
+
+        return null;
+    }
+
+    /**
+     * Extracts the serialization type from the packed header-length field.
+     *
+     * @param source packed value read from the wire
+     * @return the result of {@code SerializeType.valueOf} for the top byte of {@code source}
+     */
+    public static SerializeType getProtocolType(int source) {
+        final byte typeCode = (byte) ((source >> 24) & 0xFF);
+        return SerializeType.valueOf(typeCode);
+    }
+
+    /**
+     * Advances the shared request-id counter.
+     *
+     * @return the counter value after the increment
+     */
+    public static int createNewRequestId() {
+        final int nextId = requestId.incrementAndGet();
+        return nextId;
+    }
+
+    /** @return the process-wide serialization type chosen in the static initializer */
+    public static SerializeType getSerializeTypeConfigInThisServer() {
+        return RemotingCommand.serializeTypeConfigInThisServer;
+    }
+
+    /**
+     * Tells whether a string is {@code null}, empty, or made up only of whitespace characters
+     * as defined by {@link Character#isWhitespace(char)}.
+     *
+     * @param str string to test; may be {@code null}
+     * @return {@code true} if there is no non-whitespace character in {@code str}
+     */
+    private static boolean isBlank(String str) {
+        if (str == null) {
+            return true;
+        }
+        for (final char ch : str.toCharArray()) {
+            if (!Character.isWhitespace(ch)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** @return the custom header currently attached to this command; may be {@code null} */
+    public CommandCustomHeader readCustomHeader() {
+        return this.customHeader;
+    }
+
+    /**
+     * Attaches a custom header to this command, replacing any previous one.
+     *
+     * @param customHeader header to attach
+     */
+    public void writeCustomHeader(CommandCustomHeader customHeader) {
+        final CommandCustomHeader replacement = customHeader;
+        this.customHeader = replacement;
+    }
+
+    /**
+     * Creates a header of the given class and fills its fields from {@code extFields}.
+     *
+     * <p>Static fields and fields whose name starts with {@code "this"} are skipped. For every
+     * other declared field the string stored under the field's name is converted to the field's
+     * type (String, int/Integer, long/Long, boolean/Boolean, double/Double) and assigned
+     * reflectively. Decoding is best-effort per field: a failure on one field (missing required
+     * value, unsupported type, unparsable number, reflection error) leaves that field at its
+     * default and does not abort the remaining fields. Such failures are now logged rather than
+     * silently discarded. When {@code extFields} is non-null, {@code checkFields()} is invoked on
+     * the populated header afterwards.
+     *
+     * @param classHeader header class; instantiated through its no-arg constructor
+     * @return the populated header, or {@code null} if the class cannot be instantiated
+     * @throws RemotingCommandException as declared; per-field exceptions raised inside the loop
+     *                                  are caught and logged, so in this method only
+     *                                  {@code checkFields()} can propagate one
+     */
+    public CommandCustomHeader decodeCommandCustomHeader(Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
+        CommandCustomHeader objectHeader;
+        try {
+            objectHeader = classHeader.newInstance();
+        } catch (InstantiationException e) {
+            return null;
+        } catch (IllegalAccessException e) {
+            return null;
+        }
+
+        if (this.extFields != null) {
+
+            Field[] fields = getClazzFields(classHeader);
+            for (Field field : fields) {
+                if (Modifier.isStatic(field.getModifiers())) {
+                    continue;
+                }
+
+                String fieldName = field.getName();
+                if (fieldName.startsWith("this")) {
+                    continue;
+                }
+
+                try {
+                    String value = this.extFields.get(fieldName);
+                    if (null == value) {
+                        Annotation annotation = getNotNullAnnotation(field);
+                        if (annotation != null) {
+                            throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
+                        }
+
+                        continue;
+                    }
+
+                    field.setAccessible(true);
+                    String type = getCanonicalName(field.getType());
+                    Object valueParsed;
+
+                    if (type.equals(STRING_CANONICAL_NAME)) {
+                        valueParsed = value;
+                    } else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
+                        valueParsed = Integer.parseInt(value);
+                    } else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
+                        valueParsed = Long.parseLong(value);
+                    } else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
+                        valueParsed = Boolean.parseBoolean(value);
+                    } else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
+                        valueParsed = Double.parseDouble(value);
+                    } else {
+                        throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
+                    }
+
+                    field.set(objectHeader, valueParsed);
+
+                } catch (Throwable e) {
+                    // Keep the best-effort behaviour (skip this field, carry on), but leave a trace:
+                    // the previous empty catch hid malformed and missing header values completely.
+                    log.error("Failed field [" + fieldName + "] decoding", e);
+                }
+            }
+
+            objectHeader.checkFields();
+        }
+
+        return objectHeader;
+    }
+
+    /**
+     * Returns the declared fields of a header class, caching the reflection result per class.
+     *
+     * <p>Both the lookup and the insert happen under the map's monitor. The cache is a plain
+     * {@code HashMap}, and reading it while another thread inserts (as the previous unguarded
+     * {@code get} allowed) is not safe.
+     *
+     * @param classHeader header class to inspect
+     * @return the class's declared fields
+     */
+    private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
+        synchronized (CLASS_HASH_MAP) {
+            Field[] field = CLASS_HASH_MAP.get(classHeader);
+            if (field == null) {
+                field = classHeader.getDeclaredFields();
+                CLASS_HASH_MAP.put(classHeader, field);
+            }
+            return field;
+        }
+    }
+
+    private Annotation getNotNullAnnotation(Field field) {
+        Annotation annotation = NOT_NULL_ANNOTATION_CACHE.get(field);
+
+        if (annotation == null) {
+            annotation = field.getAnnotation(CFNotNull.class);
+            synchronized (NOT_NULL_ANNOTATION_CACHE) {
+                NOT_NULL_ANNOTATION_CACHE.put(field, annotation);
+            }
+        }
+        return annotation;
+    }
+
+    private String getCanonicalName(Class clazz) {
+        String name = CANONICAL_NAME_CACHE.get(clazz);
+
+        if (name == null) {
+            name = clazz.getCanonicalName();
+            synchronized (CANONICAL_NAME_CACHE) {
+                CANONICAL_NAME_CACHE.put(clazz, name);
+            }
+        }
+        return name;
+    }
+
+    /**
+     * Encodes the whole command (length prefix, header-length word, header
+     * and, when present, body) into one buffer that is ready for reading.
+     *
+     * @return a flipped buffer laid out as
+     *         {@code length(4) | serializeType+headerLength(4) | header | body}
+     */
+    public ByteBuffer encode() {
+        final byte[] headerData = this.headerEncode();
+
+        // length = header-length word (4) + header bytes + optional body bytes
+        int length = 4 + headerData.length;
+        if (this.body != null) {
+            length += body.length;
+        }
+
+        // the extra 4 bytes hold the length prefix itself
+        final ByteBuffer result = ByteBuffer.allocate(4 + length);
+        result.putInt(length);
+        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
+        result.put(headerData);
+        if (this.body != null) {
+            result.put(this.body);
+        }
+
+        result.flip();
+        return result;
+    }
+
+    /**
+     * Copies the custom header into {@code extFields} and then serializes
+     * this command's header with the serializer selected by
+     * {@code serializeTypeCurrentRPC}.
+     *
+     * @return the serialized header bytes
+     */
+    private byte[] headerEncode() {
+        this.makeCustomHeaderToNet();
+        return SerializeType.ROCKETMQ == serializeTypeCurrentRPC
+                ? RocketMQSerializable.rocketMQProtocolEncode(this)
+                : RemotingSerializable.encode(this);
+    }
+
+    /**
+     * Packs a serialize type and a length into one 4-byte word: the first
+     * byte is the type code, the remaining three bytes are the low 24 bits
+     * of {@code source}, most significant byte first.
+     *
+     * @param source the length to store; bits above the low 24 are dropped
+     * @param type   the serialize type whose code becomes the first byte
+     * @return a new 4-byte array
+     */
+    public static byte[] markProtocolType(int source, SerializeType type) {
+        // Casting to byte keeps only the low 8 bits, so no explicit mask is needed.
+        return new byte[] {
+            type.getCode(),
+            (byte) (source >> 16),
+            (byte) (source >> 8),
+            (byte) source
+        };
+    }
+
+    /**
+     * Copies every non-static, non-null field of {@code customHeader} into
+     * {@code extFields} as {@code name -> value.toString()}. Does nothing
+     * when no custom header is set.
+     */
+    public void makeCustomHeaderToNet() {
+        if (this.customHeader == null) {
+            return;
+        }
+
+        final Field[] headerFields = getClazzFields(customHeader.getClass());
+        if (this.extFields == null) {
+            this.extFields = new HashMap<String, String>();
+        }
+
+        for (Field headerField : headerFields) {
+            if (Modifier.isStatic(headerField.getModifiers())) {
+                continue;
+            }
+
+            final String name = headerField.getName();
+            // presumably filters compiler-generated outer-instance references such as this$0
+            if (name.startsWith("this")) {
+                continue;
+            }
+
+            Object value = null;
+            try {
+                headerField.setAccessible(true);
+                value = headerField.get(this.customHeader);
+            } catch (IllegalArgumentException ignored) {
+                // unreadable field: treated like a null value and left out
+            } catch (IllegalAccessException ignored) {
+                // unreadable field: treated like a null value and left out
+            }
+
+            if (value != null) {
+                this.extFields.put(name, value.toString());
+            }
+        }
+    }
+
+    /**
+     * Encodes the header using the length of the current body (0 when the
+     * body is {@code null}).
+     *
+     * @return the buffer produced by {@link #encodeHeader(int)}
+     */
+    public ByteBuffer encodeHeader() {
+        final int bodyLength;
+        if (this.body == null) {
+            bodyLength = 0;
+        } else {
+            bodyLength = this.body.length;
+        }
+        return encodeHeader(bodyLength);
+    }
+
+    /**
+     * Encodes the length prefix, the header-length word and the header
+     * bytes. The length prefix accounts for {@code bodyLength}, but the body
+     * itself is not written into the returned buffer.
+     *
+     * @param bodyLength number of body bytes to include in the length prefix
+     * @return a flipped buffer laid out as
+     *         {@code length(4) | serializeType+headerLength(4) | header}
+     */
+    public ByteBuffer encodeHeader(final int bodyLength) {
+        final byte[] headerData = this.headerEncode();
+
+        // advertised length = header-length word (4) + header bytes + body bytes
+        final int length = 4 + headerData.length + bodyLength;
+
+        // the buffer holds the 4-byte prefix plus everything except the body
+        final ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
+        result.putInt(length);
+        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
+        result.put(headerData);
+
+        result.flip();
+        return result;
+    }
+
+    /**
+     * Sets the {@code RPC_ONEWAY} bit in {@code flag}.
+     */
+    public void markOnewayRPC() {
+        this.flag |= (1 << RPC_ONEWAY);
+    }
+
+    /**
+     * @return {@code true} when the {@code RPC_ONEWAY} bit of {@code flag} is set
+     */
+    @JSONField(serialize = false)
+    public boolean isOnewayRPC() {
+        final int mask = 1 << RPC_ONEWAY;
+        // single-bit mask, so "non-zero" and "equals mask" are the same test
+        return (this.flag & mask) != 0;
+    }
+
+    /**
+     * @return the current value of the {@code code} field
+     */
+    public int getCode() {
+        return code;
+    }
+
+    /**
+     * @param code the new value of the {@code code} field
+     */
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    /**
+     * Derives the command type from {@link #isResponseType()}.
+     *
+     * @return {@code RESPONSE_COMMAND} when the response bit is set,
+     *         otherwise {@code REQUEST_COMMAND}
+     */
+    @JSONField(serialize = false)
+    public RemotingCommandType getType() {
+        return this.isResponseType()
+                ? RemotingCommandType.RESPONSE_COMMAND
+                : RemotingCommandType.REQUEST_COMMAND;
+    }
+
+    /**
+     * @return {@code true} when the {@code RPC_TYPE} bit of {@code flag} is set
+     */
+    @JSONField(serialize = false)
+    public boolean isResponseType() {
+        final int mask = 1 << RPC_TYPE;
+        // single-bit mask, so "non-zero" and "equals mask" are the same test
+        return (this.flag & mask) != 0;
+    }
+
+    /**
+     * @return the current value of the {@code language} field
+     */
+    public LanguageCode getLanguage() {
+        return language;
+    }
+
+    /**
+     * @param language the new value of the {@code language} field
+     */
+    public void setLanguage(LanguageCode language) {
+        this.language = language;
+    }
+
+    /**
+     * @return the current value of the {@code version} field
+     */
+    public int getVersion() {
+        return version;
+    }
+
+    /**
+     * @param version the new value of the {@code version} field
+     */
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    /**
+     * @return the current value of the {@code opaque} field
+     */
+    public int getOpaque() {
+        return opaque;
+    }
+
+    /**
+     * @param opaque the new value of the {@code opaque} field
+     */
+    public void setOpaque(int opaque) {
+        this.opaque = opaque;
+    }
+
+    /**
+     * @return the raw flag bits (the {@code RPC_TYPE} and {@code RPC_ONEWAY}
+     *         bits are tested by {@code isResponseType()} and {@code isOnewayRPC()})
+     */
+    public int getFlag() {
+        return flag;
+    }
+
+    /**
+     * Replaces all flag bits at once.
+     *
+     * @param flag the new raw flag bits
+     */
+    public void setFlag(int flag) {
+        this.flag = flag;
+    }
+
+    /**
+     * @return the current value of the {@code remark} field
+     */
+    public String getRemark() {
+        return remark;
+    }
+
+    /**
+     * @param remark the new value of the {@code remark} field
+     */
+    public void setRemark(String remark) {
+        this.remark = remark;
+    }
+
+    /**
+     * @return the body array held by this command (the array itself, not a copy)
+     */
+    public byte[] getBody() {
+        return body;
+    }
+
+    /**
+     * Stores the given array as the body without copying it.
+     *
+     * @param body the new body array
+     */
+    public void setBody(byte[] body) {
+        this.body = body;
+    }
+
+    /**
+     * @return the map held in the {@code extFields} field (the map itself, not a copy)
+     */
+    public HashMap<String, String> getExtFields() {
+        return extFields;
+    }
+
+    /**
+     * Replaces the {@code extFields} map with the given instance.
+     *
+     * @param extFields the new map
+     */
+    public void setExtFields(HashMap<String, String> extFields) {
+        this.extFields = extFields;
+    }
+
+    public void addExtField(String key, String value) {
+        if (null == extFields) {
+            extFields = new HashMap<String, String>();
+        }
+        extFields.put(key, value);
+    }
+
+    /**
+     * @return a one-line description listing code, language, version, opaque,
+     *         the flag in binary, remark, extFields and the serialize type
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("RemotingCommand [code=");
+        text.append(code);
+        text.append(", language=").append(language);
+        text.append(", version=").append(version);
+        text.append(", opaque=").append(opaque);
+        text.append(", flag(B)=").append(Integer.toBinaryString(flag));
+        text.append(", remark=").append(remark);
+        text.append(", extFields=").append(extFields);
+        text.append(", serializeTypeCurrentRPC=").append(serializeTypeCurrentRPC);
+        text.append("]");
+        return text.toString();
+    }
+
+
+    /**
+     * @return the serialize type that {@code headerEncode()} uses to pick a serializer
+     */
+    public SerializeType getSerializeTypeCurrentRPC() {
+        return serializeTypeCurrentRPC;
+    }
+
+
+    /**
+     * @param serializeTypeCurrentRPC the serialize type to use for this command
+     */
+    public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
+        this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommandType.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommandType.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommandType.java
new file mode 100644
index 0000000..c0ac288
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommandType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.protocol;
+
+/**
+ * Distinguishes request commands from response commands.
+ *
+ * @author shijia.wxr
+ *
+ */
+public enum RemotingCommandType {
+    REQUEST_COMMAND,
+    RESPONSE_COMMAND;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSerializable.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSerializable.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSerializable.java
new file mode 100644
index 0000000..c144ea6
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSerializable.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.protocol;
+
+import com.alibaba.fastjson.JSON;
+
+import java.nio.charset.Charset;
+
+
+/**
+ * JSON serialization helpers. The static methods convert arbitrary objects
+ * to and from UTF-8 encoded JSON; the instance methods do the same for
+ * {@code this}.
+ *
+ * @author shijia.wxr
+ *
+ */
+public abstract class RemotingSerializable {
+    public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+
+    /**
+     * Serializes an object to compact JSON and encodes it as UTF-8.
+     *
+     * @param obj the object to serialize
+     * @return the UTF-8 bytes, or {@code null} when the JSON text is {@code null}
+     */
+    public static byte[] encode(final Object obj) {
+        final String text = toJson(obj, false);
+        return (text == null) ? null : text.getBytes(CHARSET_UTF8);
+    }
+
+    /**
+     * @param obj          the object to serialize
+     * @param prettyFormat passed through to {@code JSON.toJSONString}
+     * @return the JSON text
+     */
+    public static String toJson(final Object obj, boolean prettyFormat) {
+        return JSON.toJSONString(obj, prettyFormat);
+    }
+
+    /**
+     * Decodes UTF-8 bytes to text and parses that text as JSON.
+     *
+     * @param data     UTF-8 encoded JSON
+     * @param classOfT the target type
+     * @return the parsed object
+     */
+    public static <T> T decode(final byte[] data, Class<T> classOfT) {
+        return fromJson(new String(data, CHARSET_UTF8), classOfT);
+    }
+
+    /**
+     * @param json     the JSON text
+     * @param classOfT the target type
+     * @return the parsed object
+     */
+    public static <T> T fromJson(String json, Class<T> classOfT) {
+        return JSON.parseObject(json, classOfT);
+    }
+
+    /**
+     * Serializes this object via {@link #toJson()} and encodes it as UTF-8.
+     *
+     * @return the UTF-8 bytes, or {@code null} when the JSON text is {@code null}
+     */
+    public byte[] encode() {
+        final String text = this.toJson();
+        if (text == null) {
+            return null;
+        }
+        return text.getBytes(CHARSET_UTF8);
+    }
+
+    /**
+     * @return this object as compact JSON
+     */
+    public String toJson() {
+        return this.toJson(false);
+    }
+
+    /**
+     * @param prettyFormat passed through to {@code JSON.toJSONString}
+     * @return this object as JSON
+     */
+    public String toJson(final boolean prettyFormat) {
+        return toJson(this, prettyFormat);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSysResponseCode.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSysResponseCode.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSysResponseCode.java
new file mode 100644
index 0000000..085c750
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSysResponseCode.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting.protocol;
+
+/**
+ * Integer constants for the response codes named below.
+ */
+public class RemotingSysResponseCode {
+
+    public static final int SUCCESS = 0;
+
+    public static final int SYSTEM_ERROR = 1;
+
+    public static final int SYSTEM_BUSY = 2;
+
+    public static final int REQUEST_CODE_NOT_SUPPORTED = 3;
+
+    public static final int TRANSACTION_FAILED = 4;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RocketMQSerializable.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RocketMQSerializable.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RocketMQSerializable.java
new file mode 100644
index 0000000..6956aab
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RocketMQSerializable.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.protocol;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+
+/**
+ * @author manhong.yqd
+ *
+ */
+public class RocketMQSerializable {
+    public static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+
+    public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
+        // String remark
+        byte[] remarkBytes = null;
+        int remarkLen = 0;
+        if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
+            remarkBytes = cmd.getRemark().getBytes(RemotingSerializable.CHARSET_UTF8);
+            remarkLen = remarkBytes.length;
+        }
+
+        // HashMap<String, String> extFields
+        byte[] extFieldsBytes = null;
+        int extLen = 0;
+        if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
+            extFieldsBytes = mapSerialize(cmd.getExtFields());
+            extLen = extFieldsBytes.length;
+        }
+
+        // ################### cal total length
+        int totalLen = calTotalLen(remarkLen, extLen);
+
+        // ################### content
+        ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
+        // int code(~32767)
+        headerBuffer.putShort((short) cmd.getCode());
+        // LanguageCode language
+        headerBuffer.put(cmd.getLanguage().getCode());
+        // int version(~32767)
+        headerBuffer.putShort((short) cmd.getVersion());
+        // int opaque
+        headerBuffer.putInt(cmd.getOpaque());
+        // int flag
+        headerBuffer.putInt(cmd.getFlag());
+        // String remark
+        if (remarkBytes != null) {
+            headerBuffer.putInt(remarkBytes.length);
+            headerBuffer.put(remarkBytes);
+        } else {
+            headerBuffer.putInt(0);
+        }
+        // HashMap<String, String> extFields;
+        if (extFieldsBytes != null) {
+            headerBuffer.putInt(extFieldsBytes.length);
+            headerBuffer.put(extFieldsBytes);
+        } else {
+            headerBuffer.putInt(0);
+        }
+
+        return headerBuffer.array();
+    }
+
+    public static byte[] mapSerialize(HashMap<String, String> map) {
+        // keySize+key+valSize+val
+        // keySize+key+valSize+val
+        if (null == map || map.isEmpty())
+            return null;
+
+        int totalLength = 0;
+        int kvLength;
+        Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            if (entry.getKey() != null && entry.getValue() != null) {
+                kvLength =
+                        // keySize + Key
+                        2 + entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8).length
+                                // valSize + val
+                                + 4 + entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8).length;
+                totalLength += kvLength;
+            }
+        }
+
+        ByteBuffer content = ByteBuffer.allocate(totalLength);
+        byte[] key;
+        byte[] val;
+        it = map.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            if (entry.getKey() != null && entry.getValue() != null) {
+                key = entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8);
+                val = entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8);
+
+                content.putShort((short) key.length);
+                content.put(key);
+
+                content.putInt(val.length);
+                content.put(val);
+            }
+        }
+
+        return content.array();
+    }
+
+    private static int calTotalLen(int remark, int ext) {
+        // int code(~32767)
+        int length = 2
+                // LanguageCode language
+                + 1
+                // int version(~32767)
+                + 2
+                // int opaque
+                + 4
+                // int flag
+                + 4
+                // String remark
+                + 4 + remark
+                // HashMap<String, String> extFields
+                + 4 + ext;
+
+        return length;
+    }
+
+    public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
+        RemotingCommand cmd = new RemotingCommand();
+        ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
+        // int code(~32767)
+        cmd.setCode(headerBuffer.getShort());
+        // LanguageCode language
+        cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
+        // int version(~32767)
+        cmd.setVersion(headerBuffer.getShort());
+        // int opaque
+        cmd.setOpaque(headerBuffer.getInt());
+        // int flag
+        cmd.setFlag(headerBuffer.getInt());
+        // String remark
+        int remarkLength = headerBuffer.getInt();
+        if (remarkLength > 0) {
+            byte[] remarkContent = new byte[remarkLength];
+            headerBuffer.get(remarkContent);
+            cmd.setRemark(new String(remarkContent, RemotingSerializable.CHARSET_UTF8));
+        }
+
+        // HashMap<String, String> extFields
+        int extFieldsLength = headerBuffer.getInt();
+        if (extFieldsLength > 0) {
+            byte[] extFieldsBytes = new byte[extFieldsLength];
+            headerBuffer.get(extFieldsBytes);
+            cmd.setExtFields(mapDeserialize(extFieldsBytes));
+        }
+        return cmd;
+    }
+
+    public static HashMap<String, String> mapDeserialize(byte[] bytes) {
+        if (bytes == null || bytes.length <= 0)
+            return null;
+
+        HashMap<String, String> map = new HashMap<String, String>();
+        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+
+        short keySize = 0;
+        byte[] keyContent = null;
+        int valSize = 0;
+        byte[] valContent = null;
+        while (byteBuffer.hasRemaining()) {
+            keySize = byteBuffer.getShort();
+            keyContent = new byte[keySize];
+            byteBuffer.get(keyContent);
+
+            valSize = byteBuffer.getInt();
+            valContent = new byte[valSize];
+            byteBuffer.get(valContent);
+
+            map.put(new String(keyContent, RemotingSerializable.CHARSET_UTF8), new String(valContent,
+                    RemotingSerializable.CHARSET_UTF8));
+        }
+        return map;
+    }
+
+
+    public static boolean isBlank(String str) {
+        int strLen;
+        if (str == null || (strLen = str.length()) == 0) {
+            return true;
+        }
+        for (int i = 0; i < strLen; i++) {
+            if (!Character.isWhitespace(str.charAt(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/SerializeType.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/SerializeType.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/SerializeType.java
new file mode 100644
index 0000000..b46d8d8
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/SerializeType.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting.protocol;
+
+/**
+ * Serialization formats, each identified by a one-byte code.
+ */
+public enum SerializeType {
+    JSON((byte) 0),
+    ROCKETMQ((byte) 1);
+
+    // Enum constants are shared singletons, so their state must not be reassignable.
+    private final byte code;
+
+    SerializeType(byte code) {
+        this.code = code;
+    }
+
+    /**
+     * Finds the constant with the given code.
+     *
+     * @param code the one-byte code to look up
+     * @return the matching constant, or {@code null} when no constant uses {@code code}
+     */
+    public static SerializeType valueOf(byte code) {
+        for (SerializeType serializeType : SerializeType.values()) {
+            if (serializeType.getCode() == code) {
+                return serializeType;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return the one-byte code of this constant
+     */
+    public byte getCode() {
+        return code;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.sevialize.txt
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.sevialize.txt b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.sevialize.txt
new file mode 100644
index 0000000..976cdf2
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.sevialize.txt
@@ -0,0 +1,6 @@
+//
+// Remoting protocol V0.1 draft
+//
+// protocol <length> <header length> <header data> <body data>
+//            1        2               3             4
+//
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.txt
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.txt b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.txt
new file mode 100644
index 0000000..7444cbb
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.txt
@@ -0,0 +1,6 @@
+//
+// Remoting protocol V0.1 draft
+//
+// protocol <length> <header length> <header data> <body data>
+//            1        2               3             4
+//

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/ExceptionTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/ExceptionTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/ExceptionTest.java
new file mode 100644
index 0000000..6baa013
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/ExceptionTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: ExceptionTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.netty.*;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+import org.junit.Test;
+
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Checks what a synchronous invoke returns when the call fails. The test
+ * itself starts no server, so the invoke against {@code localhost:8888} is
+ * expected to end in one of the caught exceptions and leave the response
+ * {@code null}.
+ *
+ * @author shijia.wxr
+ */
+public class ExceptionTest {
+    /**
+     * Builds and starts a server whose processor for request code 0 sets a
+     * remark on the incoming request and returns it as the response.
+     * Currently not called by any test in this class.
+     *
+     * @return the started server
+     */
+    private static RemotingServer createRemotingServer() throws InterruptedException {
+        NettyServerConfig config = new NettyServerConfig();
+        RemotingServer server = new NettyRemotingServer(config);
+        server.registerProcessor(0, new NettyRequestProcessor() {
+            private int i = 0;
+
+
+            @Override
+            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+                System.out.println("processRequest=" + request + " " + (i++));
+                request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress());
+                return request;
+            }
+
+            @Override
+            public boolean rejectRequest() {
+                return false;
+            }
+        }, Executors.newCachedThreadPool());
+        server.start();
+        return server;
+    }
+
+    /**
+     * Invokes a request synchronously and asserts that no response comes back.
+     */
+    @Test
+    public void test_CONNECT_EXCEPTION() {
+        RemotingClient client = createRemotingClient();
+        try {
+            RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+            RemotingCommand response = null;
+            try {
+                response = client.invokeSync("localhost:8888", request, 1000 * 3);
+            } catch (RemotingConnectException e) {
+                e.printStackTrace();
+            } catch (RemotingSendRequestException e) {
+                e.printStackTrace();
+            } catch (RemotingTimeoutException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            System.out.println("invoke result = " + response);
+            assertTrue(null == response);
+        } finally {
+            // Release the client even when the assertion above fails.
+            client.shutdown();
+        }
+        System.out.println("-----------------------------------------------------------------");
+    }
+
+    /**
+     * @return a started client built from a default {@code NettyClientConfig}
+     */
+    private static RemotingClient createRemotingClient() {
+        NettyClientConfig config = new NettyClientConfig();
+        RemotingClient client = new NettyRemotingClient(config);
+        client.start();
+        return client;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/MixTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/MixTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/MixTest.java
new file mode 100644
index 0000000..97d1663
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/MixTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: MixTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.remoting;
+
+import org.junit.Test;
+
+
+/**
+ * Placeholder test class; its only test method has an empty body.
+ *
+ * @author shijia.wxr
+ */
+public class MixTest {
+    @Test
+    public void test_extFieldsValue() {
+        // Intentionally empty: nothing is exercised or asserted yet.
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
new file mode 100644
index 0000000..e4ff948
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Test;
+
+
+/**
+ * Smoke test for the client invoke/connect path. It repeatedly issues synchronous requests
+ * to {@code localhost:8888} and only logs failures; it contains no assertions.
+ *
+ * @author shijia.wxr
+ */
+public class NettyConnectionTest {
+    /**
+     * Sends 100 synchronous requests, each with a 3 second timeout, and prints any exception
+     * instead of failing. This class does not start a server itself, so the calls presumably
+     * fail unless something else is listening on that port.
+     */
+    @Test
+    public void test_connect_timeout() throws InterruptedException, RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException {
+        RemotingClient client = createRemotingClient();
+
+        try {
+            for (int i = 0; i < 100; i++) {
+                try {
+                    RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+                    // The response itself is irrelevant here; only the invoke path is exercised.
+                    client.invokeSync("localhost:8888", request, 1000 * 3);
+                } catch (Exception e) {
+                    // Failures are tolerated on purpose: log them and keep going.
+                    e.printStackTrace();
+                }
+            }
+        } finally {
+            // Release the client even if something unexpected escapes the loop.
+            client.shutdown();
+        }
+
+        System.out.println("-----------------------------------------------------------------");
+    }
+
+    /**
+     * Creates and starts a Netty remoting client whose channel max idle time is 15 seconds.
+     *
+     * @return the started client; the caller is responsible for calling {@code shutdown()}
+     */
+    public static RemotingClient createRemotingClient() {
+        NettyClientConfig config = new NettyClientConfig();
+        config.setClientChannelMaxIdleTimeSeconds(15);
+        RemotingClient client = new NettyRemotingClient(config);
+        client.start();
+        return client;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyIdleTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyIdleTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyIdleTest.java
new file mode 100644
index 0000000..23145f4
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyIdleTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.netty.*;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Manual test for idle-channel handling between a Netty remoting client and server.
+ * The {@code @Test} annotation on the test method is commented out, so the method carries
+ * no JUnit annotation.
+ *
+ * @author shijia.wxr
+ */
+public class NettyIdleTest {
+    /**
+     * Sends 10 synchronous requests, sleeping 10 seconds after each one, then sleeps a further
+     * 60 seconds before shutting down. The server is created with a 30 second and the client
+     * with a 15 second channel max idle time.
+     */
+    // @Test
+    public void test_idle_event() throws InterruptedException, RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException {
+        RemotingServer server = createRemotingServer();
+        try {
+            RemotingClient client = createRemotingClient();
+            try {
+                for (int i = 0; i < 10; i++) {
+                    RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+                    RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3);
+                    System.out.println(i + " invoke result = " + response);
+                    assertTrue(response != null);
+
+                    Thread.sleep(1000 * 10);
+                }
+
+                Thread.sleep(1000 * 60);
+            } finally {
+                // Shut down even when an invoke or assertion fails.
+                client.shutdown();
+            }
+        } finally {
+            // Shut the server down on every path so it is not left running after a failure.
+            server.shutdown();
+        }
+
+        System.out.println("-----------------------------------------------------------------");
+    }
+
+    /**
+     * Creates and starts a Netty remoting server with a 30 second channel max idle time and a
+     * processor for request code 0 that returns the request with a remark added.
+     *
+     * @return the started server; the caller is responsible for calling {@code shutdown()}
+     */
+    public static RemotingServer createRemotingServer() throws InterruptedException {
+        NettyServerConfig config = new NettyServerConfig();
+        config.setServerChannelMaxIdleTimeSeconds(30);
+        RemotingServer remotingServer = new NettyRemotingServer(config);
+        remotingServer.registerProcessor(0, new NettyRequestProcessor() {
+            // Request counter used only in the log line below; it is not synchronized.
+            // NOTE(review): presumably invoked on the cached thread pool passed below, so
+            // concurrent requests could race on it - confirm if the printed value matters.
+            private int i = 0;
+
+            @Override
+            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+                System.out.println("processRequest=" + request + " " + (i++));
+                request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress());
+                return request;
+            }
+
+            @Override
+            public boolean rejectRequest() {
+                return false;
+            }
+        }, Executors.newCachedThreadPool());
+        remotingServer.start();
+        return remotingServer;
+    }
+
+    /**
+     * Creates and starts a Netty remoting client whose channel max idle time is 15 seconds.
+     *
+     * @return the started client; the caller is responsible for calling {@code shutdown()}
+     */
+    public static RemotingClient createRemotingClient() {
+        NettyClientConfig config = new NettyClientConfig();
+        config.setClientChannelMaxIdleTimeSeconds(15);
+        RemotingClient client = new NettyRemotingClient(config);
+        client.start();
+        return client;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyRPCTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyRPCTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyRPCTest.java
new file mode 100644
index 0000000..7d4f68e
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyRPCTest.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: NettyRPCTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.annotation.CFNullable;
+import com.alibaba.rocketmq.remoting.exception.*;
+import com.alibaba.rocketmq.remoting.netty.*;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+import org.junit.Test;
+
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Round-trip tests for the Netty remoting layer. Each test starts a server and a client in
+ * this JVM and exercises one invocation style against {@code localhost:8888}.
+ *
+ * <p>Every test targets the same address, so each one releases its client and server in
+ * {@code finally} blocks.
+ *
+ * @author shijia.wxr
+ */
+public class NettyRPCTest {
+    /**
+     * Sends 100 synchronous requests carrying a {@link TestRequestHeader} and asserts that
+     * each call returns a non-null response.
+     */
+    @Test
+    public void test_RPC_Sync() throws InterruptedException, RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException {
+        RemotingServer server = createRemotingServer();
+        try {
+            RemotingClient client = createRemotingClient();
+            try {
+                for (int i = 0; i < 100; i++) {
+                    TestRequestHeader requestHeader = new TestRequestHeader();
+                    requestHeader.setCount(i);
+                    requestHeader.setMessageTitle("HelloMessageTitle");
+                    RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
+                    // 3 second timeout, matching the other tests in this class.
+                    RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3);
+                    System.out.println("invoke result = " + response);
+                    assertTrue(response != null);
+                }
+            } finally {
+                client.shutdown();
+            }
+        } finally {
+            server.shutdown();
+        }
+
+        System.out.println("-----------------------------------------------------------------");
+    }
+
+    /**
+     * Creates and starts a Netty remoting server with default configuration and a processor
+     * for request code 0 that returns the request with a remark added.
+     *
+     * @return the started server; the caller is responsible for calling {@code shutdown()}
+     */
+    public static RemotingServer createRemotingServer() throws InterruptedException {
+        NettyServerConfig config = new NettyServerConfig();
+        RemotingServer remotingServer = new NettyRemotingServer(config);
+        remotingServer.registerProcessor(0, new NettyRequestProcessor() {
+            // Request counter used only in the log line below; it is not synchronized.
+            // NOTE(review): presumably invoked on the cached thread pool passed below, so
+            // concurrent requests could race on it - confirm if the printed value matters.
+            private int i = 0;
+
+            @Override
+            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+                System.out.println("processRequest=" + request + " " + (i++));
+                request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress());
+                return request;
+            }
+
+            @Override
+            public boolean rejectRequest() {
+                return false;
+            }
+        }, Executors.newCachedThreadPool());
+        remotingServer.start();
+        return remotingServer;
+    }
+
+    /**
+     * Creates and starts a Netty remoting client with default configuration.
+     *
+     * @return the started client; the caller is responsible for calling {@code shutdown()}
+     */
+    public static RemotingClient createRemotingClient() {
+        NettyClientConfig config = new NettyClientConfig();
+        RemotingClient client = new NettyRemotingClient(config);
+        client.start();
+        return client;
+    }
+
+    /**
+     * Fires 100 one-way requests (no response expected) and then shuts down. The test only
+     * checks that sending does not throw.
+     */
+    @Test
+    public void test_RPC_Oneway() throws InterruptedException, RemotingConnectException,
+            RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
+        RemotingServer server = createRemotingServer();
+        try {
+            RemotingClient client = createRemotingClient();
+            try {
+                for (int i = 0; i < 100; i++) {
+                    RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+                    request.setRemark(String.valueOf(i));
+                    client.invokeOneway("localhost:8888", request, 1000 * 3);
+                }
+            } finally {
+                client.shutdown();
+            }
+        } finally {
+            server.shutdown();
+        }
+
+        System.out.println("-----------------------------------------------------------------");
+    }
+
+    /**
+     * Fires 100 asynchronous requests whose callbacks print the response, then waits three
+     * seconds to give the callbacks time to run before shutting down.
+     */
+    @Test
+    public void test_RPC_Async() throws InterruptedException, RemotingConnectException,
+            RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
+        RemotingServer server = createRemotingServer();
+        try {
+            RemotingClient client = createRemotingClient();
+            try {
+                for (int i = 0; i < 100; i++) {
+                    RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+                    request.setRemark(String.valueOf(i));
+                    client.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
+                        @Override
+                        public void operationComplete(ResponseFuture responseFuture) {
+                            System.out.println(responseFuture.getResponseCommand());
+                        }
+                    });
+                }
+
+                Thread.sleep(1000 * 3);
+            } finally {
+                client.shutdown();
+            }
+        } finally {
+            server.shutdown();
+        }
+
+        System.out.println("-----------------------------------------------------------------");
+    }
+
+    /**
+     * Verifies a server-to-client call on an existing channel. The server's processor for
+     * code 0 forwards each incoming request back to the calling client over the same channel
+     * and returns the client's answer as its own response.
+     */
+    @Test
+    public void test_server_call_client() throws InterruptedException, RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException {
+        final RemotingServer server = createRemotingServer();
+        try {
+            final RemotingClient client = createRemotingClient();
+            try {
+                // Registered for code 0 again, after the processor set up in createRemotingServer().
+                server.registerProcessor(0, new NettyRequestProcessor() {
+                    @Override
+                    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+                        try {
+                            return server.invokeSync(ctx.channel(), request, 1000 * 10);
+                        } catch (InterruptedException e) {
+                            // Restore the interrupt flag so the worker thread can observe it.
+                            Thread.currentThread().interrupt();
+                            e.printStackTrace();
+                        } catch (RemotingSendRequestException e) {
+                            e.printStackTrace();
+                        } catch (RemotingTimeoutException e) {
+                            e.printStackTrace();
+                        }
+
+                        // The callback failed; no response command is produced.
+                        return null;
+                    }
+
+                    @Override
+                    public boolean rejectRequest() {
+                        return false;
+                    }
+                }, Executors.newCachedThreadPool());
+
+                client.registerProcessor(0, new NettyRequestProcessor() {
+                    @Override
+                    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+                        System.out.println("client receive server request = " + request);
+                        request.setRemark("client remark");
+                        return request;
+                    }
+
+                    @Override
+                    public boolean rejectRequest() {
+                        return false;
+                    }
+                }, Executors.newCachedThreadPool());
+
+                for (int i = 0; i < 3; i++) {
+                    RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+                    RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3);
+                    System.out.println("invoke result = " + response);
+                    assertTrue(response != null);
+                }
+            } finally {
+                client.shutdown();
+            }
+        } finally {
+            server.shutdown();
+        }
+
+        System.out.println("-----------------------------------------------------------------");
+    }
+
+}
+
+
+/**
+ * Custom request header used by {@code NettyRPCTest}. Both fields are marked
+ * {@code @CFNullable} and are exposed through plain getters and setters.
+ */
+class TestRequestHeader implements CommandCustomHeader {
+    @CFNullable
+    private Integer count;
+
+    @CFNullable
+    private String messageTitle;
+
+    /** Performs no validation; this header never rejects its field values. */
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+    /** @return the counter value, or {@code null} if it was never set */
+    public Integer getCount() {
+        return this.count;
+    }
+
+    /** @param count the counter value to store */
+    public void setCount(Integer count) {
+        this.count = count;
+    }
+
+    /** @return the message title, or {@code null} if it was never set */
+    public String getMessageTitle() {
+        return this.messageTitle;
+    }
+
+    /** @param messageTitle the message title to store */
+    public void setMessageTitle(String messageTitle) {
+        this.messageTitle = messageTitle;
+    }
+}
+
+
+/**
+ * Custom response header declared alongside {@code TestRequestHeader}; it has the same two
+ * {@code @CFNullable} fields and plain accessors.
+ */
+class TestResponseHeader implements CommandCustomHeader {
+    @CFNullable
+    private Integer count;
+
+    @CFNullable
+    private String messageTitle;
+
+    /** Performs no validation; this header never rejects its field values. */
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+    /** @return the counter value, or {@code null} if it was never set */
+    public Integer getCount() {
+        return this.count;
+    }
+
+    /** @param count the counter value to store */
+    public void setCount(Integer count) {
+        this.count = count;
+    }
+
+    /** @return the message title, or {@code null} if it was never set */
+    public String getMessageTitle() {
+        return this.messageTitle;
+    }
+
+    /** @param messageTitle the message title to store */
+    public void setMessageTitle(String messageTitle) {
+        this.messageTitle = messageTitle;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/SyncInvokeTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/SyncInvokeTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/SyncInvokeTest.java
new file mode 100644
index 0000000..fc3e708
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/SyncInvokeTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: SyncInvokeTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Synchronous round-trip test that reuses the server and client factories of
+ * {@link NettyRPCTest}.
+ *
+ * @author shijia.wxr
+ */
+public class SyncInvokeTest {
+    /**
+     * Sends 100 synchronous requests without a custom header and asserts that each call
+     * returns a non-null response. Any exception propagates and fails the test.
+     */
+    @Test
+    public void test_RPC_Sync() throws Exception {
+        RemotingServer server = NettyRPCTest.createRemotingServer();
+        try {
+            RemotingClient client = NettyRPCTest.createRemotingClient();
+            try {
+                for (int i = 0; i < 100; i++) {
+                    RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+                    RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3);
+                    System.out.println(i + "\t" + "invoke result = " + response);
+                    assertTrue(response != null);
+                }
+            } finally {
+                // Shut down even when an invoke or assertion fails.
+                client.shutdown();
+            }
+        } finally {
+            // Shut the server down on every path so it is not left running after a failure.
+            server.shutdown();
+        }
+
+        System.out.println("-----------------------------------------------------------------");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/subclass/TestSubClassAuto.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/subclass/TestSubClassAuto.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/subclass/TestSubClassAuto.java
new file mode 100644
index 0000000..3d18d23
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/subclass/TestSubClassAuto.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ *
+ */
+package com.alibaba.rocketmq.subclass;
+
+import org.junit.Test;
+
+
+/**
+ * Placeholder test class: its only test case has an empty body and asserts nothing.
+ *
+ * @author shijia.wxr
+ */
+public class TestSubClassAuto {
+    /** Empty test case; it passes trivially because there is nothing to check. */
+    @Test
+    public void test_sub() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-srvutil/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-srvutil/pom.xml b/rocketmq-srvutil/pom.xml
new file mode 100644
index 0000000..98b4e29
--- /dev/null
+++ b/rocketmq-srvutil/pom.xml
@@ -0,0 +1,51 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<!-- Maven descriptor for the rocketmq-srvutil module (packaged as a jar). -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!-- Inherits from the rocketmq-all parent POM. -->
+    <parent>
+        <groupId>com.alibaba.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-srvutil</artifactId>
+    <name>rocketmq-srvutil ${project.version}</name>
+
+
+    <!-- No versions are declared below; presumably they are managed by the parent POM
+         (dependencyManagement) - verify there before changing any of these entries. -->
+    <dependencies>
+        <!-- Test-only dependency. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <!-- Sibling modules, resolved under this project's own groupId. -->
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-common</artifactId>
+        </dependency>
+        <!-- Apache Commons CLI. -->
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-srvutil/src/main/java/com/alibaba/rocketmq/srvutil/ServerUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-srvutil/src/main/java/com/alibaba/rocketmq/srvutil/ServerUtil.java b/rocketmq-srvutil/src/main/java/com/alibaba/rocketmq/srvutil/ServerUtil.java
new file mode 100644
index 0000000..72fa9a4
--- /dev/null
+++ b/rocketmq-srvutil/src/main/java/com/alibaba/rocketmq/srvutil/ServerUtil.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.srvutil;
+
+import org.apache.commons.cli.*;
+
+import java.util.Properties;
+
+/**
+ * Static helpers built on Apache Commons CLI: registering common options, parsing a command
+ * line with usage output, and copying parsed option values into {@link Properties}.
+ */
+public class ServerUtil {
+
+    /**
+     * Adds the optional {@code -h/--help} flag and the optional {@code -n/--namesrvAddr}
+     * option (which takes an argument) to the given option set.
+     *
+     * @param options the option set to extend; it is modified in place
+     * @return the same {@code options} instance
+     */
+    public static Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("h", "help", false, "Print help");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt =
+                new Option("n", "namesrvAddr", true,
+                        "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Parses {@code args} against {@code options}.
+     *
+     * <p>Usage help is printed and {@code null} is returned when the {@code h} option is
+     * present or when parsing fails; on a parse failure the parser's message is also written
+     * to standard error.
+     *
+     * @param appName the application name shown in the usage line
+     * @param args    the raw command-line arguments
+     * @param options the options to parse against
+     * @param parser  the Commons CLI parser to use
+     * @return the parsed command line, or {@code null} if help was shown or parsing failed
+     */
+    public static CommandLine parseCmdLine(final String appName, String[] args, Options options,
+                                           CommandLineParser parser) {
+        CommandLine commandLine = null;
+        try {
+            commandLine = parser.parse(options, args);
+            if (commandLine.hasOption('h')) {
+                printCommandLineHelp(appName, options);
+                return null;
+            }
+        } catch (ParseException e) {
+            // Report why parsing failed before showing the usage text.
+            System.err.println(e.getMessage());
+            printCommandLineHelp(appName, options);
+        }
+
+        // Still null here if parse() threw before the assignment completed.
+        return commandLine;
+    }
+
+    /**
+     * Prints usage help for {@code options}, formatted to a width of 110 columns and with an
+     * auto-generated usage line.
+     *
+     * @param appName the application name shown in the usage line
+     * @param options the options to describe
+     */
+    public static void printCommandLineHelp(final String appName, final Options options) {
+        HelpFormatter hf = new HelpFormatter();
+        hf.setWidth(110);
+        hf.printHelp(appName, options, true);
+    }
+
+    /**
+     * Copies the values of the parsed options into a {@link Properties} object, keyed by each
+     * option's long name. Options without a value are omitted, as are options that were
+     * declared without a long name (they have no key to store under).
+     *
+     * @param commandLine the parsed command line; must not be {@code null}
+     * @return a new {@code Properties} holding one entry per option that has a value
+     */
+    public static Properties commandLine2Properties(final CommandLine commandLine) {
+        Properties properties = new Properties();
+        Option[] opts = commandLine.getOptions();
+
+        if (opts != null) {
+            for (Option opt : opts) {
+                String name = opt.getLongOpt();
+                if (name == null) {
+                    // A null key would make the lookup and setProperty below throw.
+                    continue;
+                }
+                String value = commandLine.getOptionValue(name);
+                if (value != null) {
+                    properties.setProperty(name, value);
+                }
+            }
+        }
+
+        return properties;
+    }
+
+}