You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:32 UTC

[34/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-186] Implement the OpenMessaging specification 0.1.0-alpha version

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
new file mode 100644
index 0000000..84b6c2d
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.rocketmq.promise;
+
+public enum FutureState {
+    /**
+     * the task is doing
+     **/
+    DOING(0),
+    /**
+     * the task is done
+     **/
+    DONE(1),
+    /**
+     * ths task is cancelled
+     **/
+    CANCELLED(2);
+
+    public final int value;
+
+    private FutureState(int value) {
+        this.value = value;
+    }
+
+    public boolean isCancelledState() {
+        return this == CANCELLED;
+    }
+
+    public boolean isDoneState() {
+        return this == DONE;
+    }
+
+    public boolean isDoingState() {
+        return this == DOING;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
new file mode 100644
index 0000000..104d3d9
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.slf4j.Logger;
+
+public final class BeanUtils {
+    final static Logger log = ClientLogger.getLog();
+
+    /**
+     * Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
+     */
+    private static Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<Class<?>, Class<?>>();
+
+    static {
+        primitiveWrapperMap.put(Boolean.TYPE, Boolean.class);
+        primitiveWrapperMap.put(Byte.TYPE, Byte.class);
+        primitiveWrapperMap.put(Character.TYPE, Character.class);
+        primitiveWrapperMap.put(Short.TYPE, Short.class);
+        primitiveWrapperMap.put(Integer.TYPE, Integer.class);
+        primitiveWrapperMap.put(Long.TYPE, Long.class);
+        primitiveWrapperMap.put(Double.TYPE, Double.class);
+        primitiveWrapperMap.put(Float.TYPE, Float.class);
+        primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
+    }
+
+    private static Map<Class<?>, Class<?>> wrapperMap = new HashMap<Class<?>, Class<?>>();
+
+    static {
+        for (final Class<?> primitiveClass : primitiveWrapperMap.keySet()) {
+            final Class<?> wrapperClass = primitiveWrapperMap.get(primitiveClass);
+            if (!primitiveClass.equals(wrapperClass)) {
+                wrapperMap.put(wrapperClass, primitiveClass);
+            }
+        }
+        wrapperMap.put(String.class, String.class);
+    }
+
+    /**
+     * <p>Populate the JavaBeans properties of the specified bean, based on
+     * the specified name/value pairs.  This method uses Java reflection APIs
+     * to identify corresponding "property setter" method names, and deals
+     * with setter arguments of type <Code>String</Code>, <Code>boolean</Code>,
+     * <Code>int</Code>, <Code>long</Code>, <Code>float</Code>, and
+     * <Code>double</Code>.</p>
+     *
+     * <p>The particular setter method to be called for each property is
+     * determined using the usual JavaBeans introspection mechanisms.  Thus,
+     * you may identify custom setter methods using a BeanInfo class that is
+     * associated with the class of the bean itself.  If no such BeanInfo
+     * class is available, the standard method name conversion ("set" plus
+     * the capitalized name of the property in question) is used.</p>
+     *
+     * <p><strong>NOTE</strong>:  It is contrary to the JavaBeans Specification
+     * to have more than one setter method (with different argument
+     * signatures) for the same property.</p>
+     *
+     * @param clazz JavaBean class whose properties are being populated
+     * @param properties Map keyed by property name, with the corresponding (String or String[]) value(s) to be set
+     * @param <T> Class type
+     * @return Class instance
+     */
+    public static <T> T populate(final Properties properties, final Class<T> clazz) {
+        T obj = null;
+        try {
+            obj = clazz.newInstance();
+            return populate(properties, obj);
+        } catch (Throwable e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    public static <T> T populate(final KeyValue properties, final Class<T> clazz) {
+        T obj = null;
+        try {
+            obj = clazz.newInstance();
+            return populate(properties, obj);
+        } catch (Throwable e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    public static Class<?> getMethodClass(Class<?> clazz, String methodName) {
+        Method[] methods = clazz.getMethods();
+        for (Method method : methods) {
+            if (method.getName().equalsIgnoreCase(methodName)) {
+                return method.getParameterTypes()[0];
+            }
+        }
+        return null;
+    }
+
+    public static void setProperties(Class<?> clazz, Object obj, String methodName,
+        Object value) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        Class<?> parameterClass = getMethodClass(clazz, methodName);
+        Method setterMethod = clazz.getMethod(methodName, parameterClass);
+        if (parameterClass == Boolean.TYPE) {
+            setterMethod.invoke(obj, Boolean.valueOf(value.toString()));
+        } else if (parameterClass == Integer.TYPE) {
+            setterMethod.invoke(obj, Integer.valueOf(value.toString()));
+        } else if (parameterClass == Double.TYPE) {
+            setterMethod.invoke(obj, Double.valueOf(value.toString()));
+        } else if (parameterClass == Float.TYPE) {
+            setterMethod.invoke(obj, Float.valueOf(value.toString()));
+        } else if (parameterClass == Long.TYPE) {
+            setterMethod.invoke(obj, Long.valueOf(value.toString()));
+        } else
+            setterMethod.invoke(obj, value);
+    }
+
+    public static <T> T populate(final Properties properties, final T obj) {
+        Class<?> clazz = obj.getClass();
+        try {
+
+            Set<Map.Entry<Object, Object>> entries = properties.entrySet();
+            for (Map.Entry<Object, Object> entry : entries) {
+                String entryKey = entry.getKey().toString();
+                String[] keyGroup = entryKey.split("\\.");
+                for (int i = 0; i < keyGroup.length; i++) {
+                    keyGroup[i] = keyGroup[i].toLowerCase();
+                    keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
+                }
+                String beanFieldNameWithCapitalization = StringUtils.join(keyGroup);
+                try {
+                    setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, entry.getValue());
+                } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
+                    //ignored...
+                }
+            }
+        } catch (RuntimeException e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    public static <T> T populate(final KeyValue properties, final T obj) {
+        Class<?> clazz = obj.getClass();
+        try {
+
+            final Set<String> keySet = properties.keySet();
+            for (String key : keySet) {
+                String[] keyGroup = key.split("\\.");
+                for (int i = 0; i < keyGroup.length; i++) {
+                    keyGroup[i] = keyGroup[i].toLowerCase();
+                    keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
+                }
+                String beanFieldNameWithCapitalization = StringUtils.join(keyGroup);
+                try {
+                    setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getString(key));
+                } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
+                    //ignored...
+                }
+            }
+        } catch (RuntimeException e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
new file mode 100644
index 0000000..60c8408
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.OMS;
+import io.openmessaging.SendResult;
+import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.domain.SendResultImpl;
+import java.lang.reflect.Field;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
+
+public class OMSUtil {
+
+    /**
+     * Builds a OMS client instance name.
+     *
+     * @return a unique instance name
+     */
+    public static String buildInstanceName() {
+        return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime();
+    }
+
+    public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
+        org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
+        rmqMessage.setBody(omsMessage.getBody());
+
+        KeyValue headers = omsMessage.headers();
+        KeyValue properties = omsMessage.properties();
+
+        //All destinations in RocketMQ use Topic
+        if (headers.containsKey(MessageHeader.TOPIC)) {
+            rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
+            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        } else {
+            rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
+            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
+        }
+
+        for (String key : properties.keySet()) {
+            MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
+        }
+
+        //Headers has a high priority
+        for (String key : headers.keySet()) {
+            MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
+        }
+
+        return rmqMessage;
+    }
+
+    public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) {
+        BytesMessage omsMsg = new BytesMessageImpl();
+        omsMsg.setBody(rmqMsg.getBody());
+
+        KeyValue headers = omsMsg.headers();
+        KeyValue properties = omsMsg.properties();
+
+        final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
+
+        for (final Map.Entry<String, String> entry : entries) {
+            if (isOMSHeader(entry.getKey())) {
+                headers.put(entry.getKey(), entry.getValue());
+            } else {
+                properties.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
+        if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) ||
+            rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
+            omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
+        } else {
+            omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
+        }
+        omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys());
+        omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
+        omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
+        omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
+        omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
+        return omsMsg;
+    }
+
+    public static boolean isOMSHeader(String value) {
+        for (Field field : MessageHeader.class.getDeclaredFields()) {
+            try {
+                if (field.get(MessageHeader.class).equals(value)) {
+                    return true;
+                }
+            } catch (IllegalAccessException e) {
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult.
+     */
+    public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) {
+        assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK);
+        return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue());
+    }
+
+    public static KeyValue buildKeyValue(KeyValue... keyValues) {
+        KeyValue keyValue = OMS.newKeyValue();
+        for (KeyValue properties : keyValues) {
+            for (String key : properties.keySet()) {
+                keyValue.put(key, properties.getString(key));
+            }
+        }
+        return keyValue;
+    }
+
+    /**
+     * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
+     */
+    public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
+        return new Iterator<T>() {
+            Iterator<T> iterator = new Iterator<T>() {
+                @Override
+                public synchronized boolean hasNext() {
+                    return false;
+                }
+
+                @Override
+                public synchronized T next() {
+                    throw new NoSuchElementException();
+                }
+
+                @Override
+                public synchronized void remove() {
+                    //Ignore
+                }
+            };
+
+            @Override
+            public synchronized boolean hasNext() {
+                return iterator.hasNext() || iterable.iterator().hasNext();
+            }
+
+            @Override
+            public synchronized T next() {
+                if (!iterator.hasNext()) {
+                    iterator = iterable.iterator();
+                    if (!iterator.hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+                }
+                return iterator.next();
+            }
+
+            @Override
+            public synchronized void remove() {
+                iterator.remove();
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
new file mode 100644
index 0000000..ae4d3ed
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LocalMessageCacheTest {
+    private LocalMessageCache localMessageCache;
+    @Mock
+    private DefaultMQPullConsumer rocketmqPullConsume;
+    @Mock
+    private ConsumeRequest consumeRequest;
+
+    @Before
+    public void init() {
+        ClientConfig clientConfig = new ClientConfig();
+        clientConfig.setRmqPullMessageBatchNums(512);
+        clientConfig.setRmqPullMessageCacheCapacity(1024);
+        localMessageCache = new LocalMessageCache(rocketmqPullConsume, clientConfig);
+    }
+
+    @Test
+    public void testNextPullBatchNums() throws Exception {
+        assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(512);
+        for (int i = 0; i < 513; i++) {
+            localMessageCache.submitConsumeRequest(consumeRequest);
+        }
+        assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(511);
+    }
+
+    @Test
+    public void testNextPullOffset() throws Exception {
+        MessageQueue messageQueue = new MessageQueue();
+        when(rocketmqPullConsume.fetchConsumeOffset(any(MessageQueue.class), anyBoolean()))
+            .thenReturn(123L);
+        assertThat(localMessageCache.nextPullOffset(new MessageQueue())).isEqualTo(123L);
+    }
+
+    @Test
+    public void testUpdatePullOffset() throws Exception {
+        MessageQueue messageQueue = new MessageQueue();
+        localMessageCache.updatePullOffset(messageQueue, 124L);
+        assertThat(localMessageCache.nextPullOffset(messageQueue)).isEqualTo(124L);
+    }
+
+    @Test
+    public void testSubmitConsumeRequest() throws Exception {
+        byte [] body = new byte[]{'1', '2', '3'};
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(body);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic("HELLO_QUEUE");
+
+        when(consumeRequest.getMessageExt()).thenReturn(consumedMsg);
+        localMessageCache.submitConsumeRequest(consumeRequest);
+        assertThat(localMessageCache.poll()).isEqualTo(consumedMsg);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
new file mode 100644
index 0000000..277a5c6
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PullConsumerImplTest {
+    private PullConsumer consumer;
+    private String queueName = "HELLO_QUEUE";
+
+    @Mock
+    private DefaultMQPullConsumer rocketmqPullConsumer;
+    private LocalMessageCache localMessageCache =
+        spy(new LocalMessageCache(rocketmqPullConsumer, new ClientConfig()));
+
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        consumer = messagingAccessPoint.createPullConsumer(queueName,
+            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+
+        Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
+        field.setAccessible(true);
+        field.set(consumer, rocketmqPullConsumer); //Replace
+
+        field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
+        field.setAccessible(true);
+        field.set(consumer, localMessageCache);
+
+        messagingAccessPoint.startup();
+        consumer.startup();
+    }
+
+    @Test
+    public void testPoll() {
+        final byte[] testBody = new byte[] {'a', 'b'};
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(testBody);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic(queueName);
+
+        when(localMessageCache.poll()).thenReturn(consumedMsg);
+
+        Message message = consumer.poll();
+        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+        assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
+    }
+
+    @Test
+    public void testPoll_WithTimeout() {
+        //There is a default timeout value, @see ClientConfig#omsOperationTimeout.
+        Message message = consumer.poll();
+        assertThat(message).isNull();
+
+        message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100));
+        assertThat(message).isNull();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
new file mode 100644
index 0000000..882e57e
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessageListener;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PushConsumerImplTest {
+    private PushConsumer consumer;
+
+    @Mock
+    private DefaultMQPushConsumer rocketmqPushConsumer;
+
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        consumer = messagingAccessPoint.createPushConsumer(
+            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+
+        Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
+        field.setAccessible(true);
+        DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer);
+        field.set(consumer, rocketmqPushConsumer); //Replace
+
+        when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener());
+        messagingAccessPoint.startup();
+        consumer.startup();
+    }
+
+    @Test
+    public void testConsumeMessage() {
+        final byte[] testBody = new byte[] {'a', 'b'};
+
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(testBody);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic("HELLO_QUEUE");
+        consumer.attachQueue("HELLO_QUEUE", new MessageListener() {
+            @Override
+            public void onMessage(final Message message, final ReceivedMessageContext context) {
+                assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+                assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
+                context.ack();
+            }
+        });
+        ((MessageListenerConcurrently) rocketmqPushConsumer
+            .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
new file mode 100644
index 0000000..1db80c3
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.Producer;
+import io.openmessaging.exception.OMSRuntimeException;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProducerImplTest {
+    private Producer producer;
+
+    @Mock
+    private DefaultMQProducer rocketmqProducer;
+
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        producer = messagingAccessPoint.createProducer();
+
+        Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
+        field.setAccessible(true);
+        field.set(producer, rocketmqProducer);
+
+        messagingAccessPoint.startup();
+        producer.startup();
+    }
+
+    @Test
+    public void testSend_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("TestMsgID");
+        sendResult.setSendStatus(SendStatus.SEND_OK);
+        when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
+        io.openmessaging.SendResult omsResult =
+            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+
+        assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
+    }
+
+    @Test
+    public void testSend_Not_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        SendResult sendResult = new SendResult();
+        sendResult.setSendStatus(SendStatus.FLUSH_DISK_TIMEOUT);
+
+        when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
+        try {
+            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+            failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
+        } catch (Exception e) {
+            assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
+        }
+    }
+
+    @Test
+    public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
+        try {
+            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+            failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
+        } catch (Exception e) {
+            assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
new file mode 100644
index 0000000..823fe01
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.SequenceProducer;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SequenceProducerImplTest {
+
+    private SequenceProducer producer;
+
+    @Mock
+    private DefaultMQProducer rocketmqProducer;
+
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        producer = messagingAccessPoint.createSequenceProducer();
+
+        Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
+        field.setAccessible(true);
+        field.set(producer, rocketmqProducer);
+
+        messagingAccessPoint.startup();
+        producer.startup();
+    }
+
+    @Test
+    public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("TestMsgID");
+        sendResult.setSendStatus(SendStatus.SEND_OK);
+        when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult);
+        when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
+        final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
+        producer.send(message);
+        producer.commit();
+        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID");
+    }
+
+    @Test
+    public void testRollback() {
+        when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
+        final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
+        producer.send(message);
+        producer.rollback();
+        producer.commit(); //Commit nothing.
+        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
new file mode 100644
index 0000000..2240ff2
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.promise;
+
+import io.openmessaging.Promise;
+import io.openmessaging.PromiseListener;
+import io.openmessaging.exception.OMSRuntimeException;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+
+public class DefaultPromiseTest {
+    private Promise<String> promise;
+
+    @Before
+    public void init() {
+        promise = new DefaultPromise<>();
+    }
+
+    @Test
+    public void testIsCancelled() throws Exception {
+        assertThat(promise.isCancelled()).isEqualTo(false);
+    }
+
+    @Test
+    public void testIsDone() throws Exception {
+        assertThat(promise.isDone()).isEqualTo(false);
+        promise.set("Done");
+        assertThat(promise.isDone()).isEqualTo(true);
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        promise.set("Done");
+        assertThat(promise.get()).isEqualTo("Done");
+    }
+
+    @Test
+    public void testGet_WithTimeout() throws Exception {
+        try {
+            promise.get(100);
+            failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
+        } catch (OMSRuntimeException e) {
+            assertThat(e).hasMessageContaining("Get request result is timeout or interrupted");
+        }
+    }
+
+    @Test
+    public void testAddListener() throws Exception {
+        promise.addListener(new PromiseListener<String>() {
+            @Override
+            public void operationCompleted(final Promise<String> promise) {
+                assertThat(promise.get()).isEqualTo("Done");
+            }
+
+            @Override
+            public void operationFailed(final Promise<String> promise) {
+
+            }
+        });
+        promise.set("Done");
+    }
+
+    @Test
+    public void testAddListener_ListenerAfterSet() throws Exception {
+        promise.set("Done");
+        promise.addListener(new PromiseListener<String>() {
+            @Override
+            public void operationCompleted(final Promise<String> promise) {
+                assertThat(promise.get()).isEqualTo("Done");
+            }
+
+            @Override
+            public void operationFailed(final Promise<String> promise) {
+
+            }
+        });
+    }
+
+    @Test
+    public void testAddListener_WithException_ListenerAfterSet() throws Exception {
+        final Throwable exception = new OMSRuntimeException("-1", "Test Error");
+        promise.setFailure(exception);
+        promise.addListener(new PromiseListener<String>() {
+            @Override
+            public void operationCompleted(final Promise<String> promise) {
+            }
+
+            @Override
+            public void operationFailed(final Promise<String> promise) {
+                assertThat(promise.getThrowable()).isEqualTo(exception);
+            }
+        });
+    }
+
+    @Test
+    public void testAddListener_WithException() throws Exception {
+        final Throwable exception = new OMSRuntimeException("-1", "Test Error");
+        promise.addListener(new PromiseListener<String>() {
+            @Override
+            public void operationCompleted(final Promise<String> promise) {
+            }
+
+            @Override
+            public void operationFailed(final Promise<String> promise) {
+                assertThat(promise.getThrowable()).isEqualTo(exception);
+            }
+        });
+        promise.setFailure(exception);
+    }
+
+    @Test
+    public void getThrowable() throws Exception {
+        assertThat(promise.getThrowable()).isNull();
+        Throwable exception = new OMSRuntimeException("-1", "Test Error");
+        promise.setFailure(exception);
+        assertThat(promise.getThrowable()).isEqualTo(exception);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
new file mode 100644
index 0000000..71ca11c
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.OMS;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BeanUtilsTest {
+    private KeyValue properties = OMS.newKeyValue();
+
+    public static class CustomizedConfig extends ClientConfig {
+        final static String STRING_TEST = "string.test";
+        String stringTest = "foobar";
+
+        final static String DOUBLE_TEST = "double.test";
+        double doubleTest = 123.0;
+
+        final static String LONG_TEST = "long.test";
+        long longTest = 123L;
+
+        String getStringTest() {
+            return stringTest;
+        }
+
+        public void setStringTest(String stringTest) {
+            this.stringTest = stringTest;
+        }
+
+        double getDoubleTest() {
+            return doubleTest;
+        }
+
+        public void setDoubleTest(final double doubleTest) {
+            this.doubleTest = doubleTest;
+        }
+
+        long getLongTest() {
+            return longTest;
+        }
+
+        public void setLongTest(final long longTest) {
+            this.longTest = longTest;
+        }
+
+        CustomizedConfig() {
+        }
+    }
+
+    @Before
+    public void init() {
+        properties.put(NonStandardKeys.MAX_REDELIVERY_TIMES, 120);
+        properties.put(CustomizedConfig.STRING_TEST, "kaka");
+        properties.put(NonStandardKeys.CONSUMER_GROUP, "Default_Consumer_Group");
+        properties.put(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT, 101);
+
+        properties.put(CustomizedConfig.LONG_TEST, 1234567890L);
+        properties.put(CustomizedConfig.DOUBLE_TEST, 10.234);
+    }
+
+    @Test
+    public void testPopulate() {
+        CustomizedConfig config = BeanUtils.populate(properties, CustomizedConfig.class);
+
+        //RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
+        Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120);
+        Assert.assertEquals(config.getStringTest(), "kaka");
+        Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group");
+        Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101);
+        Assert.assertEquals(config.getLongTest(), 1234567890L);
+        Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001);
+    }
+
+    @Test
+    public void testPopulate_ExistObj() {
+        CustomizedConfig config = new CustomizedConfig();
+        config.setOmsConsumerId("NewConsumerId");
+
+        Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId");
+
+        config = BeanUtils.populate(properties, config);
+
+        //RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
+        Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120);
+        Assert.assertEquals(config.getStringTest(), "kaka");
+        Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group");
+        Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101);
+        Assert.assertEquals(config.getLongTest(), 1234567890L);
+        Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c8e84adf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 05ead63..25e4c84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -181,6 +181,7 @@
         <module>filter</module>
         <module>test</module>
         <module>distribution</module>
+        <module>openmessaging</module>
     </modules>
 
     <build>
@@ -617,6 +618,11 @@
                 <artifactId>guava</artifactId>
                 <version>19.0</version>
             </dependency>
+            <dependency>
+                <groupId>io.openmessaging</groupId>
+                <artifactId>openmessaging-api</artifactId>
+                <version>0.1.0-alpha</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>