You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/19 07:23:48 UTC
[4/8] incubator-rocketmq git commit: initialize RocketMQ5
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ByteUtils.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ByteUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ByteUtils.java
new file mode 100644
index 0000000..c298ce7
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ByteUtils.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+/**
+ * Copy from Bouncy Castle Crypto APIs
+ *
+ * This class is a utility class for manipulating byte arrays.
+ */
+public final class ByteUtils {
+
+    private static final char[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+
+    /**
+     * Default constructor (private); this class only offers static helpers.
+     */
+    private ByteUtils() {
+        // empty
+    }
+
+    /**
+     * Compare two byte arrays (perform null checks beforehand).
+     *
+     * <p>Once both arrays are known to have the same length, every element is
+     * visited; the loop does not exit early on the first mismatch.</p>
+     *
+     * @param left the first byte array
+     * @param right the second byte array
+     * @return {@code true} if both are {@code null} or both hold the same bytes
+     */
+    public static boolean equals(byte[] left, byte[] right) {
+        if (left == null) {
+            return right == null;
+        }
+        if (right == null) {
+            return false;
+        }
+
+        if (left.length != right.length) {
+            return false;
+        }
+        boolean result = true;
+        for (int i = left.length - 1; i >= 0; i--) {
+            result &= left[i] == right[i];
+        }
+        return result;
+    }
+
+    /**
+     * Compare two two-dimensional byte arrays. No null checks are performed on
+     * the outer arrays; rows are compared with {@link #equals(byte[], byte[])}.
+     *
+     * @param left the first byte array
+     * @param right the second byte array
+     * @return the result of the comparison
+     */
+    public static boolean equals(byte[][] left, byte[][] right) {
+        if (left.length != right.length) {
+            return false;
+        }
+
+        boolean result = true;
+        for (int i = left.length - 1; i >= 0; i--) {
+            result &= ByteUtils.equals(left[i], right[i]);
+        }
+
+        return result;
+    }
+
+    /**
+     * Compare two three-dimensional byte arrays. No null checks are performed
+     * on the two outer levels.
+     *
+     * @param left the first byte array
+     * @param right the second byte array
+     * @return the result of the comparison
+     */
+    public static boolean equals(byte[][][] left, byte[][][] right) {
+        if (left.length != right.length) {
+            return false;
+        }
+
+        boolean result = true;
+        for (int i = left.length - 1; i >= 0; i--) {
+            // A differing inner length is reported immediately.
+            if (left[i].length != right[i].length) {
+                return false;
+            }
+            for (int j = left[i].length - 1; j >= 0; j--) {
+                result &= ByteUtils.equals(left[i][j], right[i][j]);
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Computes a hashcode based on the contents of a one-dimensional byte array
+     * rather than its identity.
+     *
+     * @param array the array to compute the hashcode of
+     * @return the hashcode
+     */
+    public static int deepHashCode(byte[] array) {
+        int result = 1;
+        for (int i = 0; i < array.length; i++) {
+            result = 31 * result + array[i];
+        }
+        return result;
+    }
+
+    /**
+     * Computes a hashcode based on the contents of a two-dimensional byte array
+     * rather than its identity.
+     *
+     * @param array the array to compute the hashcode of
+     * @return the hashcode
+     */
+    public static int deepHashCode(byte[][] array) {
+        int result = 1;
+        for (int i = 0; i < array.length; i++) {
+            result = 31 * result + deepHashCode(array[i]);
+        }
+        return result;
+    }
+
+    /**
+     * Computes a hashcode based on the contents of a three-dimensional byte
+     * array rather than its identity.
+     *
+     * @param array the array to compute the hashcode of
+     * @return the hashcode
+     */
+    public static int deepHashCode(byte[][][] array) {
+        int result = 1;
+        for (int i = 0; i < array.length; i++) {
+            result = 31 * result + deepHashCode(array[i]);
+        }
+        return result;
+    }
+
+    /**
+     * Return a clone of the given byte array (performs null check beforehand).
+     *
+     * @param array the array to clone
+     * @return the clone of the given array, or {@code null} if the array is
+     * {@code null}
+     */
+    public static byte[] clone(byte[] array) {
+        if (array == null) {
+            return null;
+        }
+        byte[] result = new byte[array.length];
+        System.arraycopy(array, 0, result, 0, array.length);
+        return result;
+    }
+
+    /**
+     * Convert a string containing hexadecimal characters to a byte-array.
+     *
+     * <p>Only the ASCII characters {@code 0-9}, {@code a-f} and {@code A-F}
+     * are interpreted; every other character is skipped. Characters are
+     * classified directly instead of upper-casing the string first, so the
+     * result depends neither on the default locale nor on case mappings that
+     * expand a single non-ASCII character into several letters. When the
+     * number of hex digits is odd, the first digit alone forms the first
+     * byte.</p>
+     *
+     * @param s a hex string
+     * @return a byte array with the corresponding value
+     */
+    public static byte[] fromHexString(String s) {
+        final int length = s.length();
+
+        int hexChars = 0;
+        for (int i = 0; i < length; i++) {
+            if (hexValue(s.charAt(i)) >= 0) {
+                hexChars++;
+            }
+        }
+
+        byte[] byteString = new byte[(hexChars + 1) >> 1];
+
+        // Start at nibble 1 for an odd digit count so the first byte gets an implicit leading zero.
+        int pos = hexChars & 1;
+
+        for (int i = 0; i < length; i++) {
+            final int value = hexValue(s.charAt(i));
+            if (value < 0) {
+                continue;
+            }
+            byteString[pos >> 1] = (byte) ((byteString[pos >> 1] << 4) | value);
+            pos++;
+        }
+
+        return byteString;
+    }
+
+    /**
+     * Returns the numeric value of an ASCII hex digit, or -1 for any other character.
+     */
+    private static int hexValue(char c) {
+        if (c >= '0' && c <= '9') {
+            return c - '0';
+        }
+        if (c >= 'A' && c <= 'F') {
+            return c - 'A' + 10;
+        }
+        if (c >= 'a' && c <= 'f') {
+            return c - 'a' + 10;
+        }
+        return -1;
+    }
+
+    /**
+     * Convert a byte array to the corresponding lower-case hex string.
+     *
+     * @param input the byte array to be converted
+     * @return the corresponding hex string, two characters per byte
+     */
+    public static String toHexString(byte[] input) {
+        final StringBuilder result = new StringBuilder(input.length * 2);
+        for (int i = 0; i < input.length; i++) {
+            result.append(HEX_CHARS[(input[i] >>> 4) & 0x0f]);
+            result.append(HEX_CHARS[input[i] & 0x0f]);
+        }
+        return result.toString();
+    }
+
+    /**
+     * Convert a byte array to the corresponding hex string.
+     *
+     * @param input the byte array to be converted
+     * @param prefix the prefix to put at the beginning of the hex string
+     * @param seperator a separator string placed between consecutive bytes
+     * @return the corresponding hex string
+     */
+    public static String toHexString(byte[] input, String prefix,
+        String seperator) {
+        final StringBuilder result = new StringBuilder(prefix);
+        for (int i = 0; i < input.length; i++) {
+            result.append(HEX_CHARS[(input[i] >>> 4) & 0x0f]);
+            result.append(HEX_CHARS[input[i] & 0x0f]);
+            if (i < input.length - 1) {
+                result.append(seperator);
+            }
+        }
+        return result.toString();
+    }
+
+    /**
+     * Convert a byte array to the corresponding bit string.
+     *
+     * <p>Each byte is written as eight characters starting with its least
+     * significant bit; bytes are separated by a single space.</p>
+     *
+     * @param input the byte array to be converted
+     * @return the corresponding bit string
+     */
+    public static String toBinaryString(byte[] input) {
+        final StringBuilder result = new StringBuilder(input.length * 9);
+        for (int i = 0; i < input.length; i++) {
+            final int e = input[i];
+            for (int bit = 0; bit < 8; bit++) {
+                result.append((e >>> bit) & 1);
+            }
+            if (i != input.length - 1) {
+                result.append(' ');
+            }
+        }
+        return result.toString();
+    }
+
+    /**
+     * Compute the bitwise XOR of two arrays of bytes. The arrays have to be of
+     * same length. No length checking is performed.
+     *
+     * @param x1 the first array
+     * @param x2 the second array
+     * @return x1 XOR x2
+     */
+    public static byte[] xor(byte[] x1, byte[] x2) {
+        byte[] out = new byte[x1.length];
+
+        for (int i = x1.length - 1; i >= 0; i--) {
+            out[i] = (byte) (x1[i] ^ x2[i]);
+        }
+        return out;
+    }
+
+    /**
+     * Concatenate two byte arrays. No null checks are performed.
+     *
+     * @param x1 the first array
+     * @param x2 the second array
+     * @return (x2||x1) (little-endian order, i.e. x1 is at lower memory
+     * addresses)
+     */
+    public static byte[] concatenate(byte[] x1, byte[] x2) {
+        byte[] result = new byte[x1.length + x2.length];
+
+        System.arraycopy(x1, 0, result, 0, x1.length);
+        System.arraycopy(x2, 0, result, x1.length, x2.length);
+
+        return result;
+    }
+
+    /**
+     * Convert a 2-dimensional byte array into a 1-dimensional byte array by
+     * concatenating all entries.
+     *
+     * <p>Rows may have different lengths, and an empty outer array yields an
+     * empty result. No null checks are performed on the rows.</p>
+     *
+     * @param array a 2-dimensional byte array
+     * @return the concatenated input array
+     */
+    public static byte[] concatenate(byte[][] array) {
+        // Size the result from the actual row lengths instead of assuming
+        // that every row is as long as the first one.
+        int totalLength = 0;
+        for (int i = 0; i < array.length; i++) {
+            totalLength += array[i].length;
+        }
+
+        byte[] result = new byte[totalLength];
+        int index = 0;
+        for (int i = 0; i < array.length; i++) {
+            System.arraycopy(array[i], 0, result, index, array[i].length);
+            index += array[i].length;
+        }
+        return result;
+    }
+
+    /**
+     * Split a byte array {@code input} into two arrays at {@code index},
+     * i.e. the first array will have the lower {@code index} bytes, the
+     * second one the higher {@code input.length - index} bytes.
+     *
+     * @param input the byte array to be split
+     * @param index the index where the byte array is split
+     * @return the split input array as an array of two byte arrays
+     * @throws ArrayIndexOutOfBoundsException if {@code index} is negative or
+     * greater than {@code input.length}
+     */
+    public static byte[][] split(byte[] input, int index)
+        throws ArrayIndexOutOfBoundsException {
+        // A negative index is rejected here as well; otherwise the array
+        // allocation below would fail with NegativeArraySizeException.
+        if (index < 0 || index > input.length) {
+            throw new ArrayIndexOutOfBoundsException(index);
+        }
+        byte[][] result = new byte[2][];
+        result[0] = new byte[index];
+        result[1] = new byte[input.length - index];
+        System.arraycopy(input, 0, result[0], 0, index);
+        System.arraycopy(input, index, result[1], 0, input.length - index);
+        return result;
+    }
+
+    /**
+     * Generate a subarray of a given byte array.
+     *
+     * @param input the input byte array
+     * @param start the start index
+     * @param end the end index
+     * @return a subarray of {@code input}, ranging from {@code start}
+     * (inclusively) to {@code end} (exclusively)
+     */
+    public static byte[] subArray(byte[] input, int start, int end) {
+        byte[] result = new byte[end - start];
+        System.arraycopy(input, start, result, 0, end - start);
+        return result;
+    }
+
+    /**
+     * Generate a subarray of a given byte array.
+     *
+     * @param input the input byte array
+     * @param start the start index
+     * @return a subarray of {@code input}, ranging from {@code start} to
+     * the end of the array
+     */
+    public static byte[] subArray(byte[] input, int start) {
+        return subArray(input, start, input.length);
+    }
+
+    /**
+     * Rewrite a byte array as a char array.
+     *
+     * <p>Each byte is cast directly to {@code char}, so negative bytes are
+     * sign-extended (for example {@code (byte) 0xFF} becomes the char value
+     * {@code 0xFFFF}).</p>
+     *
+     * @param input the byte array
+     * @return char array with one char per input byte
+     */
+    public static char[] toCharArray(byte[] input) {
+        char[] result = new char[input.length];
+        for (int i = 0; i < input.length; i++) {
+            result[i] = (char) input[i];
+        }
+        return result;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ExceptionUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ExceptionUtils.java
new file mode 100644
index 0000000..6386ca0
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ExceptionUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+public class ExceptionUtils {
+
+    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
+
+    /**
+     * <p>Renders the stack trace of a Throwable into a String.</p>
+     *
+     * <p>The text is whatever
+     * {@link Throwable#printStackTrace(java.io.PrintWriter)} prints for the
+     * given throwable, so its exact shape depends on the running JDK and on
+     * any override of that method.</p>
+     *
+     * @param throwable the <code>Throwable</code> to be examined
+     * @return the stack trace as generated by the exception's
+     * <code>printStackTrace(PrintWriter)</code> method
+     */
+    public static String getStackTrace(final Throwable throwable) {
+        final StringWriter captured = new StringWriter();
+        throwable.printStackTrace(new PrintWriter(captured, true));
+        return captured.toString();
+    }
+
+    /**
+     * <p>Collects the stack frame lines of a throwable into a
+     * <code>List</code>. Lines before the first frame (the message) are
+     * skipped, and collection stops at the first non-frame line after that,
+     * which strips any "caused by" trace.</p>
+     *
+     * <p>A line counts as a frame when only whitespace precedes its first
+     * occurrence of <code>"at"</code>; a message line of that shape is
+     * therefore mistaken for a frame.</p>
+     *
+     * @param t is any throwable
+     * @return List of stack frames
+     */
+    static List<String> getStackFrameList(final Throwable t) {
+        final List<String> collected = new ArrayList<String>();
+        final StringTokenizer lines = new StringTokenizer(getStackTrace(t), LINE_SEPARATOR);
+        while (lines.hasMoreTokens()) {
+            final String line = lines.nextToken();
+            // A frame line is <whitespace>at ...
+            final int atIndex = line.indexOf("at");
+            final boolean looksLikeFrame = atIndex != -1 && line.substring(0, atIndex).trim().isEmpty();
+            if (looksLikeFrame) {
+                collected.add(line);
+                continue;
+            }
+            // Once at least one frame was seen, the first non-frame line ends the trace.
+            if (!collected.isEmpty()) {
+                break;
+            }
+        }
+        return collected;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/JvmUtils.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/JvmUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/JvmUtils.java
new file mode 100644
index 0000000..fb97a14
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/JvmUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Random;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class JvmUtils {
+    /**
+     * Lower-cased value of the {@code os.name} system property. The root
+     * locale is used so that the result does not depend on the default locale
+     * (a Turkish default locale would otherwise turn "AIX" into a string
+     * that no longer contains "aix").
+     */
+    public static final String OS_NAME = System.getProperty("os.name").toLowerCase(java.util.Locale.ROOT);
+    private final static Logger LOG = LoggerFactory.getLogger(JvmUtils.class);
+    //public static final String OS_VERSION = System.getProperty("os.version").toLowerCase();
+
+    /**
+     * A constructor to stop this class being constructed.
+     */
+    private JvmUtils() {
+        // Unused
+    }
+
+    /**
+     * @return {@code true} if the OS name starts with "win"
+     */
+    public static boolean isWindows() {
+        return OS_NAME.startsWith("win");
+    }
+
+    /**
+     * @return {@code true} if the OS name starts with "win" and ends with "10"
+     */
+    public static boolean isWindows10() {
+        return OS_NAME.startsWith("win") && OS_NAME.endsWith("10");
+    }
+
+    /**
+     * @return {@code true} if the OS name contains "mac"
+     */
+    public static boolean isMacOSX() {
+        return OS_NAME.contains("mac");
+    }
+
+    /**
+     * @return {@code true} if the OS name starts with "linux"
+     */
+    public static boolean isLinux() {
+        return OS_NAME.startsWith("linux");
+    }
+
+    /**
+     * @return {@code true} if the OS name contains one of the markers
+     * "nix", "nux", "aix", "bsd", "sun" or "hpux"
+     */
+    public static boolean isUnix() {
+        return OS_NAME.contains("nix") ||
+            OS_NAME.contains("nux") ||
+            OS_NAME.contains("aix") ||
+            OS_NAME.contains("bsd") ||
+            OS_NAME.contains("sun") ||
+            OS_NAME.contains("hpux");
+    }
+
+    /**
+     * @return {@code true} if the OS name starts with "sun"
+     */
+    public static boolean isSolaris() {
+        return OS_NAME.startsWith("sun");
+    }
+
+    /**
+     * Determines the id of the current process.
+     *
+     * <p>The canonical name of {@code /proc/self} is tried first; if that
+     * path does not exist or cannot be resolved, the part of the runtime MX
+     * bean name before the first {@code '@'} is used (the name is expected to
+     * look like {@code pid@host}). When the candidate is not a valid integer,
+     * a random number below 65536 is returned and a warning is logged.</p>
+     *
+     * @return the process id, or a random stand-in if it cannot be determined
+     */
+    public static int getProcessId() {
+        String pid = null;
+        final File self = new File("/proc/self");
+        try {
+            if (self.exists()) {
+                pid = self.getCanonicalFile().getName();
+            }
+        } catch (IOException ignored) {
+            // Fall through to the MX bean based lookup below.
+        }
+
+        if (pid == null) {
+            pid = ManagementFactory.getRuntimeMXBean().getName().split("@", 0)[0];
+        }
+
+        try {
+            return Integer.parseInt(pid);
+        } catch (NumberFormatException e) {
+            // split(...)[0] is never null, so an unusable candidate shows up
+            // here as a parse failure; this is where the fallback must live.
+            int rpid = new Random().nextInt(1 << 16);
+            LOG.warn("Unable to determine PID, picked a random number {}", rpid);
+
+            return rpid;
+        }
+    }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/NetworkUtils.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/NetworkUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/NetworkUtils.java
new file mode 100644
index 0000000..0e8ae21
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/NetworkUtils.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.Enumeration;
+import org.jetbrains.annotations.NotNull;
+
+public final class NetworkUtils {
+
+    public static final String DEFAULT_LOCAL_ADDRESS = "127.0.0.1";
+    public static final String DEFAULT_LOCAL_HOSTNAME = "localhost";
+
+    /**
+     * A constructor to stop this class being constructed.
+     */
+    private NetworkUtils() {
+        // Unused
+    }
+
+    /**
+     * Resolves the loopback address by looking up the {@code null} host name.
+     *
+     * @return the loopback address
+     * @throws RuntimeException wrapping the {@link UnknownHostException} if the lookup fails
+     */
+    public static InetAddress getLoopbackAddress() {
+        try {
+            return InetAddress.getByName(null);
+        } catch (UnknownHostException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Checks whether a host string denotes the local machine.
+     *
+     * @param host the host name or address to check
+     * @return {@code true} if {@code host} is "localhost" (ignoring case) or "127.0.0.1"
+     */
+    public static boolean isLocalhost(@NotNull String host) {
+        return host.equalsIgnoreCase(DEFAULT_LOCAL_HOSTNAME) || host.equals(DEFAULT_LOCAL_ADDRESS);
+    }
+
+    /**
+     * Picks an IPv4 address of this machine.
+     *
+     * <p>Interfaces that are down and the interface named "docker0" are
+     * skipped. The first address accepted by the selection rule is returned;
+     * if none is found, or no interfaces are reported at all,
+     * {@link #DEFAULT_LOCAL_ADDRESS} is returned.</p>
+     *
+     * @return the textual IPv4 address
+     * @throws RuntimeException wrapping the {@link SocketException} if the interfaces cannot be queried
+     */
+    public static String getLocalHostIp() {
+        try {
+            Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
+            // getNetworkInterfaces() may return null when no interface is found.
+            if (ifaces == null) {
+                return DEFAULT_LOCAL_ADDRESS;
+            }
+            while (ifaces.hasMoreElements()) {
+                NetworkInterface iface = ifaces.nextElement();
+                // Workaround for docker0 bridge
+                if ("docker0".equals(iface.getName()) || !iface.isUp()) {
+                    continue;
+                }
+                for (Enumeration<InetAddress> ips = iface.getInetAddresses(); ips.hasMoreElements(); ) {
+                    InetAddress ia = ips.nextElement();
+                    if (isUsableIpv4(ia)) {
+                        return ia.getHostAddress();
+                    }
+                }
+            }
+        } catch (SocketException e) {
+            throw new RuntimeException("Could not get local host ip", e);
+        }
+        return DEFAULT_LOCAL_ADDRESS;
+    }
+
+    /**
+     * Selection rule for {@link #getLocalHostIp()}: a non-loopback IPv4
+     * address that is either site-local, or neither link-local, wildcard nor
+     * multicast.
+     */
+    private static boolean isUsableIpv4(InetAddress ia) {
+        if (!(ia instanceof Inet4Address) || ia.isLoopbackAddress()) {
+            return false;
+        }
+        return ia.isSiteLocalAddress()
+            || (!ia.isLinkLocalAddress() && !ia.isAnyLocalAddress() && !ia.isMulticastAddress());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/PropertyUtils.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/PropertyUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/PropertyUtils.java
new file mode 100644
index 0000000..7e76a7e
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/PropertyUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class PropertyUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(PropertyUtils.class);
+
+    /**
+     * Looks up a property value by a case-insensitive match on the key.
+     *
+     * @param properties the properties to search; may be {@code null}
+     * @param key the key to look for, compared ignoring case
+     * @return the string form of the first matching entry's value, or
+     * {@code null} if {@code properties} is {@code null} or no key matches
+     */
+    public static String getPropertyIgnoreCase(final Properties properties, final String key) {
+        if (properties == null) {
+            return null;
+        }
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+            final String candidate = entry.getKey().toString();
+            if (candidate.equalsIgnoreCase(key)) {
+                return entry.getValue().toString();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Read a properties file from the given path.
+     *
+     * <p>If the file cannot be read, the error is logged and the JVM is
+     * terminated with exit status 1.</p>
+     *
+     * @param filename The path of the file to read
+     * @return Property file instance
+     */
+    public static Properties loadProps(String filename) {
+        final Properties loaded = new Properties();
+        try (InputStream source = new FileInputStream(filename)) {
+            loaded.load(source);
+        } catch (IOException e) {
+            LOG.error(String.format("Loading properties from file %s error !", filename), e);
+            System.exit(1);
+        }
+        return loaded;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java
new file mode 100644
index 0000000..a4b1293
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+import java.nio.ByteBuffer;
+import java.util.Calendar;
+
+public class UIDGenerator {
+    /** One generator per thread; instances are never shared between threads. */
+    private static final ThreadLocal<UIDGenerator> generatorLocal = new ThreadLocal<UIDGenerator>() {
+        @Override
+        protected UIDGenerator initialValue() {
+            return new UIDGenerator();
+        }
+    };
+    private short counter;
+    /** Length of the fixed hex prefix kept at the start of {@link #sb}. */
+    private int basePos = 0;
+    /** Start of the current month in milliseconds; time offsets are relative to it. */
+    private long startTime;
+    /** Start of the following month; reaching it triggers a new {@link #startTime}. */
+    private long nextStartTime;
+    private final StringBuilder sb;
+    /** Holds the variable part of a UID: 4 bytes time offset followed by 2 bytes counter. */
+    private final ByteBuffer buffer = ByteBuffer.allocate(6);
+
+    /**
+     * Builds the fixed 10-byte prefix: a marker byte with value 1, one zero
+     * byte, the process id (4 bytes) and the hash code of this class's class
+     * loader (4 bytes).
+     *
+     * NOTE(review): the prefix contains nothing thread-specific, so two
+     * threads may produce the same UID in the same millisecond - confirm
+     * whether callers rely on cross-thread uniqueness.
+     */
+    private UIDGenerator() {
+        int len = 4 + 2 + 4 + 4 + 2;
+
+        sb = new StringBuilder(len * 2);
+        ByteBuffer tempBuffer = ByteBuffer.allocate(len - buffer.limit());
+        // Byte 0 is the marker; byte 1 keeps its initial value 0. Writing a
+        // single byte into the fresh buffer cannot fail, so no fallback is needed.
+        tempBuffer.put((byte) 1);
+        tempBuffer.position(2);
+        tempBuffer.putInt(JvmUtils.getProcessId());
+        tempBuffer.position(6);
+        // getClassLoader() returns null for classes loaded by the bootstrap loader.
+        ClassLoader loader = UIDGenerator.class.getClassLoader();
+        tempBuffer.putInt(loader == null ? 0 : loader.hashCode());
+        sb.append(ByteUtils.toHexString(tempBuffer.array()));
+        basePos = sb.length();
+        setStartTime(System.currentTimeMillis());
+        counter = 0;
+    }
+
+    /**
+     * @return the generator bound to the calling thread
+     */
+    public static UIDGenerator instance() {
+        return generatorLocal.get();
+    }
+
+    /**
+     * Creates the next UID of this generator.
+     *
+     * <p>The result is the fixed prefix followed by the hex form of the
+     * milliseconds elapsed since the start of the current month (truncated to
+     * an int) and a 16-bit counter that is incremented on every call.</p>
+     *
+     * @return the UID as a hex string
+     */
+    public String createUID() {
+        long current = System.currentTimeMillis();
+        if (current >= nextStartTime) {
+            setStartTime(current);
+        }
+        buffer.position(0);
+        sb.setLength(basePos);
+        // Use the timestamp that was checked against nextStartTime above
+        // rather than reading the clock a second time.
+        buffer.putInt((int) (current - startTime));
+        buffer.putShort(counter++);
+        sb.append(ByteUtils.toHexString(buffer.array()));
+        return sb.toString();
+    }
+
+    /**
+     * Sets {@link #startTime} to the first instant of the month containing
+     * {@code millis} (default time zone) and {@link #nextStartTime} to the
+     * first instant of the month after it.
+     */
+    private void setStartTime(long millis) {
+        Calendar cal = Calendar.getInstance();
+        cal.setTimeInMillis(millis);
+        cal.set(Calendar.DAY_OF_MONTH, 1);
+        // HOUR_OF_DAY is the 24-hour field; Calendar.HOUR would keep the
+        // AM/PM marker and yield noon when called in the afternoon.
+        cal.set(Calendar.HOUR_OF_DAY, 0);
+        cal.set(Calendar.MINUTE, 0);
+        cal.set(Calendar.SECOND, 0);
+        cal.set(Calendar.MILLISECOND, 0);
+        startTime = cal.getTimeInMillis();
+        cal.add(Calendar.MONTH, 1);
+        nextStartTime = cal.getTimeInMillis();
+    }
+
+    /**
+     * Derives four bytes from the current time to stand in for an IP address.
+     *
+     * @return bytes 4 to 7 of the current time in milliseconds written as an 8-byte long
+     */
+    public byte[] createFakeIP() {
+        ByteBuffer bb = ByteBuffer.allocate(8);
+        bb.putLong(System.currentTimeMillis());
+        bb.position(4);
+        byte[] fakeIP = new byte[4];
+        bb.get(fakeIP);
+        return fakeIP;
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/package-info.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/package-info.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/package-info.java
new file mode 100644
index 0000000..e64f66b
--- /dev/null
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/package-info.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains all the transport classes that can be reused any number of times.
+ *
+ * Remoting wire-format protocol description:
+ *
+ * <pre>
+ * 2015-04-29 16:07:14 v1.0
+ * 2016-04-23 16:18:05 v2.0
+ * 2016-05-31 09:33:11 v3.0
+ * 2016-11-10 09:33:11 v3.1 remove deprecated tag field
+ *
+ *
+ * 1.Protocol Type 1 byte
+ * 2.Total Length 4 byte,exclude protocol type size
+ * 3.RequestID 4 byte, used for repeatable requests and connection reuse. A requestID string
+ * represents a client-generated identifier for the request, globally unique for some time unit
+ * 4.Serializer Type 1 byte
+ * 5.Traffic Type 1 byte,0-sync;1-async;2-oneway;3-response
+ * 6.OpCode Length 2 byte
+ * 7.OpCode variant length,utf8 string
+ * 8.Remark Length 2 byte
+ * 9.Remark variant length,utf8 string
+ * 10.Properties Size 2 byte
+ * Property Length 2 byte
+ * Property Body variant length,utf8,Key\nValue
+ * 11.Inbound or OutBound payload length 4 byte
+ * 12.Inbound or OutBound payload variant length, max size limitation is 16M
+ * 13.Extra payload variant length
+ *
+ * </pre>
+ */
+package org.apache.rocketmq.remoting;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/DecoderTest.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/DecoderTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/DecoderTest.java
new file mode 100644
index 0000000..4fb664f
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/handler/DecoderTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.remoting.api.protocol.Protocol;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@link Decoder} inside an {@link EmbeddedChannel}, counting the messages it emits and the exceptions caught.
+ */
+public class DecoderTest {
+
+ private EmbeddedChannel channel;
+ private AtomicInteger messagesReceived;
+ private AtomicInteger exceptionsCaught;
+
+ @Before
+ public void setUp() {
+ exceptionsCaught = new AtomicInteger(0);
+ messagesReceived = new AtomicInteger(0);
+ Decoder decoder = new Decoder();
+
+ channel = new EmbeddedChannel(decoder, new SimpleChannelInboundHandler<Object>() {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+ messagesReceived.incrementAndGet();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ exceptionsCaught.incrementAndGet();
+ }
+ });
+ }
+
+ @After
+ public void tearDown() {
+ channel.close();
+ }
+
+ @Test
+ public void decodeEmptyBufferTest() throws Exception {
+ // Send an empty buffer and make sure nothing breaks: no message and no exception is expected
+ channel.writeInbound(Unpooled.EMPTY_BUFFER);
+ channel.finish();
+
+ Assert.assertEquals(exceptionsCaught.get(), 0);
+ Assert.assertEquals(messagesReceived.get(), 0);
+ }
+
+ @Test
+ public void decodeHalfMessageTest() throws Exception {
+ // Case 1: only three bytes are written; neither a message nor an exception is expected
+ ByteBuf buf = Unpooled.buffer().writeBytes(new byte[] {'a', 'b', 'c'});
+
+ channel.writeInbound(buf.duplicate());
+ channel.finish();
+
+ Assert.assertEquals(exceptionsCaught.get(), 0);
+ Assert.assertEquals(messagesReceived.get(), 0);
+
+ // Case 2: one frame split across two writes; exactly one message is expected after the second write
+ buf = Unpooled.buffer();
+ setUp(); // builds a fresh channel and resets both counters for the second case
+
+ buf.writeByte(Protocol.HTTP_2_MAGIC);
+ buf.writeInt(22); // total length: matches the 22 bytes sent in the second write below
+
+ channel.writeInbound(buf.duplicate());
+ TimeUnit.MILLISECONDS.sleep(100);
+
+ buf = Unpooled.buffer();
+ buf.writeInt(12);
+ buf.writeByte(1);
+ buf.writeByte(1);
+ buf.writeShort(2);
+ buf.writeBytes(new byte[] {'c', 'd'});
+ buf.writeShort(2);
+ buf.writeBytes(new byte[] {'e', 'f'});
+ buf.writeShort(0);
+ buf.writeInt(2);
+ buf.writeBytes(new byte[] {'g', 'h'});
+
+ channel.writeInbound(buf.duplicate());
+ channel.finish();
+
+ Assert.assertEquals(exceptionsCaught.get(), 0);
+ Assert.assertEquals(messagesReceived.get(), 1);
+ }
+
+ @Test
+ public void decodeIllegalLengthTest() throws Exception {
+ ByteBuf buf = Unpooled.buffer();
+
+ buf.writeByte(Protocol.HTTP_2_MAGIC);
+ buf.writeInt(0); // total length 0; presumably the illegal length that triggers the expected exception - confirm
+
+ buf.writeInt(12);
+ buf.writeByte(1);
+ buf.writeByte(1);
+ buf.writeBytes(new byte[] {'a', 'b'});
+ buf.writeBytes(new byte[] {'c', 'd'});
+ buf.writeBytes(new byte[] {'e', 'f'});
+ buf.writeInt(15);
+
+ channel.writeInbound(buf.duplicate());
+ channel.finish();
+
+ Assert.assertEquals(exceptionsCaught.get(), 1);
+ }
+
+ @Test
+ public void decodeTest() throws Exception {
+ ByteBuf buf = Unpooled.buffer();
+
+ buf.writeByte(Protocol.HTTP_2_MAGIC);
+ buf.writeInt(22); // total length: the 22 bytes written after this field
+
+ buf.writeInt(12);
+ buf.writeByte(1);
+ buf.writeByte(1);
+ buf.writeShort(2);
+ buf.writeBytes(new byte[] {'c', 'd'});
+ buf.writeShort(2);
+ buf.writeBytes(new byte[] {'e', 'f'});
+ buf.writeShort(0);
+ buf.writeInt(2);
+ buf.writeBytes(new byte[] {'g', 'h'});
+
+ channel.writeInbound(buf.duplicate());
+ channel.finish();
+
+ Assert.assertEquals(exceptionsCaught.get(), 0);
+ Assert.assertEquals(messagesReceived.get(), 1);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializersTest.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializersTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializersTest.java
new file mode 100644
index 0000000..2aea401
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/protocol/serializer/SerializersTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.protocol.serializer;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.remoting.api.serializable.Serializer;
+import org.apache.rocketmq.remoting.common.TypePresentation;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.msgpack.annotation.Message;
+
+/**
+ * Round-trip (encode, then decode) tests run against the MsgPack, Kryo3 and JSON serializers.
+ */
+public class SerializersTest {
+ private static MsgPackSerializer msgPackSerializer;
+ private static Kryo3Serializer kryo3Serializer;
+ private static JsonSerializer jsonSerializer;
+
+ @BeforeClass
+ public static void init() {
+ msgPackSerializer = new MsgPackSerializer();
+ kryo3Serializer = new Kryo3Serializer();
+ jsonSerializer = new JsonSerializer();
+ }
+
+ @Test
+ public void msgPackSerializerTest() {
+ runOneByOne(msgPackSerializer);
+ }
+
+ @Test
+ public void kyroSerializerTest() {
+ runOneByOne(kryo3Serializer);
+ }
+
+ @Test
+ public void fastJsonSerializerTest() {
+ runOneByOne(jsonSerializer);
+ }
+
+ private void runOneByOne(Serializer serializer) {
+ listStringTest(serializer);
+ listModelTest(serializer);
+ modelTest(serializer);
+ mapTest(serializer);
+ listAndMapTest(serializer);
+ arrayTest(serializer);
+ }
+
+ private void listStringTest(Serializer serializer) {
+ // Build the list of strings to round-trip, first on its own and then nested in another list.
+ List<String> src = new ArrayList<>();
+ src.add("msgpack");
+ src.add("kumofs");
+ src.add("viver");
+
+ ByteBuffer srcBuf = serializer.encode(src);
+ List<String> dst = serializer.decode(srcBuf.array(), new TypePresentation<List<String>>() {
+ });
+ Assert.assertEquals(dst, src);
+
+ List<List<String>> ll = new ArrayList<>();
+ ll.add(src);
+
+ srcBuf = serializer.encode(ll);
+ List<List<String>> llDst = serializer.decode(srcBuf.array(), new TypePresentation<List<List<String>>>() {
+ });
+ Assert.assertEquals(llDst, ll);
+
+ }
+
+ private void listModelTest(Serializer serializer) {
+ // Build the list of Model objects to round-trip.
+ List<Model> src = new ArrayList<>();
+ src.add(new Model(10));
+ src.add(new Model(12));
+ src.add(new Model(14));
+
+ ByteBuffer srcBuf = serializer.encode(src);
+ List<Model> dst = serializer.decode(srcBuf.array(), new TypePresentation<List<Model>>() {
+ });
+ Assert.assertEquals(dst, src);
+ }
+
+ private void modelTest(Serializer serializer) {
+ Model src = new Model(123);
+
+ ByteBuffer srcBuf = serializer.encode(src);
+
+ Model dst = serializer.decode(srcBuf.array(), Model.class);
+ Assert.assertEquals(dst, src);
+ }
+
+ private void mapTest(Serializer serializer) {
+ Map<String, Model> src = new HashMap<>();
+
+ src.put("foo", new Model(123));
+ src.put("bar", new Model(234));
+
+ ByteBuffer srcBuf = serializer.encode(src);
+
+ Map<String, Model> dst = serializer.decode(srcBuf.array(), new TypePresentation<Map<String, Model>>() {
+ });
+ Assert.assertEquals(dst, src);
+ }
+
+ private void listAndMapTest(Serializer serializer) {
+ Map<String, List<Model>> src = new HashMap<>();
+
+ List<Model> list = new ArrayList<>();
+ list.add(new Model(123));
+ list.add(new Model(456));
+ src.put("foo", list);
+
+ ByteBuffer srcBuf = serializer.encode(src);
+
+ Map<String, List<Model>> dst = serializer.decode(srcBuf.array(), new TypePresentation<Map<String, List<Model>>>() {
+ });
+ Assert.assertEquals(dst, src);
+ }
+
+ private void arrayTest(Serializer serializer) {
+ Model[] models = new Model[3];
+ models[0] = new Model(1);
+ models[1] = new Model(2);
+ models[2] = new Model(3);
+
+ ByteBuffer srcBuf = serializer.encode(models);
+
+ Model[] models1 = serializer.decode(srcBuf.array(), Model[].class);
+ Assert.assertArrayEquals(models1, models);
+
+ List<Model[]> arrayInList = new LinkedList<>();
+ arrayInList.add(models);
+ srcBuf = serializer.encode(arrayInList);
+ List<Model[]> arrayInList1 = serializer.decode(srcBuf.array(), new TypePresentation<List<Model[]>>() {
+ });
+ Assert.assertArrayEquals(arrayInList.get(0), arrayInList1.get(0));
+ }
+}
+
+@Message
+class Model { // fixture object round-tripped by SerializersTest; equality covers every field below
+ private int a; // the only field set via the constructor argument or setA(); the others keep fixed initial values
+
+ private boolean isCCP = Boolean.TRUE;
+ private byte multilingualer = Byte.MAX_VALUE;
+ private short age = Short.MAX_VALUE;
+ private char education = Character.MAX_VALUE;
+ private int phoneNumber = Integer.MAX_VALUE;
+ private long anniversary = Long.MAX_VALUE;
+ private float cet4Score = Float.MAX_VALUE;
+ private double cet6Score = Double.MAX_VALUE;
+
+ private Map<String, Long> map = new HashMap<>();
+
+ //protected Date birthday = Calendar.getInstance().getTime();
+ private BigDecimal salary = BigDecimal.valueOf(11000.13);
+
+ // protected TimeZone location = Calendar.getInstance().getTimeZone();
+ //protected Timestamp location_time = new java.sql.Timestamp(Calendar.getInstance().getTimeInMillis());
+
+ //protected Locale locale = Locale.getDefault();
+ //protected EnumSet certificates = EnumSet.allOf(Certificates.class);
+ //protected BitSet qRCode = BitSet.valueOf(new long[]{123, 345});
+
+ public Model() { // NOTE(review): presumably needed so the serializers can instantiate the class - confirm
+ init();
+ }
+
+ Model(final int a) {
+ this.a = a;
+ init();
+ }
+
+ private void init() { // seeds the map with two fixed entries so it is non-empty in every instance
+ map.put("Hehe", 123L);
+ map.put("grsgfg", 656L);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Model model = (Model) o;
+
+ if (a != model.a)
+ return false;
+ if (isCCP != model.isCCP)
+ return false;
+ if (multilingualer != model.multilingualer)
+ return false;
+ if (age != model.age)
+ return false;
+ if (education != model.education)
+ return false;
+ if (phoneNumber != model.phoneNumber)
+ return false;
+ if (anniversary != model.anniversary)
+ return false;
+ if (Float.compare(model.cet4Score, cet4Score) != 0)
+ return false;
+ if (Double.compare(model.cet6Score, cet6Score) != 0)
+ return false;
+ if (map != null ? !map.equals(model.map) : model.map != null)
+ return false;
+ return salary != null ? salary.equals(model.salary) : model.salary == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result;
+ long temp;
+ result = a;
+ result = 31 * result + (isCCP ? 1 : 0);
+ result = 31 * result + (int) multilingualer;
+ result = 31 * result + (int) age;
+ result = 31 * result + (int) education;
+ result = 31 * result + phoneNumber;
+ result = 31 * result + (int) (anniversary ^ (anniversary >>> 32));
+ result = 31 * result + (cet4Score != +0.0f ? Float.floatToIntBits(cet4Score) : 0);
+ temp = Double.doubleToLongBits(cet6Score);
+ result = 31 * result + (int) (temp ^ (temp >>> 32));
+ result = 31 * result + (map != null ? map.hashCode() : 0);
+ result = 31 * result + (salary != null ? salary.hashCode() : 0);
+ return result;
+ }
+
+ public int getA() {
+ return a;
+ }
+
+ public void setA(final int a) {
+ this.a = a;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/internal/BeanUtilsTest.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/internal/BeanUtilsTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/internal/BeanUtilsTest.java
new file mode 100644
index 0000000..c86bc4c
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/internal/BeanUtilsTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+import java.util.Properties;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BeanUtilsTest {
+ private Properties properties = new Properties();
+ private final static String CLIENT_CONFIG_PREFIX = "client.";
+ private final static String SERVER_CONFIG_PREFIX = "server.";
+ private final static String TCP_CONFIG_PREFIX = "tcp.";
+ private final static String CONNECTION_CHANNEL_IDLE_SECONDS = "connection.channel.idle.seconds";
+ private final static String CLIENT_ASYNC_CALLBACK_EXECUTOR_THREADS = CLIENT_CONFIG_PREFIX + "async.callback.executor.threads";
+ private final static String CLIENT_POOLEDBYTEBUF_ALLOCATOR_ENABLE = CLIENT_CONFIG_PREFIX + "pooled.bytebuf.allocator.enable";
+ private final static String SERVER_LISTEN_PORT = SERVER_CONFIG_PREFIX + "listen.port";
+ private final static String TCP_SO_REUSE_ADDRESS = TCP_CONFIG_PREFIX + "so.reuse.address";
+ private final static String TCP_SO_KEEPALIVE = TCP_CONFIG_PREFIX + "so.keep.alive";
+ private final static String TCP_SO_NODELAY = TCP_CONFIG_PREFIX + "so.no.delay";
+ private final static String TCP_SO_SNDBUF_SIZE = TCP_CONFIG_PREFIX + "so.snd.buf.size";
+ private final static String TCP_SO_RCVBUF_SIZE = TCP_CONFIG_PREFIX + "so.rcv.buf.size";
+ private final static String TCP_SO_BACKLOG_SIZE = TCP_CONFIG_PREFIX + "so.backlog.size";
+ private final static String TCP_SO_TIMEOUT = TCP_CONFIG_PREFIX + "so.timeout";
+ private final static String TCP_SO_LINGER = TCP_CONFIG_PREFIX + "so.linger";
+
+ @Before
+ public void init() {
+ properties.put(CONNECTION_CHANNEL_IDLE_SECONDS, 3);
+
+ properties.put(CLIENT_ASYNC_CALLBACK_EXECUTOR_THREADS, 10);
+ properties.put(CLIENT_POOLEDBYTEBUF_ALLOCATOR_ENABLE, true);
+
+ properties.put(SERVER_LISTEN_PORT, 900);
+
+ properties.put(TCP_SO_REUSE_ADDRESS, false);
+ properties.put(TCP_SO_KEEPALIVE, false);
+ properties.put(TCP_SO_NODELAY, false);
+ properties.put(TCP_SO_SNDBUF_SIZE, 100);
+ properties.put(TCP_SO_RCVBUF_SIZE, 100);
+ properties.put(TCP_SO_BACKLOG_SIZE, 100);
+ properties.put(TCP_SO_TIMEOUT, 5000);
+ properties.put(TCP_SO_LINGER, 100);
+
+ properties.put(ClientConfig.STRING_TEST, "kaka");
+ }
+
+ @Test
+ public void populateTest() {
+ ClientConfig config = BeanUtils.populate(properties, ClientConfig.class);
+
+ // ClientConfig extends RemotingConfig, so every getter checked below except getStringTest() is inherited.
+ Assert.assertEquals(config.getConnectionChannelIdleSeconds(), 3);
+
+ Assert.assertEquals(config.getClientAsyncCallbackExecutorThreads(), 10);
+ Assert.assertTrue(config.isClientPooledBytebufAllocatorEnable());
+
+ Assert.assertEquals(config.getServerListenPort(), 900);
+
+ Assert.assertFalse(config.isTcpSoReuseAddress());
+
+ Assert.assertFalse(config.isTcpSoKeepAlive());
+ Assert.assertFalse(config.isTcpSoNoDelay());
+ Assert.assertEquals(config.getTcpSoSndBufSize(), 100);
+ Assert.assertEquals(config.getTcpSoRcvBufSize(), 100);
+ Assert.assertEquals(config.getTcpSoBacklogSize(), 100);
+ Assert.assertEquals(config.getTcpSoTimeout(), 5000);
+ Assert.assertEquals(config.getTcpSoLinger(), 100);
+
+ Assert.assertEquals(config.getStringTest(), "kaka");
+ }
+
+ @Test
+ public void populateExistObj() {
+ ClientConfig config = new ClientConfig();
+ config.setServerListenPort(8118);
+
+ Assert.assertEquals(config.getServerListenPort(), 8118);
+
+ config = BeanUtils.populate(properties, config);
+
+ Assert.assertEquals(config.getConnectionChannelIdleSeconds(), 3);
+
+ Assert.assertEquals(config.getClientAsyncCallbackExecutorThreads(), 10);
+ Assert.assertTrue(config.isClientPooledBytebufAllocatorEnable());
+
+ Assert.assertEquals(config.getServerListenPort(), 900);
+
+ Assert.assertFalse(config.isTcpSoReuseAddress());
+ Assert.assertFalse(config.isTcpSoKeepAlive());
+ Assert.assertFalse(config.isTcpSoNoDelay());
+ Assert.assertEquals(config.getTcpSoSndBufSize(), 100);
+ Assert.assertEquals(config.getTcpSoRcvBufSize(), 100);
+ Assert.assertEquals(config.getTcpSoBacklogSize(), 100);
+ Assert.assertEquals(config.getTcpSoTimeout(), 5000);
+ Assert.assertEquals(config.getTcpSoLinger(), 100);
+
+ Assert.assertEquals(config.getStringTest(), "kaka");
+ }
+
+ public static class ClientConfig extends RemotingConfig {
+ public final static String STRING_TEST = "string.test";
+ String stringTest = "foobar"; // initial value; both tests expect populate() to replace it with "kaka"
+
+ public ClientConfig() {
+ }
+
+ public String getStringTest() {
+ return stringTest;
+ }
+
+ public void setStringTest(String stringTest) {
+ this.stringTest = stringTest;
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/internal/ExceptionUtilsTest.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/internal/ExceptionUtilsTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/internal/ExceptionUtilsTest.java
new file mode 100644
index 0000000..bb46558
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/internal/ExceptionUtilsTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+import org.junit.Test;
+
+/**
+ * Prints {@link ExceptionUtils#getStackTrace} output for a chain of nested exceptions; the test expects the final Error.
+ */
+public class ExceptionUtilsTest {
+
+ @Test(expected = Error.class)
+ public void getStackTraceTest() {
+ try {
+ try {
+ throw new IllegalArgumentException("Faked exception 1");
+ } catch (IllegalArgumentException e) {
+ System.out.println(ExceptionUtils.getStackTrace(e));
+ throw new RuntimeException(e);
+ }
+ } catch (RuntimeException e) {
+ System.out.println(ExceptionUtils.getStackTrace(e));
+ throw new Error(e);
+ }
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/internal/JvmUtilsTest.java
----------------------------------------------------------------------
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/internal/JvmUtilsTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/internal/JvmUtilsTest.java
new file mode 100644
index 0000000..475bf64
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/internal/JvmUtilsTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.internal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that {@link JvmUtils#getProcessId()} returns a positive value.
+ */
+public class JvmUtilsTest {
+
+ @Test
+ public void getProcessIdTest() {
+ Assert.assertTrue(JvmUtils.getProcessId() > 0);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-api/pom.xml
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-api/pom.xml b/remoting-core/rpc-api/pom.xml
new file mode 100644
index 0000000..ad1ab62
--- /dev/null
+++ b/remoting-core/rpc-api/pom.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>remoting-core</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rpc-api</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>remoting-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/MethodType.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/MethodType.java b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/MethodType.java
new file mode 100644
index 0000000..d67c018
--- /dev/null
+++ b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/MethodType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.annotation;
+
+public enum MethodType {
+ /**
+ * Mark a RemoteMethod as a synchronous method.
+ */
+ SYNC,
+
+ /**
+ * Mark a RemoteMethod as an asynchronous method.
+ */
+ ASYNC,
+
+ /**
+ * Mark a RemoteMethod as a one-way method.
+ */
+ ONEWAY
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/RemoteMethod.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/RemoteMethod.java b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/RemoteMethod.java
new file mode 100644
index 0000000..a545342
--- /dev/null
+++ b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/RemoteMethod.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RemoteMethod {
+ String name() default "";
+
+ /**
+ * The API version (NOT the product version), composed as a dot delimited
+ * string with major, minor, and patch level components.
+ * <pre>
+ * - Major: Incremented for backward incompatible changes. An example would
+ * be changes to the number or disposition of method arguments.
+ * - Minor: Incremented for backward compatible changes. An example would
+ * be the addition of a new (optional) method.
+ * - Patch: Incremented for bug fixes. The patch level should be increased
+ * for every edit that doesn't result in a change to major/minor.
+ * </pre>
+ * See the Semantic Versioning Specification (SemVer) http://semver.org.
+ *
+ * @return the API version string; defaults to {@code "1.0.0"}
+ */
+ String version() default "1.0.0";
+
+ String description() default "";
+
+ MethodType type() default MethodType.SYNC;
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/RemoteService.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/RemoteService.java b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/RemoteService.java
new file mode 100644
index 0000000..461d92e
--- /dev/null
+++ b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/RemoteService.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RemoteService {
+ String name();
+
+ /**
+ * The API version (NOT the product version), composed as a dot delimited
+ * string with major, minor, and patch level components.
+ * <pre>
+ * - Major: Incremented for backward incompatible changes. An example would
+ * be changes to the number or disposition of method arguments.
+ * - Minor: Incremented for backward compatible changes. An example would
+ * be the addition of a new (optional) method.
+ * - Patch: Incremented for bug fixes. The patch level should be increased
+ * for every edit that doesn't result in a change to major/minor.
+ * </pre>
+ * See the Semantic Versioning Specification (SemVer) http://semver.org.
+ *
+ * @return the API version string; defaults to {@code "1.0.0"}
+ */
+ String version() default "1.0.0";
+
+ String description() default "";
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/VisibleForInternal.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/VisibleForInternal.java b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/VisibleForInternal.java
new file mode 100644
index 0000000..fe9986d
--- /dev/null
+++ b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/annotation/VisibleForInternal.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotates a method that exists, or is more widely visible than otherwise necessary, only
+ * for internal communication.
+ * <p>
+ * This is a marker annotation without attributes. It can be applied to methods only and is
+ * retained at runtime.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface VisibleForInternal {
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/AdvancedClient.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/AdvancedClient.java b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/AdvancedClient.java
new file mode 100644
index 0000000..881dc68
--- /dev/null
+++ b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/AdvancedClient.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.rpc.api;
+
+/**
+ * Client-side invocation API in which a call is addressed by a target address string, a service
+ * code and a version, and the arguments are passed as an untyped {@code Object[]}.
+ * <p>
+ * NOTE(review): the implementation is not visible here. Blocking behaviour, timeouts and the way
+ * {@code serviceCode} and {@code version} are resolved are inferred from the method names and
+ * signatures only; confirm against the implementation before relying on them.
+ */
+public interface AdvancedClient {
+ /**
+ * Invokes a service and returns its response typed as {@code responseType}. Judging by the name,
+ * the call presumably blocks until the response is available (confirm).
+ *
+ * @param address the target address; the expected format is not specified here
+ * @param serviceCode the code identifying the service to call
+ * @param version the version of the service to call
+ * @param parameter the call arguments
+ * @param responseType the class of the expected response
+ * @param <T> the response type
+ * @return the response
+ * @throws Exception if the call fails; no narrower exception type is declared
+ */
+ <T> T callSync(String address,
+ final String serviceCode,
+ final String version,
+ final Object[] parameter,
+ final Class<T> responseType) throws Exception;
+
+ /**
+ * Invokes a service and returns a {@link Promise} for its response. Judging by the name, the call
+ * presumably returns without waiting for the response (confirm).
+ *
+ * @param address the target address; the expected format is not specified here
+ * @param serviceCode the code identifying the service to call
+ * @param version the version of the service to call
+ * @param parameter the call arguments
+ * @param responseType the class of the expected response
+ * @param <T> the response type
+ * @return a promise of the response
+ * @throws Exception if the call fails; no narrower exception type is declared
+ */
+ <T> Promise<T> callAsync(String address,
+ final String serviceCode,
+ final String version,
+ final Object[] parameter,
+ final Class<T> responseType) throws Exception;
+
+ /**
+ * Invokes a service without returning a result. Judging by the name, no response is presumably
+ * expected from the remote side (confirm).
+ *
+ * @param address the target address; the expected format is not specified here
+ * @param serviceCode the code identifying the service to call
+ * @param version the version of the service to call
+ * @param parameter the call arguments
+ * @throws Exception if the call fails; no narrower exception type is declared
+ */
+ void callOneway(String address,
+ final String serviceCode,
+ final String version,
+ final Object[] parameter) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/AdvancedServer.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/AdvancedServer.java b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/AdvancedServer.java
new file mode 100644
index 0000000..3dd4188
--- /dev/null
+++ b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/AdvancedServer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.rpc.api;
+
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+
+/**
+ * Server-side invocation API in which a call is sent over a given {@link RemotingChannel} and is
+ * addressed by a service code and a version, with the arguments passed as an untyped {@code Object[]}.
+ * <p>
+ * NOTE(review): the implementation is not visible here. Blocking behaviour, timeouts and the way
+ * {@code serviceCode} and {@code version} are resolved are inferred from the method names and
+ * signatures only; confirm against the implementation before relying on them.
+ */
+public interface AdvancedServer {
+ /**
+ * Invokes a service over {@code channel} and returns its response typed as {@code responseType}.
+ * Judging by the name, the call presumably blocks until the response is available (confirm).
+ *
+ * @param channel the channel over which the call is made
+ * @param serviceCode the code identifying the service to call
+ * @param version the version of the service to call
+ * @param parameter the call arguments
+ * @param responseType the class of the expected response
+ * @param <T> the response type
+ * @return the response
+ * @throws Exception if the call fails; no narrower exception type is declared
+ */
+ <T> T callSync(final RemotingChannel channel,
+ final String serviceCode,
+ final String version,
+ final Object[] parameter,
+ final Class<T> responseType) throws Exception;
+
+ /**
+ * Invokes a service over {@code channel} and returns a {@link Promise} for its response. Judging
+ * by the name, the call presumably returns without waiting for the response (confirm).
+ *
+ * @param channel the channel over which the call is made
+ * @param serviceCode the code identifying the service to call
+ * @param version the version of the service to call
+ * @param parameter the call arguments
+ * @param responseType the class of the expected response
+ * @param <T> the response type
+ * @return a promise of the response
+ * @throws Exception if the call fails; no narrower exception type is declared
+ */
+ <T> Promise<T> callAsync(final RemotingChannel channel,
+ final String serviceCode,
+ final String version,
+ final Object[] parameter,
+ final Class<T> responseType) throws Exception;
+
+ /**
+ * Invokes a service over {@code channel} without returning a result. Judging by the name, no
+ * response is presumably expected from the peer (confirm).
+ *
+ * @param channel the channel over which the call is made
+ * @param serviceCode the code identifying the service to call
+ * @param version the version of the service to call
+ * @param parameter the call arguments
+ * @throws Exception if the call fails; no narrower exception type is declared
+ */
+ void callOneway(final RemotingChannel channel,
+ final String serviceCode,
+ final String version,
+ final Object[] parameter) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/Promise.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/Promise.java b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/Promise.java
new file mode 100644
index 0000000..ec94be3
--- /dev/null
+++ b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/Promise.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.api;
+
+/**
+ * A {@code Promise} represents the result of an asynchronous computation. Methods are provided to check if the
+ * computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can
+ * only be retrieved using method {@code get} when the computation has completed, blocking if necessary until it is
+ * ready. Cancellation is performed by the {@code cancel} method. Additional methods are provided to determine if the
+ * task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If
+ * you would like to use a {@code Promise} for the sake of cancellability but not provide a usable result, you can
+ * declare types of the form {@code Promise<?>} and return {@code null} as a result of the underlying task.
+ * <p>
+ * In addition to these query and wait methods, a promise can be completed explicitly through {@link #set} and
+ * {@link #setFailure}, and completion callbacks can be registered with {@link #addListener}.
+ *
+ * @param <V> the type of the result
+ * @since 1.0.0
+ */
+public interface Promise<V> {
+
+ /**
+ * Attempts to cancel execution of this task. This attempt will fail if the task has already completed, has already
+ * been cancelled, or could not be cancelled for some other reason. If successful, and this task has not started
+ * when {@code cancel} is called, this task should never run. If the task has already started, then the {@code
+ * mayInterruptIfRunning} parameter determines whether the thread executing this task should be interrupted in an
+ * attempt to stop the task.
+ * <p>
+ * After this method returns, subsequent calls to {@link #isDone} will always return {@code true}. Subsequent calls
+ * to {@link #isCancelled} will always return {@code true} if this method returned {@code true}.
+ *
+ * @param mayInterruptIfRunning {@code true} if the thread executing this task should be interrupted; otherwise,
+ * in-progress tasks are allowed to complete
+ * @return {@code false} if the task could not be cancelled, typically because it has already completed normally;
+ * {@code true} otherwise
+ */
+ boolean cancel(boolean mayInterruptIfRunning);
+
+ /**
+ * Returns {@code true} if this task was cancelled before it completed
+ * normally.
+ *
+ * @return {@code true} if this task was cancelled before it completed
+ */
+ boolean isCancelled();
+
+ /**
+ * Returns {@code true} if this task completed.
+ * <p>
+ * Completion may be due to normal termination, an exception, or
+ * cancellation -- in all of these cases, this method will return
+ * {@code true}.
+ *
+ * @return {@code true} if this task completed
+ */
+ boolean isDone();
+
+ /**
+ * Waits if necessary for the computation to complete, and then
+ * retrieves its result.
+ * <p>
+ * NOTE(review): unlike {@link java.util.concurrent.Future#get()}, this method declares no checked
+ * exceptions; how failure, cancellation or interruption is reported to the caller is not specified
+ * by this interface. Confirm against the implementation.
+ *
+ * @return the computed result
+ */
+ V get();
+
+ /**
+ * Waits if necessary for at most the given time for the computation
+ * to complete, and then retrieves its result, if available.
+ * <p>
+ * NOTE(review): this interface does not specify what is returned when the wait times out or the
+ * computation was cancelled; confirm against the implementation.
+ *
+ * @param timeout the maximum time to wait; the time unit is not stated by this interface
+ * (presumably milliseconds, confirm against the implementation)
+ * @return the computed result, if available
+ */
+ V get(long timeout);
+
+ /**
+ * Sets the value of this promise and marks it as completed if the value was set successfully.
+ *
+ * @param value the result value
+ * @return {@code true} if the value was set successfully; {@code false} otherwise
+ */
+ boolean set(V value);
+
+ /**
+ * Marks this promise as a failure and notifies all listeners.
+ *
+ * @param cause the cause
+ * @return {@code true} if the failure was set successfully; {@code false} otherwise
+ */
+ boolean setFailure(Throwable cause);
+
+ /**
+ * Adds the specified listener to this promise. The specified listener is notified when this promise is done. If
+ * this promise is already completed, the specified listener will be notified immediately.
+ *
+ * @param listener the listener to notify
+ */
+ void addListener(PromiseListener<V> listener);
+
+ /**
+ * Returns the throwable caught by this promise.
+ * <p>
+ * NOTE(review): the value returned when no throwable was caught is not specified here
+ * (presumably {@code null}, confirm against the implementation).
+ *
+ * @return a throwable caught by the promise
+ */
+ Throwable getThrowable();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/PromiseListener.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/PromiseListener.java b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/PromiseListener.java
new file mode 100644
index 0000000..88dd731
--- /dev/null
+++ b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/PromiseListener.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.api;
+
+/**
+ * A listener that is called back when a Promise is done.
+ * {@code PromiseListener} instances are attached to {@link Promise} by passing
+ * them in to {@link Promise#addListener(PromiseListener)}.
+ *
+ * @param <V> the result type of the {@link Promise} this listener is attached to
+ * @since 1.0.0
+ */
+public interface PromiseListener<V> {
+ /**
+ * Invoked when the operation associated with the {@code Promise} has been completed successfully.
+ *
+ * @param promise the source {@code Promise} which called this callback
+ */
+ void operationCompleted(Promise<V> promise);
+
+ /**
+ * Invoked when the operation associated with the {@code Promise} has been completed unsuccessfully.
+ * <p>
+ * NOTE(review): presumably the failure cause can be obtained from {@link Promise#getThrowable()};
+ * confirm against the implementation.
+ *
+ * @param promise the source {@code Promise} which called this callback
+ */
+ void operationFailed(Promise<V> promise);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/SimpleClient.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/SimpleClient.java b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/SimpleClient.java
new file mode 100644
index 0000000..ece1f60
--- /dev/null
+++ b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/SimpleClient.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.api;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import org.apache.rocketmq.remoting.api.ObjectLifecycle;
+
+/**
+ * Client-side entry point of the RPC API. It extends {@link ObjectLifecycle} and gives access to
+ * the lower-level {@link AdvancedClient}.
+ * <p>
+ * NOTE(review): the implementation is not visible here; the method descriptions below are inferred
+ * from names and signatures and should be confirmed against it.
+ */
+public interface SimpleClient extends ObjectLifecycle {
+ /**
+ * Returns an instance of the given service type bound to {@code address}. Presumably the returned
+ * object is a proxy that forwards calls to the remote service (confirm).
+ *
+ * @param service the service type to bind
+ * @param address the target address; the expected format is not specified here
+ * @param properties key/value settings for the binding; the supported keys are not visible here
+ * @param <T> the service type
+ * @return an instance of {@code service}
+ */
+ <T> T bind(Class<T> service, String address, Properties properties); //keyValue
+
+ /**
+ * Publishes the given service object. Presumably this makes it callable by the remote peer (confirm).
+ *
+ * @param service the service object to publish
+ */
+ void publish(Object service);
+
+ /**
+ * Publishes the given service object together with an executor. Presumably the executor is used
+ * to run invocations of that service (confirm).
+ *
+ * @param service the service object to publish
+ * @param executorService the executor associated with the service
+ */
+ void publish(Object service, ExecutorService executorService);
+
+ /**
+ * Returns the {@link AdvancedClient} view of this client.
+ *
+ * @return the advanced client
+ */
+ AdvancedClient advancedClient();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/SimpleServer.java
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/SimpleServer.java b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/SimpleServer.java
new file mode 100644
index 0000000..4bf5372
--- /dev/null
+++ b/remoting-core/rpc-api/src/main/java/org/apache/rocketmq/rpc/api/SimpleServer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.rpc.api;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import org.apache.rocketmq.remoting.api.ObjectLifecycle;
+import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
+
+/**
+ * Server-side entry point of the RPC API. It extends {@link ObjectLifecycle} and gives access to
+ * the lower-level {@link AdvancedServer}.
+ * <p>
+ * NOTE(review): the implementation is not visible here; the method descriptions below are inferred
+ * from names and signatures and should be confirmed against it.
+ */
+public interface SimpleServer extends ObjectLifecycle {
+ /**
+ * Returns an instance of the given service type bound to {@code channel}. Presumably the returned
+ * object is a proxy that forwards calls to the peer on that channel (confirm).
+ *
+ * @param service the service type to bind
+ * @param channel the channel the returned instance is bound to
+ * @param properties key/value settings for the binding; the supported keys are not visible here
+ * @param <T> the service type
+ * @return an instance of {@code service}
+ */
+ <T> T bind(final Class<T> service, final RemotingChannel channel, final Properties properties);
+
+ /**
+ * Returns the {@link AdvancedServer} view of this server.
+ *
+ * @return the advanced server
+ */
+ AdvancedServer advancedServer();
+
+ /**
+ * Publishes the given service object. Presumably this makes it callable by remote peers (confirm).
+ *
+ * @param service the service object to publish
+ */
+ void publish(final Object service);
+
+ /**
+ * Publishes the given service object together with an executor. Presumably the executor is used
+ * to run invocations of that service (confirm).
+ *
+ * @param service the service object to publish
+ * @param executorService the executor associated with the service
+ */
+ void publish(Object service, ExecutorService executorService);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0b88e66f/remoting-core/rpc-impl/pom.xml
----------------------------------------------------------------------
diff --git a/remoting-core/rpc-impl/pom.xml b/remoting-core/rpc-impl/pom.xml
new file mode 100644
index 0000000..162d0b8
--- /dev/null
+++ b/remoting-core/rpc-impl/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Module descriptor for rpc-impl; groupId and version are inherited from the remoting-core parent. -->
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>remoting-core</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rpc-impl</artifactId>
+
+ <!-- No versions are declared for the dependencies below; presumably they are managed by the
+ dependencyManagement section of the parent POM (confirm against remoting-core/pom.xml). -->
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rpc-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>remoting-impl</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file