You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:30 UTC
[13/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyServerConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyServerConfig.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyServerConfig.java
new file mode 100644
index 0000000..7922206
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyServerConfig.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+/**
+
+ *
+ * @author shijia.wxr
+ *
+ */
+public class NettyServerConfig implements Cloneable {
+ private int listenPort = 8888;
+ private int serverWorkerThreads = 8;
+ private int serverCallbackExecutorThreads = 0;
+ private int serverSelectorThreads = 3;
+ private int serverOnewaySemaphoreValue = 256;
+ private int serverAsyncSemaphoreValue = 64;
+ private int serverChannelMaxIdleTimeSeconds = 120;
+
+ private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
+ private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
+ private boolean serverPooledByteBufAllocatorEnable = true;
+
+ /**
+ * make make install
+ *
+ *
+ * ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
+ * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
+ */
+ private boolean useEpollNativeSelector = false;
+
+
+ public int getListenPort() {
+ return listenPort;
+ }
+
+
+ public void setListenPort(int listenPort) {
+ this.listenPort = listenPort;
+ }
+
+
+ public int getServerWorkerThreads() {
+ return serverWorkerThreads;
+ }
+
+
+ public void setServerWorkerThreads(int serverWorkerThreads) {
+ this.serverWorkerThreads = serverWorkerThreads;
+ }
+
+
+ public int getServerSelectorThreads() {
+ return serverSelectorThreads;
+ }
+
+
+ public void setServerSelectorThreads(int serverSelectorThreads) {
+ this.serverSelectorThreads = serverSelectorThreads;
+ }
+
+
+ public int getServerOnewaySemaphoreValue() {
+ return serverOnewaySemaphoreValue;
+ }
+
+
+ public void setServerOnewaySemaphoreValue(int serverOnewaySemaphoreValue) {
+ this.serverOnewaySemaphoreValue = serverOnewaySemaphoreValue;
+ }
+
+
+ public int getServerCallbackExecutorThreads() {
+ return serverCallbackExecutorThreads;
+ }
+
+
+ public void setServerCallbackExecutorThreads(int serverCallbackExecutorThreads) {
+ this.serverCallbackExecutorThreads = serverCallbackExecutorThreads;
+ }
+
+
+ public int getServerAsyncSemaphoreValue() {
+ return serverAsyncSemaphoreValue;
+ }
+
+
+ public void setServerAsyncSemaphoreValue(int serverAsyncSemaphoreValue) {
+ this.serverAsyncSemaphoreValue = serverAsyncSemaphoreValue;
+ }
+
+
+ public int getServerChannelMaxIdleTimeSeconds() {
+ return serverChannelMaxIdleTimeSeconds;
+ }
+
+
+ public void setServerChannelMaxIdleTimeSeconds(int serverChannelMaxIdleTimeSeconds) {
+ this.serverChannelMaxIdleTimeSeconds = serverChannelMaxIdleTimeSeconds;
+ }
+
+
+ public int getServerSocketSndBufSize() {
+ return serverSocketSndBufSize;
+ }
+
+
+ public void setServerSocketSndBufSize(int serverSocketSndBufSize) {
+ this.serverSocketSndBufSize = serverSocketSndBufSize;
+ }
+
+
+ public int getServerSocketRcvBufSize() {
+ return serverSocketRcvBufSize;
+ }
+
+
+ public void setServerSocketRcvBufSize(int serverSocketRcvBufSize) {
+ this.serverSocketRcvBufSize = serverSocketRcvBufSize;
+ }
+
+
+ public boolean isServerPooledByteBufAllocatorEnable() {
+ return serverPooledByteBufAllocatorEnable;
+ }
+
+
+ public void setServerPooledByteBufAllocatorEnable(boolean serverPooledByteBufAllocatorEnable) {
+ this.serverPooledByteBufAllocatorEnable = serverPooledByteBufAllocatorEnable;
+ }
+
+
+ public boolean isUseEpollNativeSelector() {
+ return useEpollNativeSelector;
+ }
+
+
+ public void setUseEpollNativeSelector(boolean useEpollNativeSelector) {
+ this.useEpollNativeSelector = useEpollNativeSelector;
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ return (NettyServerConfig) super.clone();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettySystemConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettySystemConfig.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettySystemConfig.java
new file mode 100644
index 0000000..41589ce
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettySystemConfig.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting.netty;
+
+public class NettySystemConfig {
+ public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
+ "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
+ public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE = //
+ "com.rocketmq.remoting.socket.sndbuf.size";
+ public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE = //
+ "com.rocketmq.remoting.socket.rcvbuf.size";
+ public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE = //
+ "com.rocketmq.remoting.clientAsyncSemaphoreValue";
+ public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = //
+ "com.rocketmq.remoting.clientOnewaySemaphoreValue";
+ public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
+ Boolean
+ .parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
+ public static int socketSndbufSize = //
+ Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
+ public static int socketRcvbufSize = //
+ Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
+ public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
+ Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
+ public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = //
+ Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/RequestTask.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/RequestTask.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/RequestTask.java
new file mode 100644
index 0000000..e02ae48
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/RequestTask.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting.netty;
+
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.Channel;
+
+public class RequestTask implements Runnable {
+ private final Runnable runnable;
+ private final long createTimestamp = System.currentTimeMillis();
+ private final Channel channel;
+ private final RemotingCommand request;
+ private boolean stopRun = false;
+
+ public RequestTask(final Runnable runnable, final Channel channel, final RemotingCommand request) {
+ this.runnable = runnable;
+ this.channel = channel;
+ this.request = request;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = runnable != null ? runnable.hashCode() : 0;
+ result = 31 * result + (int) (getCreateTimestamp() ^ (getCreateTimestamp() >>> 32));
+ result = 31 * result + (channel != null ? channel.hashCode() : 0);
+ result = 31 * result + (request != null ? request.hashCode() : 0);
+ result = 31 * result + (isStopRun() ? 1 : 0);
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (!(o instanceof RequestTask)) return false;
+
+ final RequestTask that = (RequestTask) o;
+
+ if (getCreateTimestamp() != that.getCreateTimestamp()) return false;
+ if (isStopRun() != that.isStopRun()) return false;
+ if (channel != null ? !channel.equals(that.channel) : that.channel != null) return false;
+ return request != null ? request.getOpaque() == that.request.getOpaque() : that.request == null;
+
+ }
+
+ public long getCreateTimestamp() {
+ return createTimestamp;
+ }
+
+ public boolean isStopRun() {
+ return stopRun;
+ }
+
+ public void setStopRun(final boolean stopRun) {
+ this.stopRun = stopRun;
+ }
+
+ @Override
+ public void run() {
+ if (!this.stopRun) this.runnable.run();
+ }
+
+ public void returnResponse(int code, String remark) {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(code, remark);
+ response.setOpaque(request.getOpaque());
+ this.channel.writeAndFlush(response);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/ResponseFuture.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/ResponseFuture.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/ResponseFuture.java
new file mode 100644
index 0000000..b6185f1
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/ResponseFuture.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.InvokeCallback;
+import com.alibaba.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ResponseFuture {
+ private final int opaque;
+ private final long timeoutMillis;
+ private final InvokeCallback invokeCallback;
+ private final long beginTimestamp = System.currentTimeMillis();
+ private final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ private final SemaphoreReleaseOnlyOnce once;
+
+ private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
+ private volatile RemotingCommand responseCommand;
+ private volatile boolean sendRequestOK = true;
+ private volatile Throwable cause;
+
+
+ public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
+ SemaphoreReleaseOnlyOnce once) {
+ this.opaque = opaque;
+ this.timeoutMillis = timeoutMillis;
+ this.invokeCallback = invokeCallback;
+ this.once = once;
+ }
+
+
+ public void executeInvokeCallback() {
+ if (invokeCallback != null) {
+ if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
+ invokeCallback.operationComplete(this);
+ }
+ }
+ }
+
+
+ public void release() {
+ if (this.once != null) {
+ this.once.release();
+ }
+ }
+
+
+ public boolean isTimeout() {
+ long diff = System.currentTimeMillis() - this.beginTimestamp;
+ return diff > this.timeoutMillis;
+ }
+
+
+ public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
+ this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
+ return this.responseCommand;
+ }
+
+
+ public void putResponse(final RemotingCommand responseCommand) {
+ this.responseCommand = responseCommand;
+ this.countDownLatch.countDown();
+ }
+
+
+ public long getBeginTimestamp() {
+ return beginTimestamp;
+ }
+
+
+ public boolean isSendRequestOK() {
+ return sendRequestOK;
+ }
+
+
+ public void setSendRequestOK(boolean sendRequestOK) {
+ this.sendRequestOK = sendRequestOK;
+ }
+
+
+ public long getTimeoutMillis() {
+ return timeoutMillis;
+ }
+
+
+ public InvokeCallback getInvokeCallback() {
+ return invokeCallback;
+ }
+
+
+ public Throwable getCause() {
+ return cause;
+ }
+
+
+ public void setCause(Throwable cause) {
+ this.cause = cause;
+ }
+
+
+ public RemotingCommand getResponseCommand() {
+ return responseCommand;
+ }
+
+
+ public void setResponseCommand(RemotingCommand responseCommand) {
+ this.responseCommand = responseCommand;
+ }
+
+
+ public int getOpaque() {
+ return opaque;
+ }
+
+
+ @Override
+ public String toString() {
+ return "ResponseFuture [responseCommand=" + responseCommand + ", sendRequestOK=" + sendRequestOK
+ + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + timeoutMillis
+ + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + beginTimestamp
+ + ", countDownLatch=" + countDownLatch + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java
new file mode 100644
index 0000000..9f4adbe
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/LanguageCode.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting.protocol;
+
+public enum LanguageCode {
+ JAVA((byte) 0),
+ CPP((byte) 1),
+ DOTNET((byte) 2),
+ PYTHON((byte) 3),
+ DELPHI((byte) 4),
+ ERLANG((byte) 5),
+ RUBY((byte) 6),
+ OTHER((byte) 7),
+ HTTP((byte) 8);
+
+ private byte code;
+
+ LanguageCode(byte code) {
+ this.code = code;
+ }
+
+ public static LanguageCode valueOf(byte code) {
+ for (LanguageCode languageCode : LanguageCode.values()) {
+ if (languageCode.getCode() == code) {
+ return languageCode;
+ }
+ }
+ return null;
+ }
+
+ public byte getCode() {
+ return code;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java
new file mode 100644
index 0000000..a09dd3b
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommand.java
@@ -0,0 +1,569 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.protocol;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RemotingCommand {
+ public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
+ public static final String SERIALIZE_TYPE_ENV = "ROCKETMQ_SERIALIZE_TYPE";
+ private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND
+ private static final int RPC_ONEWAY = 1; // 0, RPC
+
+ private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
+ new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
+ private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
+ // 1, RESPONSE_COMMAND
+ private static final Map<Field, Annotation> NOT_NULL_ANNOTATION_CACHE = new HashMap<Field, Annotation>();
+ // 1, Oneway
+
+ private static final String STRING_CANONICAL_NAME = String.class.getCanonicalName();
+ private static final String DOUBLE_CANONICAL_NAME_1 = Double.class.getCanonicalName();
+ private static final String DOUBLE_CANONICAL_NAME_2 = double.class.getCanonicalName();
+ private static final String INTEGER_CANONICAL_NAME_1 = Integer.class.getCanonicalName();
+ private static final String INTEGER_CANONICAL_NAME_2 = int.class.getCanonicalName();
+ private static final String LONG_CANONICAL_NAME_1 = Long.class.getCanonicalName();
+ private static final String LONG_CANONICAL_NAME_2 = long.class.getCanonicalName();
+ private static final String BOOLEAN_CANONICAL_NAME_1 = Boolean.class.getCanonicalName();
+ private static final String BOOLEAN_CANONICAL_NAME_2 = boolean.class.getCanonicalName();
+ public static final String REMOTING_VERSION_KEY = "rocketmq.remoting.version";
+ private static volatile int configVersion = -1;
+ private static AtomicInteger requestId = new AtomicInteger(0);
+
+ private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
+
+ static {
+ final String protocol = System.getProperty(SERIALIZE_TYPE_PROPERTY, System.getenv(SERIALIZE_TYPE_ENV));
+ if (!isBlank(protocol)) {
+ try {
+ serializeTypeConfigInThisServer = SerializeType.valueOf(protocol);
+ } catch (IllegalArgumentException e) {
+ throw new RuntimeException("parser specified protocol error. protocol=" + protocol, e);
+ }
+ }
+ }
+
+ /**
+
+ */
+ private int code;
+ private LanguageCode language = LanguageCode.JAVA;
+ private int version = 0;
+ private int opaque = requestId.getAndIncrement();
+ private int flag = 0;
+ private String remark;
+ private HashMap<String, String> extFields;
+ private transient CommandCustomHeader customHeader;
+ /**
+
+ */
+ private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
+ /**
+
+ */
+ private transient byte[] body;
+
+
+ protected RemotingCommand() {
+ }
+
+ public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
+ RemotingCommand cmd = new RemotingCommand();
+ cmd.setCode(code);
+ cmd.customHeader = customHeader;
+ setCmdVersion(cmd);
+ return cmd;
+ }
+
+ private static void setCmdVersion(RemotingCommand cmd) {
+ if (configVersion >= 0) {
+ cmd.setVersion(configVersion);
+ } else {
+ String v = System.getProperty(REMOTING_VERSION_KEY);
+ if (v != null) {
+ int value = Integer.parseInt(v);
+ cmd.setVersion(value);
+ configVersion = value;
+ }
+ }
+ }
+
+ public static RemotingCommand createResponseCommand(Class<? extends CommandCustomHeader> classHeader) {
+ RemotingCommand cmd = createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
+
+ return cmd;
+ }
+
+ /**
+
+ */
+ public static RemotingCommand createResponseCommand(int code, String remark, Class<? extends CommandCustomHeader> classHeader) {
+ RemotingCommand cmd = new RemotingCommand();
+ cmd.markResponseType();
+ cmd.setCode(code);
+ cmd.setRemark(remark);
+ setCmdVersion(cmd);
+
+ if (classHeader != null) {
+ try {
+ CommandCustomHeader objectHeader = classHeader.newInstance();
+ cmd.customHeader = objectHeader;
+ } catch (InstantiationException e) {
+ return null;
+ } catch (IllegalAccessException e) {
+ return null;
+ }
+ }
+
+ return cmd;
+ }
+
+ public void markResponseType() {
+ int bits = 1 << RPC_TYPE;
+ this.flag |= bits;
+ }
+
+ public static RemotingCommand createResponseCommand(int code, String remark) {
+ return createResponseCommand(code, remark, null);
+ }
+
+ public static RemotingCommand decode(final byte[] array) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(array);
+ return decode(byteBuffer);
+ }
+
+ public static RemotingCommand decode(final ByteBuffer byteBuffer) {
+ int length = byteBuffer.limit();
+ int oriHeaderLen = byteBuffer.getInt();
+ int headerLength = getHeaderLength(oriHeaderLen);
+
+ byte[] headerData = new byte[headerLength];
+ byteBuffer.get(headerData);
+
+ RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
+
+ int bodyLength = length - 4 - headerLength;
+ byte[] bodyData = null;
+ if (bodyLength > 0) {
+ bodyData = new byte[bodyLength];
+ byteBuffer.get(bodyData);
+ }
+ cmd.body = bodyData;
+
+ return cmd;
+ }
+
+ public static int getHeaderLength(int length) {
+ return length & 0xFFFFFF;
+ }
+
+ private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
+ switch (type) {
+ case JSON:
+ RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
+ resultJson.setSerializeTypeCurrentRPC(type);
+ return resultJson;
+ case ROCKETMQ:
+ RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
+ resultRMQ.setSerializeTypeCurrentRPC(type);
+ return resultRMQ;
+ default:
+ break;
+ }
+
+ return null;
+ }
+
+ public static SerializeType getProtocolType(int source) {
+ return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
+ }
+
+ public static int createNewRequestId() {
+ return requestId.incrementAndGet();
+ }
+
+ public static SerializeType getSerializeTypeConfigInThisServer() {
+ return serializeTypeConfigInThisServer;
+ }
+
+ private static boolean isBlank(String str) {
+ int strLen;
+ if (str == null || (strLen = str.length()) == 0) {
+ return true;
+ }
+ for (int i = 0; i < strLen; i++) {
+ if (!Character.isWhitespace(str.charAt(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public CommandCustomHeader readCustomHeader() {
+ return customHeader;
+ }
+
+ public void writeCustomHeader(CommandCustomHeader customHeader) {
+ this.customHeader = customHeader;
+ }
+
+ public CommandCustomHeader decodeCommandCustomHeader(Class<? extends CommandCustomHeader> classHeader) throws RemotingCommandException {
+ CommandCustomHeader objectHeader;
+ try {
+ objectHeader = classHeader.newInstance();
+ } catch (InstantiationException e) {
+ return null;
+ } catch (IllegalAccessException e) {
+ return null;
+ }
+
+ if (this.extFields != null) {
+
+ Field[] fields = getClazzFields(classHeader);
+ for (Field field : fields) {
+ if (!Modifier.isStatic(field.getModifiers())) {
+ String fieldName = field.getName();
+ if (!fieldName.startsWith("this")) {
+ try {
+ String value = this.extFields.get(fieldName);
+ if (null == value) {
+ Annotation annotation = getNotNullAnnotation(field);
+ if (annotation != null) {
+ throw new RemotingCommandException("the custom field <" + fieldName + "> is null");
+ }
+
+ continue;
+ }
+
+ field.setAccessible(true);
+ String type = getCanonicalName(field.getType());
+ Object valueParsed;
+
+ if (type.equals(STRING_CANONICAL_NAME)) {
+ valueParsed = value;
+ } else if (type.equals(INTEGER_CANONICAL_NAME_1) || type.equals(INTEGER_CANONICAL_NAME_2)) {
+ valueParsed = Integer.parseInt(value);
+ } else if (type.equals(LONG_CANONICAL_NAME_1) || type.equals(LONG_CANONICAL_NAME_2)) {
+ valueParsed = Long.parseLong(value);
+ } else if (type.equals(BOOLEAN_CANONICAL_NAME_1) || type.equals(BOOLEAN_CANONICAL_NAME_2)) {
+ valueParsed = Boolean.parseBoolean(value);
+ } else if (type.equals(DOUBLE_CANONICAL_NAME_1) || type.equals(DOUBLE_CANONICAL_NAME_2)) {
+ valueParsed = Double.parseDouble(value);
+ } else {
+ throw new RemotingCommandException("the custom field <" + fieldName + "> type is not supported");
+ }
+
+ field.set(objectHeader, valueParsed);
+
+ } catch (Throwable e) {
+ }
+ }
+ }
+ }
+
+ objectHeader.checkFields();
+ }
+
+ return objectHeader;
+ }
+
+ private Field[] getClazzFields(Class<? extends CommandCustomHeader> classHeader) {
+ Field[] field = CLASS_HASH_MAP.get(classHeader);
+
+ if (field == null) {
+ field = classHeader.getDeclaredFields();
+ synchronized (CLASS_HASH_MAP) {
+ CLASS_HASH_MAP.put(classHeader, field);
+ }
+ }
+ return field;
+ }
+
+ private Annotation getNotNullAnnotation(Field field) {
+ Annotation annotation = NOT_NULL_ANNOTATION_CACHE.get(field);
+
+ if (annotation == null) {
+ annotation = field.getAnnotation(CFNotNull.class);
+ synchronized (NOT_NULL_ANNOTATION_CACHE) {
+ NOT_NULL_ANNOTATION_CACHE.put(field, annotation);
+ }
+ }
+ return annotation;
+ }
+
+ private String getCanonicalName(Class clazz) {
+ String name = CANONICAL_NAME_CACHE.get(clazz);
+
+ if (name == null) {
+ name = clazz.getCanonicalName();
+ synchronized (CANONICAL_NAME_CACHE) {
+ CANONICAL_NAME_CACHE.put(clazz, name);
+ }
+ }
+ return name;
+ }
+
+ public ByteBuffer encode() {
+ // 1> header length size
+ int length = 4;
+
+ // 2> header data length
+ byte[] headerData = this.headerEncode();
+ length += headerData.length;
+
+ // 3> body data length
+ if (this.body != null) {
+ length += body.length;
+ }
+
+ ByteBuffer result = ByteBuffer.allocate(4 + length);
+
+ // length
+ result.putInt(length);
+
+ // header length
+ result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
+
+ // header data
+ result.put(headerData);
+
+ // body data;
+ if (this.body != null) {
+ result.put(this.body);
+ }
+
+ result.flip();
+
+ return result;
+ }
+
+ private byte[] headerEncode() {
+ this.makeCustomHeaderToNet();
+ if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
+ return RocketMQSerializable.rocketMQProtocolEncode(this);
+ } else {
+ return RemotingSerializable.encode(this);
+ }
+ }
+
+ public static byte[] markProtocolType(int source, SerializeType type) {
+ byte[] result = new byte[4];
+
+ result[0] = type.getCode();
+ result[1] = (byte) ((source >> 16) & 0xFF);
+ result[2] = (byte) ((source >> 8) & 0xFF);
+ result[3] = (byte) (source & 0xFF);
+ return result;
+ }
+
+ public void makeCustomHeaderToNet() {
+ if (this.customHeader != null) {
+ Field[] fields = getClazzFields(customHeader.getClass());
+ if (null == this.extFields) {
+ this.extFields = new HashMap<String, String>();
+ }
+
+ for (Field field : fields) {
+ if (!Modifier.isStatic(field.getModifiers())) {
+ String name = field.getName();
+ if (!name.startsWith("this")) {
+ Object value = null;
+ try {
+ field.setAccessible(true);
+ value = field.get(this.customHeader);
+ } catch (IllegalArgumentException e) {
+ } catch (IllegalAccessException e) {
+ }
+
+ if (value != null) {
+ this.extFields.put(name, value.toString());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public ByteBuffer encodeHeader() {
+ return encodeHeader(this.body != null ? this.body.length : 0);
+ }
+
+ /**
+
+ */
+ public ByteBuffer encodeHeader(final int bodyLength) {
+ // 1> header length size
+ int length = 4;
+
+ // 2> header data length
+ byte[] headerData;
+ headerData = this.headerEncode();
+
+ length += headerData.length;
+
+ // 3> body data length
+ length += bodyLength;
+
+ ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
+
+ // length
+ result.putInt(length);
+
+ // header length
+ result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
+
+ // header data
+ result.put(headerData);
+
+ result.flip();
+
+ return result;
+ }
+
+ public void markOnewayRPC() {
+ int bits = 1 << RPC_ONEWAY;
+ this.flag |= bits;
+ }
+
+ @JSONField(serialize = false)
+ public boolean isOnewayRPC() {
+ int bits = 1 << RPC_ONEWAY;
+ return (this.flag & bits) == bits;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ @JSONField(serialize = false)
+ public RemotingCommandType getType() {
+ if (this.isResponseType()) {
+ return RemotingCommandType.RESPONSE_COMMAND;
+ }
+
+ return RemotingCommandType.REQUEST_COMMAND;
+ }
+
+ @JSONField(serialize = false)
+ public boolean isResponseType() {
+ int bits = 1 << RPC_TYPE;
+ return (this.flag & bits) == bits;
+ }
+
+ public LanguageCode getLanguage() {
+ return language;
+ }
+
+ public void setLanguage(LanguageCode language) {
+ this.language = language;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ public int getOpaque() {
+ return opaque;
+ }
+
+ public void setOpaque(int opaque) {
+ this.opaque = opaque;
+ }
+
+ public int getFlag() {
+ return flag;
+ }
+
+ public void setFlag(int flag) {
+ this.flag = flag;
+ }
+
+ public String getRemark() {
+ return remark;
+ }
+
+ public void setRemark(String remark) {
+ this.remark = remark;
+ }
+
+ public byte[] getBody() {
+ return body;
+ }
+
+ public void setBody(byte[] body) {
+ this.body = body;
+ }
+
+ public HashMap<String, String> getExtFields() {
+ return extFields;
+ }
+
+ public void setExtFields(HashMap<String, String> extFields) {
+ this.extFields = extFields;
+ }
+
+ public void addExtField(String key, String value) {
+ if (null == extFields) {
+ extFields = new HashMap<String, String>();
+ }
+ extFields.put(key, value);
+ }
+
+ @Override
+ public String toString() {
+ return "RemotingCommand [code=" + code + ", language=" + language + ", version=" + version + ", opaque=" + opaque + ", flag(B)="
+ + Integer.toBinaryString(flag) + ", remark=" + remark + ", extFields=" + extFields + ", serializeTypeCurrentRPC="
+ + serializeTypeCurrentRPC + "]";
+ }
+
+
+ public SerializeType getSerializeTypeCurrentRPC() {
+ return serializeTypeCurrentRPC;
+ }
+
+
+ public void setSerializeTypeCurrentRPC(SerializeType serializeTypeCurrentRPC) {
+ this.serializeTypeCurrentRPC = serializeTypeCurrentRPC;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommandType.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommandType.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommandType.java
new file mode 100644
index 0000000..c0ac288
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingCommandType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.protocol;
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public enum RemotingCommandType {
+ REQUEST_COMMAND,
+ RESPONSE_COMMAND;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSerializable.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSerializable.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSerializable.java
new file mode 100644
index 0000000..c144ea6
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSerializable.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.protocol;
+
+import com.alibaba.fastjson.JSON;
+
+import java.nio.charset.Charset;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public abstract class RemotingSerializable {
+ public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+
+ public static byte[] encode(final Object obj) {
+ final String json = toJson(obj, false);
+ if (json != null) {
+ return json.getBytes(CHARSET_UTF8);
+ }
+ return null;
+ }
+
+ public static String toJson(final Object obj, boolean prettyFormat) {
+ return JSON.toJSONString(obj, prettyFormat);
+ }
+
+ public static <T> T decode(final byte[] data, Class<T> classOfT) {
+ final String json = new String(data, CHARSET_UTF8);
+ return fromJson(json, classOfT);
+ }
+
+ public static <T> T fromJson(String json, Class<T> classOfT) {
+ return JSON.parseObject(json, classOfT);
+ }
+
+ public byte[] encode() {
+ final String json = this.toJson();
+ if (json != null) {
+ return json.getBytes(CHARSET_UTF8);
+ }
+ return null;
+ }
+
+ public String toJson() {
+ return toJson(false);
+ }
+
+ public String toJson(final boolean prettyFormat) {
+ return toJson(this, prettyFormat);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSysResponseCode.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSysResponseCode.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSysResponseCode.java
new file mode 100644
index 0000000..085c750
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RemotingSysResponseCode.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting.protocol;
+
+public class RemotingSysResponseCode {
+
+ public static final int SUCCESS = 0;
+
+ public static final int SYSTEM_ERROR = 1;
+
+ public static final int SYSTEM_BUSY = 2;
+
+ public static final int REQUEST_CODE_NOT_SUPPORTED = 3;
+
+ public static final int TRANSACTION_FAILED = 4;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RocketMQSerializable.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RocketMQSerializable.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RocketMQSerializable.java
new file mode 100644
index 0000000..6956aab
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/RocketMQSerializable.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.protocol;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+
+/**
+ * @author manhong.yqd
+ *
+ */
+public class RocketMQSerializable {
+ public static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+
+ public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
+ // String remark
+ byte[] remarkBytes = null;
+ int remarkLen = 0;
+ if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
+ remarkBytes = cmd.getRemark().getBytes(RemotingSerializable.CHARSET_UTF8);
+ remarkLen = remarkBytes.length;
+ }
+
+ // HashMap<String, String> extFields
+ byte[] extFieldsBytes = null;
+ int extLen = 0;
+ if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
+ extFieldsBytes = mapSerialize(cmd.getExtFields());
+ extLen = extFieldsBytes.length;
+ }
+
+ // ################### cal total length
+ int totalLen = calTotalLen(remarkLen, extLen);
+
+ // ################### content
+ ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
+ // int code(~32767)
+ headerBuffer.putShort((short) cmd.getCode());
+ // LanguageCode language
+ headerBuffer.put(cmd.getLanguage().getCode());
+ // int version(~32767)
+ headerBuffer.putShort((short) cmd.getVersion());
+ // int opaque
+ headerBuffer.putInt(cmd.getOpaque());
+ // int flag
+ headerBuffer.putInt(cmd.getFlag());
+ // String remark
+ if (remarkBytes != null) {
+ headerBuffer.putInt(remarkBytes.length);
+ headerBuffer.put(remarkBytes);
+ } else {
+ headerBuffer.putInt(0);
+ }
+ // HashMap<String, String> extFields;
+ if (extFieldsBytes != null) {
+ headerBuffer.putInt(extFieldsBytes.length);
+ headerBuffer.put(extFieldsBytes);
+ } else {
+ headerBuffer.putInt(0);
+ }
+
+ return headerBuffer.array();
+ }
+
+ public static byte[] mapSerialize(HashMap<String, String> map) {
+ // keySize+key+valSize+val
+ // keySize+key+valSize+val
+ if (null == map || map.isEmpty())
+ return null;
+
+ int totalLength = 0;
+ int kvLength;
+ Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, String> entry = it.next();
+ if (entry.getKey() != null && entry.getValue() != null) {
+ kvLength =
+ // keySize + Key
+ 2 + entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8).length
+ // valSize + val
+ + 4 + entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8).length;
+ totalLength += kvLength;
+ }
+ }
+
+ ByteBuffer content = ByteBuffer.allocate(totalLength);
+ byte[] key;
+ byte[] val;
+ it = map.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, String> entry = it.next();
+ if (entry.getKey() != null && entry.getValue() != null) {
+ key = entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8);
+ val = entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8);
+
+ content.putShort((short) key.length);
+ content.put(key);
+
+ content.putInt(val.length);
+ content.put(val);
+ }
+ }
+
+ return content.array();
+ }
+
+ private static int calTotalLen(int remark, int ext) {
+ // int code(~32767)
+ int length = 2
+ // LanguageCode language
+ + 1
+ // int version(~32767)
+ + 2
+ // int opaque
+ + 4
+ // int flag
+ + 4
+ // String remark
+ + 4 + remark
+ // HashMap<String, String> extFields
+ + 4 + ext;
+
+ return length;
+ }
+
+ public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
+ RemotingCommand cmd = new RemotingCommand();
+ ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
+ // int code(~32767)
+ cmd.setCode(headerBuffer.getShort());
+ // LanguageCode language
+ cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
+ // int version(~32767)
+ cmd.setVersion(headerBuffer.getShort());
+ // int opaque
+ cmd.setOpaque(headerBuffer.getInt());
+ // int flag
+ cmd.setFlag(headerBuffer.getInt());
+ // String remark
+ int remarkLength = headerBuffer.getInt();
+ if (remarkLength > 0) {
+ byte[] remarkContent = new byte[remarkLength];
+ headerBuffer.get(remarkContent);
+ cmd.setRemark(new String(remarkContent, RemotingSerializable.CHARSET_UTF8));
+ }
+
+ // HashMap<String, String> extFields
+ int extFieldsLength = headerBuffer.getInt();
+ if (extFieldsLength > 0) {
+ byte[] extFieldsBytes = new byte[extFieldsLength];
+ headerBuffer.get(extFieldsBytes);
+ cmd.setExtFields(mapDeserialize(extFieldsBytes));
+ }
+ return cmd;
+ }
+
+ public static HashMap<String, String> mapDeserialize(byte[] bytes) {
+ if (bytes == null || bytes.length <= 0)
+ return null;
+
+ HashMap<String, String> map = new HashMap<String, String>();
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+
+ short keySize = 0;
+ byte[] keyContent = null;
+ int valSize = 0;
+ byte[] valContent = null;
+ while (byteBuffer.hasRemaining()) {
+ keySize = byteBuffer.getShort();
+ keyContent = new byte[keySize];
+ byteBuffer.get(keyContent);
+
+ valSize = byteBuffer.getInt();
+ valContent = new byte[valSize];
+ byteBuffer.get(valContent);
+
+ map.put(new String(keyContent, RemotingSerializable.CHARSET_UTF8), new String(valContent,
+ RemotingSerializable.CHARSET_UTF8));
+ }
+ return map;
+ }
+
+
+ public static boolean isBlank(String str) {
+ int strLen;
+ if (str == null || (strLen = str.length()) == 0) {
+ return true;
+ }
+ for (int i = 0; i < strLen; i++) {
+ if (!Character.isWhitespace(str.charAt(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/SerializeType.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/SerializeType.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/SerializeType.java
new file mode 100644
index 0000000..b46d8d8
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/SerializeType.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting.protocol;
+
+public enum SerializeType {
+ JSON((byte) 0),
+ ROCKETMQ((byte) 1);
+
+ private byte code;
+
+ SerializeType(byte code) {
+ this.code = code;
+ }
+
+ public static SerializeType valueOf(byte code) {
+ for (SerializeType serializeType : SerializeType.values()) {
+ if (serializeType.getCode() == code) {
+ return serializeType;
+ }
+ }
+ return null;
+ }
+
+ public byte getCode() {
+ return code;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.sevialize.txt
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.sevialize.txt b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.sevialize.txt
new file mode 100644
index 0000000..976cdf2
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.sevialize.txt
@@ -0,0 +1,6 @@
+//
+// Remoting protocol V0.1 draft
+//
+// protocol <length> <header length> <header data> <body data>
+// 1 2 3 4
+//
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.txt
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.txt b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.txt
new file mode 100644
index 0000000..7444cbb
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/protocol/protocol.txt
@@ -0,0 +1,6 @@
+//
+// Remoting protocol V0.1 draft
+//
+// protocol <length> <header length> <header data> <body data>
+// 1 2 3 4
+//
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/ExceptionTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/ExceptionTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/ExceptionTest.java
new file mode 100644
index 0000000..6baa013
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/ExceptionTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: ExceptionTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.netty.*;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+import org.junit.Test;
+
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ExceptionTest {
+ private static RemotingServer createRemotingServer() throws InterruptedException {
+ NettyServerConfig config = new NettyServerConfig();
+ RemotingServer client = new NettyRemotingServer(config);
+ client.registerProcessor(0, new NettyRequestProcessor() {
+ private int i = 0;
+
+
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+ System.out.println("processRequest=" + request + " " + (i++));
+ request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress());
+ return request;
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+ }, Executors.newCachedThreadPool());
+ client.start();
+ return client;
+ }
+
+ @Test
+ public void test_CONNECT_EXCEPTION() {
+ RemotingClient client = createRemotingClient();
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+ RemotingCommand response = null;
+ try {
+ response = client.invokeSync("localhost:8888", request, 1000 * 3);
+ } catch (RemotingConnectException e) {
+ e.printStackTrace();
+ } catch (RemotingSendRequestException e) {
+ e.printStackTrace();
+ } catch (RemotingTimeoutException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ System.out.println("invoke result = " + response);
+ assertTrue(null == response);
+
+ client.shutdown();
+ System.out.println("-----------------------------------------------------------------");
+ }
+
+ private static RemotingClient createRemotingClient() {
+ NettyClientConfig config = new NettyClientConfig();
+ RemotingClient client = new NettyRemotingClient(config);
+ client.start();
+ return client;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/MixTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/MixTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/MixTest.java
new file mode 100644
index 0000000..97d1663
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/MixTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: MixTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.remoting;
+
+import org.junit.Test;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MixTest {
+ @Test
+ public void test_extFieldsValue() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
new file mode 100644
index 0000000..e4ff948
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Test;
+
+
+/**
+
+ *
+ * @author shijia.wxr
+ *
+ */
+public class NettyConnectionTest {
+ @Test
+ public void test_connect_timeout() throws InterruptedException, RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException {
+ RemotingClient client = createRemotingClient();
+
+ for (int i = 0; i < 100; i++) {
+ try {
+ RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+ RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ client.shutdown();
+ System.out.println("-----------------------------------------------------------------");
+ }
+
+ public static RemotingClient createRemotingClient() {
+ NettyClientConfig config = new NettyClientConfig();
+ config.setClientChannelMaxIdleTimeSeconds(15);
+ RemotingClient client = new NettyRemotingClient(config);
+ client.start();
+ return client;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyIdleTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyIdleTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyIdleTest.java
new file mode 100644
index 0000000..23145f4
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyIdleTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.netty.*;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class NettyIdleTest {
+ // @Test
+ public void test_idle_event() throws InterruptedException, RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException {
+ RemotingServer server = createRemotingServer();
+ RemotingClient client = createRemotingClient();
+
+ for (int i = 0; i < 10; i++) {
+ RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+ RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3);
+ System.out.println(i + " invoke result = " + response);
+ assertTrue(response != null);
+
+ Thread.sleep(1000 * 10);
+ }
+
+ Thread.sleep(1000 * 60);
+
+ client.shutdown();
+ server.shutdown();
+ System.out.println("-----------------------------------------------------------------");
+ }
+
+ public static RemotingServer createRemotingServer() throws InterruptedException {
+ NettyServerConfig config = new NettyServerConfig();
+ config.setServerChannelMaxIdleTimeSeconds(30);
+ RemotingServer remotingServer = new NettyRemotingServer(config);
+ remotingServer.registerProcessor(0, new NettyRequestProcessor() {
+ private int i = 0;
+
+
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+ System.out.println("processRequest=" + request + " " + (i++));
+ request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress());
+ return request;
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+ }, Executors.newCachedThreadPool());
+ remotingServer.start();
+ return remotingServer;
+ }
+
+ public static RemotingClient createRemotingClient() {
+ NettyClientConfig config = new NettyClientConfig();
+ config.setClientChannelMaxIdleTimeSeconds(15);
+ RemotingClient client = new NettyRemotingClient(config);
+ client.start();
+ return client;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyRPCTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyRPCTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyRPCTest.java
new file mode 100644
index 0000000..7d4f68e
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyRPCTest.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: NettyRPCTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.annotation.CFNullable;
+import com.alibaba.rocketmq.remoting.exception.*;
+import com.alibaba.rocketmq.remoting.netty.*;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+import org.junit.Test;
+
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class NettyRPCTest {
+ @Test
+ public void test_RPC_Sync() throws InterruptedException, RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException {
+ RemotingServer server = createRemotingServer();
+ RemotingClient client = createRemotingClient();
+
+ for (int i = 0; i < 100; i++) {
+ TestRequestHeader requestHeader = new TestRequestHeader();
+ requestHeader.setCount(i);
+ requestHeader.setMessageTitle("HelloMessageTitle");
+ RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
+ RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3000);
+ System.out.println("invoke result = " + response);
+ assertTrue(response != null);
+ }
+
+ client.shutdown();
+ server.shutdown();
+ System.out.println("-----------------------------------------------------------------");
+ }
+
+ public static RemotingServer createRemotingServer() throws InterruptedException {
+ NettyServerConfig config = new NettyServerConfig();
+ RemotingServer remotingServer = new NettyRemotingServer(config);
+ remotingServer.registerProcessor(0, new NettyRequestProcessor() {
+ private int i = 0;
+
+
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+ System.out.println("processRequest=" + request + " " + (i++));
+ request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress());
+ return request;
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+ }, Executors.newCachedThreadPool());
+ remotingServer.start();
+ return remotingServer;
+ }
+
+ public static RemotingClient createRemotingClient() {
+ NettyClientConfig config = new NettyClientConfig();
+ RemotingClient client = new NettyRemotingClient(config);
+ client.start();
+ return client;
+ }
+
+ @Test
+ public void test_RPC_Oneway() throws InterruptedException, RemotingConnectException,
+ RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
+ RemotingServer server = createRemotingServer();
+ RemotingClient client = createRemotingClient();
+
+ for (int i = 0; i < 100; i++) {
+ RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+ request.setRemark(String.valueOf(i));
+ client.invokeOneway("localhost:8888", request, 1000 * 3);
+ }
+
+ client.shutdown();
+ server.shutdown();
+ System.out.println("-----------------------------------------------------------------");
+ }
+
+
+ @Test
+ public void test_RPC_Async() throws InterruptedException, RemotingConnectException,
+ RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
+ RemotingServer server = createRemotingServer();
+ RemotingClient client = createRemotingClient();
+
+ for (int i = 0; i < 100; i++) {
+ RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+ request.setRemark(String.valueOf(i));
+ client.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ System.out.println(responseFuture.getResponseCommand());
+ }
+ });
+ }
+
+ Thread.sleep(1000 * 3);
+
+ client.shutdown();
+ server.shutdown();
+ System.out.println("-----------------------------------------------------------------");
+ }
+
+
+ @Test
+ public void test_server_call_client() throws InterruptedException, RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException {
+ final RemotingServer server = createRemotingServer();
+ final RemotingClient client = createRemotingClient();
+
+ server.registerProcessor(0, new NettyRequestProcessor() {
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+ try {
+ return server.invokeSync(ctx.channel(), request, 1000 * 10);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (RemotingSendRequestException e) {
+ e.printStackTrace();
+ } catch (RemotingTimeoutException e) {
+ e.printStackTrace();
+ }
+
+ return null;
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+ }, Executors.newCachedThreadPool());
+
+ client.registerProcessor(0, new NettyRequestProcessor() {
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
+ System.out.println("client receive server request = " + request);
+ request.setRemark("client remark");
+ return request;
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+ }, Executors.newCachedThreadPool());
+
+ for (int i = 0; i < 3; i++) {
+ RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+ RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3);
+ System.out.println("invoke result = " + response);
+ assertTrue(response != null);
+ }
+
+ client.shutdown();
+ server.shutdown();
+ System.out.println("-----------------------------------------------------------------");
+ }
+
+}
+
+
+class TestRequestHeader implements CommandCustomHeader {
+ @CFNullable
+ private Integer count;
+
+ @CFNullable
+ private String messageTitle;
+
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+
+
+ public Integer getCount() {
+ return count;
+ }
+
+
+ public void setCount(Integer count) {
+ this.count = count;
+ }
+
+
+ public String getMessageTitle() {
+ return messageTitle;
+ }
+
+
+ public void setMessageTitle(String messageTitle) {
+ this.messageTitle = messageTitle;
+ }
+}
+
+
+class TestResponseHeader implements CommandCustomHeader {
+ @CFNullable
+ private Integer count;
+
+ @CFNullable
+ private String messageTitle;
+
+ public Integer getCount() {
+ return count;
+ }
+
+ public void setCount(Integer count) {
+ this.count = count;
+ }
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+
+ }
+
+ public String getMessageTitle() {
+ return messageTitle;
+ }
+
+ public void setMessageTitle(String messageTitle) {
+ this.messageTitle = messageTitle;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/SyncInvokeTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/SyncInvokeTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/SyncInvokeTest.java
new file mode 100644
index 0000000..fc3e708
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/SyncInvokeTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: SyncInvokeTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.remoting;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SyncInvokeTest {
+ @Test
+ public void test_RPC_Sync() throws Exception {
+ RemotingServer server = NettyRPCTest.createRemotingServer();
+ RemotingClient client = NettyRPCTest.createRemotingClient();
+
+ for (int i = 0; i < 100; i++) {
+ try {
+ RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
+ RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3);
+ System.out.println(i + "\t" + "invoke result = " + response);
+ assertTrue(response != null);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ client.shutdown();
+ server.shutdown();
+ System.out.println("-----------------------------------------------------------------");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/subclass/TestSubClassAuto.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/subclass/TestSubClassAuto.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/subclass/TestSubClassAuto.java
new file mode 100644
index 0000000..3d18d23
--- /dev/null
+++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/subclass/TestSubClassAuto.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package com.alibaba.rocketmq.subclass;
+
+import org.junit.Test;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TestSubClassAuto {
+ @Test
+ public void test_sub() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-srvutil/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-srvutil/pom.xml b/rocketmq-srvutil/pom.xml
new file mode 100644
index 0000000..98b4e29
--- /dev/null
+++ b/rocketmq-srvutil/pom.xml
@@ -0,0 +1,51 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain producerGroup copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>com.alibaba.rocketmq</groupId>
+ <artifactId>rocketmq-all</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>rocketmq-srvutil</artifactId>
+ <name>rocketmq-srvutil ${project.version}</name>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-remoting</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-srvutil/src/main/java/com/alibaba/rocketmq/srvutil/ServerUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-srvutil/src/main/java/com/alibaba/rocketmq/srvutil/ServerUtil.java b/rocketmq-srvutil/src/main/java/com/alibaba/rocketmq/srvutil/ServerUtil.java
new file mode 100644
index 0000000..72fa9a4
--- /dev/null
+++ b/rocketmq-srvutil/src/main/java/com/alibaba/rocketmq/srvutil/ServerUtil.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.srvutil;
+
+import org.apache.commons.cli.*;
+
+import java.util.Properties;
+
+public class ServerUtil {
+
+ public static Options buildCommandlineOptions(final Options options) {
+ Option opt = new Option("h", "help", false, "Print help");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt =
+ new Option("n", "namesrvAddr", true,
+ "Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ return options;
+ }
+
+
+ public static CommandLine parseCmdLine(final String appName, String[] args, Options options,
+ CommandLineParser parser) {
+ HelpFormatter hf = new HelpFormatter();
+ hf.setWidth(110);
+ CommandLine commandLine = null;
+ try {
+ commandLine = parser.parse(options, args);
+ if (commandLine.hasOption('h')) {
+ hf.printHelp(appName, options, true);
+ return null;
+ }
+ } catch (ParseException e) {
+ hf.printHelp(appName, options, true);
+ }
+
+ return commandLine;
+ }
+
+
+ public static void printCommandLineHelp(final String appName, final Options options) {
+ HelpFormatter hf = new HelpFormatter();
+ hf.setWidth(110);
+ hf.printHelp(appName, options, true);
+ }
+
+
+ public static Properties commandLine2Properties(final CommandLine commandLine) {
+ Properties properties = new Properties();
+ Option[] opts = commandLine.getOptions();
+
+ if (opts != null) {
+ for (Option opt : opts) {
+ String name = opt.getLongOpt();
+ String value = commandLine.getOptionValue(name);
+ if (value != null) {
+ properties.setProperty(name, value);
+ }
+ }
+ }
+
+ return properties;
+ }
+
+}