You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/05/24 08:51:12 UTC

[1/2] incubator-rocketmq git commit: [ROCKETMQ-186] Implement the OpenMessaging specification 0.1.0-alpha version

Repository: incubator-rocketmq
Updated Branches:
  refs/heads/develop 1630f277b -> 1d966b50c


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
new file mode 100644
index 0000000..84b6c2d
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.rocketmq.promise;
+
+public enum FutureState {
+    /**
+     * The task is still in progress.
+     **/
+    DOING(0),
+    /**
+     * The task has finished.
+     **/
+    DONE(1),
+    /**
+     * The task was cancelled.
+     **/
+    CANCELLED(2);
+
+    public final int value; // numeric code supplied by each constant above (0, 1 or 2)
+
+    private FutureState(int value) {
+        this.value = value;
+    }
+
+    public boolean isCancelledState() { // true only for CANCELLED
+        return this == CANCELLED;
+    }
+
+    public boolean isDoneState() { // true only for DONE
+        return this == DONE;
+    }
+
+    public boolean isDoingState() { // true only for DOING
+        return this == DOING;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
new file mode 100644
index 0000000..104d3d9
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.slf4j.Logger;
+
+public final class BeanUtils {
+    final static Logger log = ClientLogger.getLog();
+
+    /**
+     * Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
+     */
+    private static Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<Class<?>, Class<?>>();
+
+    static {
+        primitiveWrapperMap.put(Boolean.TYPE, Boolean.class);
+        primitiveWrapperMap.put(Byte.TYPE, Byte.class);
+        primitiveWrapperMap.put(Character.TYPE, Character.class);
+        primitiveWrapperMap.put(Short.TYPE, Short.class);
+        primitiveWrapperMap.put(Integer.TYPE, Integer.class);
+        primitiveWrapperMap.put(Long.TYPE, Long.class);
+        primitiveWrapperMap.put(Double.TYPE, Double.class);
+        primitiveWrapperMap.put(Float.TYPE, Float.class);
+        primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
+    }
+
+    private static Map<Class<?>, Class<?>> wrapperMap = new HashMap<Class<?>, Class<?>>(); // wrapper -> primitive
+
+    static {
+        for (final Class<?> primitiveClass : primitiveWrapperMap.keySet()) {
+            final Class<?> wrapperClass = primitiveWrapperMap.get(primitiveClass);
+            if (!primitiveClass.equals(wrapperClass)) { // skips void, which maps to itself
+                wrapperMap.put(wrapperClass, primitiveClass);
+            }
+        }
+        wrapperMap.put(String.class, String.class);
+    }
+
+    /**
+     * <p>Creates a new instance of {@code clazz} via its no-argument
+     * constructor and populates it from the specified name/value pairs.
+     * Setter arguments of type <Code>boolean</Code>, <Code>int</Code>,
+     * <Code>long</Code>, <Code>float</Code> and <Code>double</Code> are
+     * converted from the value's string form; any other argument type
+     * receives the value unchanged.</p>
+     *
+     * <p>The setter for each property is located by name through reflection:
+     * the key is split on ".", every segment is lower-cased and then
+     * capitalized, the segments are concatenated and prefixed with "set"
+     * (for example, key "rmq.consumer.group" maps to
+     * <Code>setRmqConsumerGroup</Code>).  Keys without a matching public
+     * setter, and setters that throw, are skipped silently.</p>
+     *
+     * <p><strong>NOTE</strong>:  Runtime failures (for example an unparsable
+     * number) are logged as warnings rather than thrown and stop the
+     * population early, so the result may be only partially populated.</p>
+     *
+     * @param properties name/value pairs keyed by dotted property name
+     * @param clazz JavaBean class to instantiate and populate
+     * @param <T> Class type
+     * @return the populated instance, or {@code null} if instantiation failed
+     */
+    public static <T> T populate(final Properties properties, final Class<T> clazz) {
+        T obj = null;
+        try {
+            obj = clazz.newInstance();
+            return populate(properties, obj);
+        } catch (Throwable e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    public static <T> T populate(final KeyValue properties, final Class<T> clazz) { // KeyValue twin of the overload above
+        T obj = null;
+        try {
+            obj = clazz.newInstance();
+            return populate(properties, obj);
+        } catch (Throwable e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    public static Class<?> getMethodClass(Class<?> clazz, String methodName) {
+        Method[] methods = clazz.getMethods();
+        for (Method method : methods) {
+            // Only single-argument methods qualify; a parameterless match used to fail on the [0] access below.
+            if (method.getName().equalsIgnoreCase(methodName) && method.getParameterTypes().length == 1) {
+                return method.getParameterTypes()[0];
+            }
+        }
+        return null; // no public single-argument method with that name (compared ignoring case)
+    }
+
+    public static void setProperties(Class<?> clazz, Object obj, String methodName,
+        Object value) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+        Class<?> parameterClass = getMethodClass(clazz, methodName); // null when no candidate setter exists
+        Method setterMethod = clazz.getMethod(methodName, parameterClass); // exact-name lookup; a miss throws NoSuchMethodException
+        if (parameterClass == Boolean.TYPE) {
+            setterMethod.invoke(obj, Boolean.valueOf(value.toString()));
+        } else if (parameterClass == Integer.TYPE) {
+            setterMethod.invoke(obj, Integer.valueOf(value.toString()));
+        } else if (parameterClass == Double.TYPE) {
+            setterMethod.invoke(obj, Double.valueOf(value.toString()));
+        } else if (parameterClass == Float.TYPE) {
+            setterMethod.invoke(obj, Float.valueOf(value.toString()));
+        } else if (parameterClass == Long.TYPE) {
+            setterMethod.invoke(obj, Long.valueOf(value.toString()));
+        } else
+            setterMethod.invoke(obj, value); // every other parameter type receives the value as-is
+    }
+
+    public static <T> T populate(final Properties properties, final T obj) { // populates an existing bean in place
+        Class<?> clazz = obj.getClass();
+        try {
+
+            Set<Map.Entry<Object, Object>> entries = properties.entrySet();
+            for (Map.Entry<Object, Object> entry : entries) {
+                String entryKey = entry.getKey().toString();
+                String[] keyGroup = entryKey.split("\\.");
+                for (int i = 0; i < keyGroup.length; i++) {
+                    keyGroup[i] = keyGroup[i].toLowerCase();
+                    keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
+                }
+                String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); // e.g. "a.b" -> "AB"
+                try {
+                    setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, entry.getValue());
+                } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
+                    //ignored: a key without a usable setter must not stop the remaining keys
+                }
+            }
+        } catch (RuntimeException e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+
+    public static <T> T populate(final KeyValue properties, final T obj) { // same algorithm, reading values via getString
+        Class<?> clazz = obj.getClass();
+        try {
+
+            final Set<String> keySet = properties.keySet();
+            for (String key : keySet) {
+                String[] keyGroup = key.split("\\.");
+                for (int i = 0; i < keyGroup.length; i++) {
+                    keyGroup[i] = keyGroup[i].toLowerCase();
+                    keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
+                }
+                String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); // e.g. "a.b" -> "AB"
+                try {
+                    setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getString(key));
+                } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
+                    //ignored: a key without a usable setter must not stop the remaining keys
+                }
+            }
+        } catch (RuntimeException e) {
+            log.warn("Error occurs !", e);
+        }
+        return obj;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
new file mode 100644
index 0000000..60c8408
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.OMS;
+import io.openmessaging.SendResult;
+import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.domain.SendResultImpl;
+import java.lang.reflect.Field;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
+
+public class OMSUtil {
+
+    /**
+     * Builds an OMS client instance name from the process id and the current nano time.
+     *
+     * @return the instance name, in the form {@code <pid>%OpenMessaging%<nanoTime>}
+     */
+    public static String buildInstanceName() {
+        return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime();
+    }
+
+    public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) { // OMS -> RocketMQ
+        org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
+        rmqMessage.setBody(omsMessage.getBody());
+
+        KeyValue headers = omsMessage.headers();
+        KeyValue properties = omsMessage.properties();
+
+        //All destinations in RocketMQ use Topic; the original destination kind is recorded as a user property
+        if (headers.containsKey(MessageHeader.TOPIC)) {
+            rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
+            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        } else {
+            rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
+            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
+        }
+
+        for (String key : properties.keySet()) {
+            MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
+        }
+
+        //Headers have higher priority, so they are written after the properties
+        for (String key : headers.keySet()) {
+            MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
+        }
+
+        return rmqMessage;
+    }
+
+    public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) { // RocketMQ -> OMS
+        BytesMessage omsMsg = new BytesMessageImpl();
+        omsMsg.setBody(rmqMsg.getBody());
+
+        KeyValue headers = omsMsg.headers();
+        KeyValue properties = omsMsg.properties();
+
+        final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
+
+        for (final Map.Entry<String, String> entry : entries) { // split RocketMQ properties into OMS headers and properties
+            if (isOMSHeader(entry.getKey())) {
+                headers.put(entry.getKey(), entry.getValue());
+            } else {
+                properties.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
+        if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) || // no marker: treated as a topic
+            rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
+            omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
+        } else {
+            omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
+        }
+        omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys());
+        omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
+        omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
+        omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
+        omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
+        return omsMsg;
+    }
+
+    public static boolean isOMSHeader(String value) { // true when value equals the value of a field declared on MessageHeader
+        for (Field field : MessageHeader.class.getDeclaredFields()) {
+            try {
+                if (field.get(MessageHeader.class).equals(value)) { // NOTE(review): assumes non-null static constants - confirm
+                    return true;
+                }
+            } catch (IllegalAccessException e) {
+                return false; // an unreadable field ends the scan without checking the remaining fields
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Converts a RocketMQ SEND_OK SendResult instance to an OMS SendResult.
+     */
+    public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) {
+        assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK); // only evaluated when assertions are enabled (-ea)
+        return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue());
+    }
+
+    public static KeyValue buildKeyValue(KeyValue... keyValues) { // merges in argument order; values are copied via getString
+        KeyValue keyValue = OMS.newKeyValue();
+        for (KeyValue properties : keyValues) {
+            for (String key : properties.keySet()) {
+                keyValue.put(key, properties.getString(key));
+            }
+        }
+        return keyValue;
+    }
+
+    /**
+     * Returns an iterator that cycles indefinitely over the elements of {@code iterable}.
+     */
+    public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
+        return new Iterator<T>() {
+            Iterator<T> iterator = new Iterator<T>() { // empty placeholder, replaced on the first call to next()
+                @Override
+                public synchronized boolean hasNext() {
+                    return false;
+                }
+
+                @Override
+                public synchronized T next() {
+                    throw new NoSuchElementException();
+                }
+
+                @Override
+                public synchronized void remove() {
+                    //Ignore
+                }
+            };
+
+            @Override
+            public synchronized boolean hasNext() {
+                return iterator.hasNext() || iterable.iterator().hasNext(); // probes a fresh iterator when the current one is spent
+            }
+
+            @Override
+            public synchronized T next() {
+                if (!iterator.hasNext()) {
+                    iterator = iterable.iterator(); // current pass exhausted: start over from the beginning
+                    if (!iterator.hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+                }
+                return iterator.next();
+            }
+
+            @Override
+            public synchronized void remove() {
+                iterator.remove(); // delegates to the iterator of the current pass
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
new file mode 100644
index 0000000..ae4d3ed
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LocalMessageCacheTest {
+    private LocalMessageCache localMessageCache;
+    @Mock
+    private DefaultMQPullConsumer rocketmqPullConsume;
+    @Mock
+    private ConsumeRequest consumeRequest;
+
+    @Before
+    public void init() {
+        ClientConfig clientConfig = new ClientConfig();
+        clientConfig.setRmqPullMessageBatchNums(512);
+        clientConfig.setRmqPullMessageCacheCapacity(1024);
+        localMessageCache = new LocalMessageCache(rocketmqPullConsume, clientConfig);
+    }
+
+    @Test
+    public void testNextPullBatchNums() throws Exception {
+        assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(512);
+        for (int i = 0; i < 513; i++) {
+            localMessageCache.submitConsumeRequest(consumeRequest);
+        }
+        assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(511); // presumably capacity 1024 minus 513 queued - confirm
+    }
+
+    @Test
+    public void testNextPullOffset() throws Exception {
+        MessageQueue messageQueue = new MessageQueue();
+        when(rocketmqPullConsume.fetchConsumeOffset(any(MessageQueue.class), anyBoolean()))
+            .thenReturn(123L);
+        assertThat(localMessageCache.nextPullOffset(messageQueue)).isEqualTo(123L); // reuse the queue declared above
+    }
+
+    @Test
+    public void testUpdatePullOffset() throws Exception {
+        MessageQueue messageQueue = new MessageQueue();
+        localMessageCache.updatePullOffset(messageQueue, 124L);
+        assertThat(localMessageCache.nextPullOffset(messageQueue)).isEqualTo(124L);
+    }
+
+    @Test
+    public void testSubmitConsumeRequest() throws Exception {
+        byte[] body = new byte[]{'1', '2', '3'};
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(body);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic("HELLO_QUEUE");
+
+        when(consumeRequest.getMessageExt()).thenReturn(consumedMsg);
+        localMessageCache.submitConsumeRequest(consumeRequest);
+        assertThat(localMessageCache.poll()).isEqualTo(consumedMsg); // the submitted message is handed back by poll()
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
new file mode 100644
index 0000000..277a5c6
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PullConsumerImplTest {
+    private PullConsumer consumer;
+    private String queueName = "HELLO_QUEUE";
+
+    @Mock
+    private DefaultMQPullConsumer rocketmqPullConsumer;
+    private LocalMessageCache localMessageCache; // created in init(): @Mock fields are still null in field initializers
+
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        consumer = messagingAccessPoint.createPullConsumer(queueName,
+            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+
+        Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
+        field.setAccessible(true);
+        field.set(consumer, rocketmqPullConsumer); //Replace the real consumer with the mock
+
+        localMessageCache = spy(new LocalMessageCache(rocketmqPullConsumer, new ClientConfig()));
+        field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
+        field.setAccessible(true);
+        field.set(consumer, localMessageCache); //Replace the cache with the spy built around the mock
+
+        messagingAccessPoint.startup();
+        consumer.startup();
+    }
+
+    @Test
+    public void testPoll() {
+        final byte[] testBody = new byte[] {'a', 'b'};
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(testBody);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic(queueName);
+
+        when(localMessageCache.poll()).thenReturn(consumedMsg); // NOTE(review): on a spy this runs the real poll() once while stubbing
+
+        Message message = consumer.poll();
+        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+        assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
+    }
+
+    @Test
+    public void testPoll_WithTimeout() {
+        //There is a default timeout value, @see ClientConfig#omsOperationTimeout.
+        Message message = consumer.poll();
+        assertThat(message).isNull();
+
+        message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100));
+        assertThat(message).isNull();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
new file mode 100644
index 0000000..882e57e
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessageListener;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import java.lang.reflect.Field;
+import java.util.Collections;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PushConsumerImplTest {
+    private PushConsumer consumer;
+
+    @Mock
+    private DefaultMQPushConsumer rocketmqPushConsumer;
+
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        consumer = messagingAccessPoint.createPushConsumer(
+            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
+
+        Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
+        field.setAccessible(true);
+        DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer); // keep the real consumer for its listener
+        field.set(consumer, rocketmqPushConsumer); //Replace the real consumer with the mock
+
+        when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener()); // mock returns the real listener
+        messagingAccessPoint.startup();
+        consumer.startup();
+    }
+
+    @Test
+    public void testConsumeMessage() {
+        final byte[] testBody = new byte[] {'a', 'b'};
+
+        MessageExt consumedMsg = new MessageExt();
+        consumedMsg.setMsgId("NewMsgId");
+        consumedMsg.setBody(testBody);
+        consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
+        consumedMsg.setTopic("HELLO_QUEUE");
+        consumer.attachQueue("HELLO_QUEUE", new MessageListener() {
+            @Override
+            public void onMessage(final Message message, final ReceivedMessageContext context) {
+                assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
+                assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
+                context.ack();
+            }
+        });
+        ((MessageListenerConcurrently) rocketmqPushConsumer // drive the listener directly with one message and a null context
+            .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
new file mode 100644
index 0000000..1db80c3
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.Producer;
+import io.openmessaging.exception.OMSRuntimeException;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the synchronous send path of the OMS {@link Producer} obtained from the RocketMQ
+ * messaging access point. The underlying {@link DefaultMQProducer} is replaced by a Mockito mock,
+ * so each test only stubs what {@code send(Message, long)} returns or throws.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ProducerImplTest {
+    private Producer producer;
+
+    @Mock
+    private DefaultMQProducer rocketmqProducer;
+
+    /**
+     * Creates a producer through the access point, swaps its {@code rocketmqProducer} field
+     * (declared on {@link AbstractOMSProducer}) for the mock via reflection, then starts the
+     * access point and the producer.
+     *
+     * @throws NoSuchFieldException if {@link AbstractOMSProducer} has no {@code rocketmqProducer} field
+     * @throws IllegalAccessException if the field cannot be written reflectively
+     */
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        producer = messagingAccessPoint.createProducer();
+
+        Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
+        field.setAccessible(true);
+        field.set(producer, rocketmqProducer);
+
+        messagingAccessPoint.startup();
+        producer.startup();
+    }
+
+    /**
+     * When the mocked RocketMQ producer reports {@link SendStatus#SEND_OK}, the OMS send result
+     * must expose the message id of the RocketMQ send result.
+     */
+    @Test
+    public void testSend_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("TestMsgID");
+        sendResult.setSendStatus(SendStatus.SEND_OK);
+        when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
+        io.openmessaging.SendResult omsResult =
+            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+
+        assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
+    }
+
+    /**
+     * A send status other than {@code SEND_OK} (here {@link SendStatus#FLUSH_DISK_TIMEOUT}) must
+     * surface as an {@link OMSRuntimeException} carrying the broker-failure message.
+     */
+    @Test
+    public void testSend_Not_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        SendResult sendResult = new SendResult();
+        sendResult.setSendStatus(SendStatus.FLUSH_DISK_TIMEOUT);
+
+        when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
+        try {
+            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+            failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
+        } catch (OMSRuntimeException e) {
+            // Catch only the announced type: any other exception now fails the test instead of
+            // passing merely because its message happens to match.
+            assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
+        }
+    }
+
+    /**
+     * An {@link MQClientException} thrown by the mocked RocketMQ producer must surface as an
+     * {@link OMSRuntimeException} carrying the broker-failure message.
+     */
+    @Test
+    public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
+        try {
+            producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
+            failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
+        } catch (OMSRuntimeException e) {
+            // Same narrowing as in testSend_Not_OK: only the expected exception type is accepted.
+            assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
new file mode 100644
index 0000000..823fe01
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.SequenceProducer;
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the OMS {@link SequenceProducer} obtained from the RocketMQ messaging access
+ * point, with the underlying {@link DefaultMQProducer} replaced by a Mockito mock.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SequenceProducerImplTest {
+
+    private SequenceProducer producer;
+
+    @Mock
+    private DefaultMQProducer rocketmqProducer;
+
+    /**
+     * Creates a sequence producer through the access point, swaps its {@code rocketmqProducer}
+     * field (declared on {@link AbstractOMSProducer}) for the mock via reflection, then starts
+     * the access point and the producer.
+     *
+     * @throws NoSuchFieldException if {@link AbstractOMSProducer} has no {@code rocketmqProducer} field
+     * @throws IllegalAccessException if the field cannot be written reflectively
+     */
+    @Before
+    public void init() throws NoSuchFieldException, IllegalAccessException {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+        producer = messagingAccessPoint.createSequenceProducer();
+
+        Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
+        field.setAccessible(true);
+        field.set(producer, rocketmqProducer);
+
+        messagingAccessPoint.startup();
+        producer.startup();
+    }
+
+    /**
+     * After {@code send} followed by {@code commit}, the message header must carry the message id
+     * returned by the mocked batch send.
+     */
+    @Test
+    public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        SendResult sendResult = new SendResult();
+        sendResult.setMsgId("TestMsgID");
+        sendResult.setSendStatus(SendStatus.SEND_OK);
+        when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult);
+        when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
+        final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
+        producer.send(message);
+        producer.commit();
+        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID");
+    }
+
+    /**
+     * After {@code send} followed by {@code rollback}, a later {@code commit} must leave the
+     * message without a message id header.
+     */
+    @Test
+    public void testRollback() {
+        when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
+        final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
+        producer.send(message);
+        producer.rollback();
+        producer.commit(); //Commit nothing.
+        assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isNull();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
new file mode 100644
index 0000000..2240ff2
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.promise;
+
+import io.openmessaging.Promise;
+import io.openmessaging.PromiseListener;
+import io.openmessaging.exception.OMSRuntimeException;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+
+/**
+ * Unit tests for {@link DefaultPromise}, exercised through the {@link Promise} interface.
+ */
+public class DefaultPromiseTest {
+    private Promise<String> promise;
+
+    /** Gives every test a fresh promise on which neither a value nor a failure has been set. */
+    @Before
+    public void init() {
+        promise = new DefaultPromise<>();
+    }
+
+    /** A freshly created promise must not report itself as cancelled. */
+    @Test
+    public void testIsCancelled() throws Exception {
+        assertThat(promise.isCancelled()).isFalse();
+    }
+
+    /** {@code isDone()} must flip from false to true once a value has been set. */
+    @Test
+    public void testIsDone() throws Exception {
+        assertThat(promise.isDone()).isFalse();
+        promise.set("Done");
+        assertThat(promise.isDone()).isTrue();
+    }
+
+    /** {@code get()} must return the value previously passed to {@code set}. */
+    @Test
+    public void testGet() throws Exception {
+        promise.set("Done");
+        assertThat(promise.get()).isEqualTo("Done");
+    }
+
+    /**
+     * {@code get(100)} on a promise that never receives a value must throw an
+     * {@link OMSRuntimeException} whose message mentions the timeout.
+     */
+    @Test
+    public void testGet_WithTimeout() throws Exception {
+        try {
+            promise.get(100); // presumably milliseconds - TODO confirm against Promise#get(long)
+            failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
+        } catch (OMSRuntimeException e) {
+            assertThat(e).hasMessageContaining("Get request result is timeout or interrupted");
+        }
+    }
+
+    /**
+     * Registers a listener first and sets the value afterwards; the completion callback checks
+     * the value it can read from the promise.
+     */
+    @Test
+    public void testAddListener() throws Exception {
+        // NOTE(review): the only assertion lives inside the callback and nothing here verifies
+        // that the callback was invoked, so this test also passes if the listener is never
+        // notified - confirm and consider recording the invocation.
+        promise.addListener(new PromiseListener<String>() {
+            @Override
+            public void operationCompleted(final Promise<String> promise) {
+                assertThat(promise.get()).isEqualTo("Done");
+            }
+
+            @Override
+            public void operationFailed(final Promise<String> promise) {
+
+            }
+        });
+        promise.set("Done");
+    }
+
+    /**
+     * Sets the value first and registers the listener afterwards; the completion callback checks
+     * the value it can read from the promise.
+     */
+    @Test
+    public void testAddListener_ListenerAfterSet() throws Exception {
+        promise.set("Done");
+        // NOTE(review): as in testAddListener, nothing verifies that the callback actually ran.
+        promise.addListener(new PromiseListener<String>() {
+            @Override
+            public void operationCompleted(final Promise<String> promise) {
+                assertThat(promise.get()).isEqualTo("Done");
+            }
+
+            @Override
+            public void operationFailed(final Promise<String> promise) {
+
+            }
+        });
+    }
+
+    /**
+     * Fails the promise first and registers the listener afterwards; the failure callback checks
+     * the throwable it can read from the promise.
+     */
+    @Test
+    public void testAddListener_WithException_ListenerAfterSet() throws Exception {
+        final Throwable exception = new OMSRuntimeException("-1", "Test Error");
+        promise.setFailure(exception);
+        // NOTE(review): as in testAddListener, nothing verifies that the callback actually ran.
+        promise.addListener(new PromiseListener<String>() {
+            @Override
+            public void operationCompleted(final Promise<String> promise) {
+            }
+
+            @Override
+            public void operationFailed(final Promise<String> promise) {
+                assertThat(promise.getThrowable()).isEqualTo(exception);
+            }
+        });
+    }
+
+    /**
+     * Registers the listener first and fails the promise afterwards; the failure callback checks
+     * the throwable it can read from the promise.
+     */
+    @Test
+    public void testAddListener_WithException() throws Exception {
+        final Throwable exception = new OMSRuntimeException("-1", "Test Error");
+        // NOTE(review): as in testAddListener, nothing verifies that the callback actually ran.
+        promise.addListener(new PromiseListener<String>() {
+            @Override
+            public void operationCompleted(final Promise<String> promise) {
+            }
+
+            @Override
+            public void operationFailed(final Promise<String> promise) {
+                assertThat(promise.getThrowable()).isEqualTo(exception);
+            }
+        });
+        promise.setFailure(exception);
+    }
+
+    /** {@code getThrowable()} must be null before a failure is set and return that failure afterwards. */
+    @Test
+    public void getThrowable() throws Exception {
+        assertThat(promise.getThrowable()).isNull();
+        Throwable exception = new OMSRuntimeException("-1", "Test Error");
+        promise.setFailure(exception);
+        assertThat(promise.getThrowable()).isEqualTo(exception);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
new file mode 100644
index 0000000..71ca11c
--- /dev/null
+++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.OMS;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@code BeanUtils.populate}: values stored in an OMS {@link KeyValue} must end up in
+ * the matching properties of a config bean, both those {@link CustomizedConfig} inherits and
+ * those it declares itself.
+ */
+public class BeanUtilsTest {
+    private KeyValue properties = OMS.newKeyValue();
+
+    /**
+     * Config bean used as the populate target. On top of what it inherits from
+     * {@link ClientConfig} it declares one string, one double and one long property, each with a
+     * default that differs from the value the tests populate.
+     */
+    public static class CustomizedConfig extends ClientConfig {
+        final static String STRING_TEST = "string.test";
+        String stringTest = "foobar";
+
+        final static String DOUBLE_TEST = "double.test";
+        double doubleTest = 123.0;
+
+        final static String LONG_TEST = "long.test";
+        long longTest = 123L;
+
+        String getStringTest() {
+            return stringTest;
+        }
+
+        public void setStringTest(String stringTest) {
+            this.stringTest = stringTest;
+        }
+
+        double getDoubleTest() {
+            return doubleTest;
+        }
+
+        public void setDoubleTest(final double doubleTest) {
+            this.doubleTest = doubleTest;
+        }
+
+        long getLongTest() {
+            return longTest;
+        }
+
+        public void setLongTest(final long longTest) {
+            this.longTest = longTest;
+        }
+
+        CustomizedConfig() {
+        }
+    }
+
+    /** Fills {@link #properties} with one value for each property the tests later verify. */
+    @Before
+    public void init() {
+        properties.put(NonStandardKeys.MAX_REDELIVERY_TIMES, 120);
+        properties.put(CustomizedConfig.STRING_TEST, "kaka");
+        properties.put(NonStandardKeys.CONSUMER_GROUP, "Default_Consumer_Group");
+        properties.put(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT, 101);
+
+        properties.put(CustomizedConfig.LONG_TEST, 1234567890L);
+        properties.put(CustomizedConfig.DOUBLE_TEST, 10.234);
+    }
+
+    /** Populating by class must yield a bean that carries every configured value. */
+    @Test
+    public void testPopulate() {
+        CustomizedConfig config = BeanUtils.populate(properties, CustomizedConfig.class);
+
+        assertPopulated(config);
+    }
+
+    /** Populating an already existing bean must yield a bean that carries every configured value. */
+    @Test
+    public void testPopulate_ExistObj() {
+        CustomizedConfig config = new CustomizedConfig();
+        config.setOmsConsumerId("NewConsumerId");
+
+        Assert.assertEquals("NewConsumerId", config.getOmsConsumerId());
+
+        config = BeanUtils.populate(properties, config);
+
+        assertPopulated(config);
+    }
+
+    /**
+     * Asserts that each value put into the properties by {@code init()} is readable from the bean.
+     * Arguments follow JUnit's (expected, actual) order so failure messages read correctly.
+     *
+     * @param config the bean returned by {@code BeanUtils.populate}
+     */
+    private static void assertPopulated(final CustomizedConfig config) {
+        Assert.assertEquals(120, config.getRmqMaxRedeliveryTimes());
+        Assert.assertEquals("kaka", config.getStringTest());
+        Assert.assertEquals("Default_Consumer_Group", config.getRmqConsumerGroup());
+        Assert.assertEquals(101, config.getRmqMessageConsumeTimeout());
+        Assert.assertEquals(1234567890L, config.getLongTest());
+        Assert.assertEquals(10.234, config.getDoubleTest(), 0.000001);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 05ead63..25e4c84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -181,6 +181,7 @@
         <module>filter</module>
         <module>test</module>
         <module>distribution</module>
+        <module>openmessaging</module>
     </modules>
 
     <build>
@@ -617,6 +618,11 @@
                 <artifactId>guava</artifactId>
                 <version>19.0</version>
             </dependency>
+            <dependency>
+                <groupId>io.openmessaging</groupId>
+                <artifactId>openmessaging-api</artifactId>
+                <version>0.1.0-alpha</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>


[2/2] incubator-rocketmq git commit: [ROCKETMQ-186] Implement the OpenMessaging specification 0.1.0-alpha version

Posted by yu...@apache.org.
[ROCKETMQ-186] Implement the OpenMessaging specification 0.1.0-alpha version


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/1d966b50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/1d966b50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/1d966b50

Branch: refs/heads/develop
Commit: 1d966b50c2ec189ca4f1bf81420959a33159a8ad
Parents: 1630f27
Author: yukon <yu...@apache.org>
Authored: Wed May 24 16:50:51 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Wed May 24 16:50:51 2017 +0800

----------------------------------------------------------------------
 distribution/release-client.xml                 |   1 +
 distribution/release.xml                        |   1 +
 example/pom.xml                                 |   9 +
 .../example/openmessaging/SimpleProducer.java   |  76 +++++++
 .../openmessaging/SimplePullConsumer.java       |  58 +++++
 .../openmessaging/SimplePushConsumer.java       |  59 +++++
 openmessaging/pom.xml                           |  42 ++++
 .../rocketmq/MessagingAccessPointImpl.java      | 132 +++++++++++
 .../rocketmq/config/ClientConfig.java           | 194 ++++++++++++++++
 .../rocketmq/consumer/LocalMessageCache.java    | 213 +++++++++++++++++
 .../rocketmq/consumer/PullConsumerImpl.java     | 166 ++++++++++++++
 .../rocketmq/consumer/PushConsumerImpl.java     | 181 +++++++++++++++
 .../rocketmq/domain/BytesMessageImpl.java       | 108 +++++++++
 .../rocketmq/domain/ConsumeRequest.java         |  55 +++++
 .../rocketmq/domain/NonStandardKeys.java        |  30 +++
 .../rocketmq/domain/SendResultImpl.java         |  40 ++++
 .../rocketmq/producer/AbstractOMSProducer.java  | 138 +++++++++++
 .../rocketmq/producer/ProducerImpl.java         | 124 ++++++++++
 .../rocketmq/producer/SequenceProducerImpl.java |  95 ++++++++
 .../rocketmq/promise/DefaultPromise.java        | 227 +++++++++++++++++++
 .../rocketmq/promise/FutureState.java           |  51 +++++
 .../openmessaging/rocketmq/utils/BeanUtils.java | 185 +++++++++++++++
 .../openmessaging/rocketmq/utils/OMSUtil.java   | 182 +++++++++++++++
 .../consumer/LocalMessageCacheTest.java         |  89 ++++++++
 .../rocketmq/consumer/PullConsumerImplTest.java |  96 ++++++++
 .../rocketmq/consumer/PushConsumerImplTest.java |  87 +++++++
 .../rocketmq/producer/ProducerImplTest.java     | 101 +++++++++
 .../producer/SequenceProducerImplTest.java      |  86 +++++++
 .../rocketmq/promise/DefaultPromiseTest.java    | 136 +++++++++++
 .../rocketmq/utils/BeanUtilsTest.java           | 110 +++++++++
 pom.xml                                         |   6 +
 31 files changed, 3078 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/distribution/release-client.xml
----------------------------------------------------------------------
diff --git a/distribution/release-client.xml b/distribution/release-client.xml
index 46563eb..84d33a0 100644
--- a/distribution/release-client.xml
+++ b/distribution/release-client.xml
@@ -47,6 +47,7 @@
             <useAllReactorProjects>true</useAllReactorProjects>
             <includes>
                 <include>org.apache.rocketmq:rocketmq-client</include>
+                <include>org.apache.rocketmq:rocketmq-openmessaging</include>
             </includes>
             <binaries>
                 <outputDirectory>./</outputDirectory>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/distribution/release.xml
----------------------------------------------------------------------
diff --git a/distribution/release.xml b/distribution/release.xml
index 9e4ef2a..c67d23e 100644
--- a/distribution/release.xml
+++ b/distribution/release.xml
@@ -68,6 +68,7 @@
                 <include>org.apache.rocketmq:rocketmq-filtersrv</include>
                 <include>org.apache.rocketmq:rocketmq-example</include>
                 <include>org.apache.rocketmq:rocketmq-filter</include>
+                <include>org.apache.rocketmq:rocketmq-openmessaging</include>
             </includes>
             <binaries>
                 <outputDirectory>lib/</outputDirectory>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/example/pom.xml
----------------------------------------------------------------------
diff --git a/example/pom.xml b/example/pom.xml
index 785a4ca..840fa36 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -48,5 +48,14 @@
             <groupId>org.javassist</groupId>
             <artifactId>javassist</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.1.0-incubating-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
new file mode 100644
index 0000000..9d162ac
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.openmessaging;
+
+import io.openmessaging.Message;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.Producer;
+import io.openmessaging.Promise;
+import io.openmessaging.PromiseListener;
+import io.openmessaging.SendResult;
+import java.nio.charset.Charset;
+
+/**
+ * Minimal OpenMessaging producer example: starts a messaging access point and a producer, then
+ * sends the same payload to {@code OMS_HELLO_TOPIC} three ways - synchronously, asynchronously
+ * with a completion listener, and one-way.
+ */
+public class SimpleProducer {
+    /**
+     * Runs the example.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+        final Producer producer = messagingAccessPoint.createProducer();
+
+        messagingAccessPoint.startup();
+        System.out.printf("MessagingAccessPoint startup OK%n");
+
+        producer.startup();
+        System.out.printf("Producer startup OK%n");
+
+        // Shut the producer down before the access point when the JVM exits.
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                producer.shutdown();
+                messagingAccessPoint.shutdown();
+            }
+        }));
+
+        // 1. Synchronous send: send() hands back the result directly.
+        {
+            Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+            SendResult sendResult = producer.send(message);
+            System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
+        }
+
+        // 2. Asynchronous send: the outcome is reported through the promise listener.
+        {
+            final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            result.addListener(new PromiseListener<SendResult>() {
+                @Override
+                public void operationCompleted(Promise<SendResult> promise) {
+                    System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
+                }
+
+                @Override
+                public void operationFailed(Promise<SendResult> promise) {
+                    System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
+                }
+            });
+        }
+
+        // 3. One-way send: no result is read back.
+        {
+            producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+            System.out.printf("Send oneway message OK%n");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
new file mode 100644
index 0000000..8e06772
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.openmessaging;
+
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+
+/**
+ * Minimal OpenMessaging pull-consumer example: creates a pull consumer for
+ * {@code OMS_HELLO_TOPIC} in consumer group {@code OMS_CONSUMER}, then polls forever, printing
+ * and acknowledging the id of every message it receives.
+ */
+public class SimplePullConsumer {
+    /**
+     * Runs the example; the polling loop never terminates on its own.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+        final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
+            OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+
+        messagingAccessPoint.startup();
+        System.out.printf("MessagingAccessPoint startup OK%n");
+
+        // On JVM exit, stop the consumer first and the access point second.
+        final Runnable shutdownTask = new Runnable() {
+            @Override
+            public void run() {
+                consumer.shutdown();
+                messagingAccessPoint.shutdown();
+            }
+        };
+        Runtime.getRuntime().addShutdownHook(new Thread(shutdownTask));
+
+        consumer.startup();
+        System.out.printf("Consumer startup OK%n");
+
+        for (;;) {
+            final Message polled = consumer.poll();
+            if (polled == null) {
+                // Nothing delivered by this poll; try again.
+                continue;
+            }
+            final String messageId = polled.headers().getString(MessageHeader.MESSAGE_ID);
+            System.out.printf("Received one message: %s%n", messageId);
+            consumer.ack(messageId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
new file mode 100644
index 0000000..b0935d4
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.openmessaging;
+
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessageListener;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+
+/**
+ * Minimal push-style example: attaches a listener to {@code OMS_HELLO_TOPIC} that prints the id
+ * of each delivered message and acknowledges it through the received-message context.
+ */
+public class SimplePushConsumer {
+    public static void main(String[] args) {
+        final MessagingAccessPoint accessPoint = MessagingAccessPointFactory
+            .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+        final PushConsumer pushConsumer = accessPoint
+            .createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+
+        accessPoint.startup();
+        System.out.printf("MessagingAccessPoint startup OK%n");
+
+        // On JVM exit, shut down the consumer first and the access point afterwards.
+        final Runnable releaseResources = new Runnable() {
+            @Override
+            public void run() {
+                pushConsumer.shutdown();
+                accessPoint.shutdown();
+            }
+        };
+        Runtime.getRuntime().addShutdownHook(new Thread(releaseResources));
+
+        // The listener is registered before the consumer is started.
+        final MessageListener printAndAck = new MessageListener() {
+            @Override
+            public void onMessage(final Message message, final ReceivedMessageContext context) {
+                final String receivedId = message.headers().getString(MessageHeader.MESSAGE_ID);
+                System.out.printf("Received one message: %s%n", receivedId);
+                context.ack();
+            }
+        };
+        pushConsumer.attachQueue("OMS_HELLO_TOPIC", printAndAck);
+
+        pushConsumer.startup();
+        System.out.printf("Consumer startup OK%n");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/pom.xml
----------------------------------------------------------------------
diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml
new file mode 100644
index 0000000..e853642
--- /dev/null
+++ b/openmessaging/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-all</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>4.1.0-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-openmessaging</artifactId>
+    <name>rocketmq-openmessaging ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
new file mode 100644
index 0000000..65caf84
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq;
+
+import io.openmessaging.IterableConsumer;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.Producer;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ResourceManager;
+import io.openmessaging.SequenceProducer;
+import io.openmessaging.ServiceEndPoint;
+import io.openmessaging.exception.OMSNotSupportedException;
+import io.openmessaging.observer.Observer;
+import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
+import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
+import io.openmessaging.rocketmq.producer.ProducerImpl;
+import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
+import io.openmessaging.rocketmq.utils.OMSUtil;
+
+/**
+ * RocketMQ implementation of {@link MessagingAccessPoint}.
+ *
+ * <p>Acts as a factory for producers and consumers. Each factory method hands the new object
+ * either the access point properties as they are, or the result of
+ * {@code OMSUtil.buildKeyValue(accessPointProperties, properties)} when the caller supplies
+ * additional properties. Iterable consumers, the resource manager and service end points are
+ * not supported and always raise {@link OMSNotSupportedException}; observer registration and
+ * the lifecycle methods do nothing.
+ */
+public class MessagingAccessPointImpl implements MessagingAccessPoint {
+    private final KeyValue accessPointProperties;
+
+    public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
+        this.accessPointProperties = accessPointProperties;
+    }
+
+    @Override
+    public KeyValue properties() {
+        return this.accessPointProperties;
+    }
+
+    @Override
+    public Producer createProducer() {
+        return new ProducerImpl(this.accessPointProperties);
+    }
+
+    @Override
+    public Producer createProducer(KeyValue properties) {
+        return new ProducerImpl(combineWith(properties));
+    }
+
+    @Override
+    public SequenceProducer createSequenceProducer() {
+        return new SequenceProducerImpl(this.accessPointProperties);
+    }
+
+    @Override
+    public SequenceProducer createSequenceProducer(KeyValue properties) {
+        return new SequenceProducerImpl(combineWith(properties));
+    }
+
+    @Override
+    public PushConsumer createPushConsumer() {
+        return new PushConsumerImpl(this.accessPointProperties);
+    }
+
+    @Override
+    public PushConsumer createPushConsumer(KeyValue properties) {
+        return new PushConsumerImpl(combineWith(properties));
+    }
+
+    @Override
+    public PullConsumer createPullConsumer(String queueName) {
+        return new PullConsumerImpl(queueName, this.accessPointProperties);
+    }
+
+    @Override
+    public PullConsumer createPullConsumer(String queueName, KeyValue properties) {
+        return new PullConsumerImpl(queueName, combineWith(properties));
+    }
+
+    @Override
+    public IterableConsumer createIterableConsumer(String queueName) {
+        throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version");
+    }
+
+    @Override
+    public IterableConsumer createIterableConsumer(String queueName, KeyValue properties) {
+        throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version");
+    }
+
+    @Override
+    public ResourceManager getResourceManager() {
+        throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version.");
+    }
+
+    @Override
+    public ServiceEndPoint createServiceEndPoint() {
+        throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version.");
+    }
+
+    @Override
+    public ServiceEndPoint createServiceEndPoint(KeyValue properties) {
+        throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version.");
+    }
+
+    @Override
+    public void addObserver(Observer observer) {
+        // Intentionally empty: observers are not tracked by this implementation.
+    }
+
+    @Override
+    public void deleteObserver(Observer observer) {
+        // Intentionally empty: observers are not tracked by this implementation.
+    }
+
+    @Override
+    public void startup() {
+        // Intentionally empty: the access point itself has nothing to start.
+    }
+
+    @Override
+    public void shutdown() {
+        // Intentionally empty: the access point itself has nothing to stop.
+    }
+
+    /**
+     * Builds the property set for a created object from the access point properties and the
+     * properties supplied by the caller.
+     */
+    private KeyValue combineWith(final KeyValue properties) {
+        return OMSUtil.buildKeyValue(this.accessPointProperties, properties);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
new file mode 100644
index 0000000..7077c6d
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.config;
+
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+
+/**
+ * Mutable bean that carries the client settings of the OpenMessaging-on-RocketMQ binding.
+ *
+ * <p>It only stores values: every field has a plain getter and setter, and a few fields carry
+ * defaults. Within this module an instance is obtained through
+ * {@code BeanUtils.populate(properties, ClientConfig.class)}; how property keys are matched to
+ * members is decided there, so member names and signatures should not be changed casually.
+ */
+public class ClientConfig implements PropertyKeys, NonStandardKeys {
+    // --- OpenMessaging level settings ---
+    private String omsDriverImpl;
+    // Read by the pull consumer as the name server address list; it rejects null or empty values.
+    private String omsAccessPoints;
+    private String omsNamespace;
+    private String omsProducerId;
+    private String omsConsumerId;
+    // Default wait used when polling the local message cache, in milliseconds.
+    private int omsOperationTimeout = 5000;
+    private String omsRoutingName;
+    private String omsOperatorName;
+    private String omsDstQueue;
+    private String omsSrcTopic;
+    // --- RocketMQ specific settings ---
+    // No default: the pull consumer refuses to be constructed when this is null or empty.
+    private String rmqConsumerGroup;
+    private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
+    // Handed to the RocketMQ pull consumer as its maximum reconsume times.
+    private int rmqMaxRedeliveryTimes = 16;
+    // Also used as the delay and period of the expired-message cleanup in the local message cache.
+    private int rmqMessageConsumeTimeout = 15; //In minutes
+    private int rmqMaxConsumeThreadNums = 64;
+    private int rmqMinConsumeThreadNums = 20;
+    private String rmqMessageDestination;
+    // Upper bound for the number of messages requested by a single pull.
+    private int rmqPullMessageBatchNums = 32;
+    // Capacity of the bounded queue that buffers pulled messages locally.
+    private int rmqPullMessageCacheCapacity = 1000;
+
+    public String getOmsDriverImpl() {
+        return omsDriverImpl;
+    }
+
+    public void setOmsDriverImpl(final String omsDriverImpl) {
+        this.omsDriverImpl = omsDriverImpl;
+    }
+
+    public String getOmsAccessPoints() {
+        return omsAccessPoints;
+    }
+
+    public void setOmsAccessPoints(final String omsAccessPoints) {
+        this.omsAccessPoints = omsAccessPoints;
+    }
+
+    public String getOmsNamespace() {
+        return omsNamespace;
+    }
+
+    public void setOmsNamespace(final String omsNamespace) {
+        this.omsNamespace = omsNamespace;
+    }
+
+    public String getOmsProducerId() {
+        return omsProducerId;
+    }
+
+    public void setOmsProducerId(final String omsProducerId) {
+        this.omsProducerId = omsProducerId;
+    }
+
+    public String getOmsConsumerId() {
+        return omsConsumerId;
+    }
+
+    public void setOmsConsumerId(final String omsConsumerId) {
+        this.omsConsumerId = omsConsumerId;
+    }
+
+    public int getOmsOperationTimeout() {
+        return omsOperationTimeout;
+    }
+
+    public void setOmsOperationTimeout(final int omsOperationTimeout) {
+        this.omsOperationTimeout = omsOperationTimeout;
+    }
+
+    public String getOmsRoutingName() {
+        return omsRoutingName;
+    }
+
+    public void setOmsRoutingName(final String omsRoutingName) {
+        this.omsRoutingName = omsRoutingName;
+    }
+
+    public String getOmsOperatorName() {
+        return omsOperatorName;
+    }
+
+    public void setOmsOperatorName(final String omsOperatorName) {
+        this.omsOperatorName = omsOperatorName;
+    }
+
+    public String getOmsDstQueue() {
+        return omsDstQueue;
+    }
+
+    public void setOmsDstQueue(final String omsDstQueue) {
+        this.omsDstQueue = omsDstQueue;
+    }
+
+    public String getOmsSrcTopic() {
+        return omsSrcTopic;
+    }
+
+    public void setOmsSrcTopic(final String omsSrcTopic) {
+        this.omsSrcTopic = omsSrcTopic;
+    }
+
+    public String getRmqConsumerGroup() {
+        return rmqConsumerGroup;
+    }
+
+    public void setRmqConsumerGroup(final String rmqConsumerGroup) {
+        this.rmqConsumerGroup = rmqConsumerGroup;
+    }
+
+    public String getRmqProducerGroup() {
+        return rmqProducerGroup;
+    }
+
+    public void setRmqProducerGroup(final String rmqProducerGroup) {
+        this.rmqProducerGroup = rmqProducerGroup;
+    }
+
+    public int getRmqMaxRedeliveryTimes() {
+        return rmqMaxRedeliveryTimes;
+    }
+
+    public void setRmqMaxRedeliveryTimes(final int rmqMaxRedeliveryTimes) {
+        this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes;
+    }
+
+    public int getRmqMessageConsumeTimeout() {
+        return rmqMessageConsumeTimeout;
+    }
+
+    public void setRmqMessageConsumeTimeout(final int rmqMessageConsumeTimeout) {
+        this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout;
+    }
+
+    public int getRmqMaxConsumeThreadNums() {
+        return rmqMaxConsumeThreadNums;
+    }
+
+    public void setRmqMaxConsumeThreadNums(final int rmqMaxConsumeThreadNums) {
+        this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums;
+    }
+
+    public int getRmqMinConsumeThreadNums() {
+        return rmqMinConsumeThreadNums;
+    }
+
+    public void setRmqMinConsumeThreadNums(final int rmqMinConsumeThreadNums) {
+        this.rmqMinConsumeThreadNums = rmqMinConsumeThreadNums;
+    }
+
+    public String getRmqMessageDestination() {
+        return rmqMessageDestination;
+    }
+
+    public void setRmqMessageDestination(final String rmqMessageDestination) {
+        this.rmqMessageDestination = rmqMessageDestination;
+    }
+
+    public int getRmqPullMessageBatchNums() {
+        return rmqPullMessageBatchNums;
+    }
+
+    public void setRmqPullMessageBatchNums(final int rmqPullMessageBatchNums) {
+        this.rmqPullMessageBatchNums = rmqPullMessageBatchNums;
+    }
+
+    public int getRmqPullMessageCacheCapacity() {
+        return rmqPullMessageCacheCapacity;
+    }
+
+    public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) {
+        this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
new file mode 100644
index 0000000..90f9e03
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.ServiceLifecycle;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.slf4j.Logger;
+
+/**
+ * Local buffer that sits between the RocketMQ pull task and the OMS pull consumer.
+ *
+ * <p>Pulled messages are queued in a bounded cache until the application polls them. A polled
+ * message is remembered under its message id until it is acknowledged, at which point it is
+ * removed from its process queue and the consume offset is updated. A single scheduled thread
+ * periodically sends back messages whose consumption has exceeded the configured timeout.
+ */
+class LocalMessageCache implements ServiceLifecycle {
+    /** Messages pulled from the broker and not yet handed to the application. */
+    private final BlockingQueue<ConsumeRequest> consumeRequestCache;
+    /** Messages handed to the application and not yet acknowledged, keyed by message id. */
+    private final Map<String, ConsumeRequest> consumedRequest;
+    /** Offset to use for the next pull of each message queue. */
+    private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable;
+    private final DefaultMQPullConsumer rocketmqPullConsumer;
+    private final ClientConfig clientConfig;
+    private final ScheduledExecutorService cleanExpireMsgExecutors;
+
+    private final static Logger log = ClientLogger.getLog();
+
+    /**
+     * @param rocketmqPullConsumer consumer used to fetch/update offsets and to send messages back
+     * @param clientConfig source of the cache capacity, batch size and timeouts
+     */
+    LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) {
+        consumeRequestCache = new LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity());
+        this.consumedRequest = new ConcurrentHashMap<>();
+        this.pullOffsetTable = new ConcurrentHashMap<>();
+        this.rocketmqPullConsumer = rocketmqPullConsumer;
+        this.clientConfig = clientConfig;
+        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "OMS_CleanExpireMsgScheduledThread_"));
+    }
+
+    /**
+     * @return how many messages the next pull may request: the configured batch size, capped by
+     * the free space left in the local cache
+     */
+    int nextPullBatchNums() {
+        return Math.min(clientConfig.getRmqPullMessageBatchNums(), consumeRequestCache.remainingCapacity());
+    }
+
+    /**
+     * Returns the offset the next pull of {@code remoteQueue} should start from. The first call
+     * for a queue fetches the stored consume offset and records it.
+     *
+     * @throws IllegalStateException if the consume offset cannot be fetched; previously this
+     * case surfaced as a bare NullPointerException when the missing table entry was unboxed
+     */
+    long nextPullOffset(MessageQueue remoteQueue) {
+        Long offset = pullOffsetTable.get(remoteQueue);
+        if (offset == null) {
+            final long fetched;
+            try {
+                fetched = rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false);
+            } catch (MQClientException e) {
+                log.error("A error occurred in fetch consume offset process.", e);
+                throw new IllegalStateException("Failed to fetch consume offset of " + remoteQueue, e);
+            }
+            // Another thread may have recorded an offset in the meantime; its value wins.
+            final Long existing = pullOffsetTable.putIfAbsent(remoteQueue, fetched);
+            if (existing != null) {
+                offset = existing;
+            } else {
+                offset = fetched;
+            }
+        }
+        return offset;
+    }
+
+    /** Records the offset the next pull of {@code remoteQueue} should start from. */
+    void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
+        pullOffsetTable.put(remoteQueue, nextPullOffset);
+    }
+
+    /**
+     * Adds a pulled message to the local cache, blocking while the cache is full. If the calling
+     * thread is interrupted the request is not cached and the interrupt flag is restored.
+     */
+    void submitConsumeRequest(ConsumeRequest consumeRequest) {
+        try {
+            consumeRequestCache.put(consumeRequest);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the pulling thread instead of silently dropping it.
+            Thread.currentThread().interrupt();
+            log.warn("Interrupted while caching a pulled message, the request is not cached.", e);
+        }
+    }
+
+    /** Polls one message, waiting at most the configured operation timeout. */
+    MessageExt poll() {
+        return poll(clientConfig.getOmsOperationTimeout());
+    }
+
+    /**
+     * Polls one message, waiting at most {@code PropertyKeys.OPERATION_TIMEOUT} from
+     * {@code properties} when present, otherwise the configured operation timeout.
+     */
+    MessageExt poll(final KeyValue properties) {
+        int currentPollTimeout = clientConfig.getOmsOperationTimeout();
+        if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
+            currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
+        }
+        return poll(currentPollTimeout);
+    }
+
+    /**
+     * Takes the next cached message, stamps its consume start time and remembers it until it is
+     * acknowledged.
+     *
+     * @param timeout maximum wait in milliseconds
+     * @return the message, or {@code null} if none arrived in time or the thread was interrupted
+     * (in which case the interrupt flag is restored)
+     */
+    private MessageExt poll(long timeout) {
+        try {
+            ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
+            if (consumeRequest != null) {
+                MessageExt messageExt = consumeRequest.getMessageExt();
+                consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
+                MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
+                consumedRequest.put(messageExt.getMsgId(), consumeRequest);
+                return messageExt;
+            }
+        } catch (InterruptedException e) {
+            // Let the caller observe the interruption.
+            Thread.currentThread().interrupt();
+        }
+        return null;
+    }
+
+    /**
+     * Acknowledges a previously polled message by id; ids that are not awaiting acknowledgement
+     * are ignored.
+     */
+    void ack(final String messageId) {
+        ConsumeRequest consumeRequest = consumedRequest.remove(messageId);
+        if (consumeRequest != null) {
+            long offset = consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt()));
+            try {
+                rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
+            } catch (MQClientException e) {
+                log.error("A error occurred in update consume offset process.", e);
+            }
+        }
+    }
+
+    /**
+     * Acknowledges {@code messageExt} directly: forgets it, removes it from {@code processQueue}
+     * and updates the consume offset of {@code messageQueue}.
+     */
+    void ack(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) {
+        consumedRequest.remove(messageExt.getMsgId());
+        long offset = processQueue.removeMessage(Collections.singletonList(messageExt));
+        try {
+            rocketmqPullConsumer.updateConsumeOffset(messageQueue, offset);
+        } catch (MQClientException e) {
+            log.error("A error occurred in update consume offset process.", e);
+        }
+    }
+
+    /**
+     * Schedules the expired-message cleanup; both the initial delay and the period are the
+     * configured consume timeout in minutes.
+     */
+    @Override
+    public void startup() {
+        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    cleanExpireMsg();
+                } catch (RuntimeException e) {
+                    // An exception escaping a fixed-rate task suppresses every later execution,
+                    // so it is logged here to keep the cleanup running.
+                    log.error("Clean expired message exception", e);
+                }
+            }
+        }, clientConfig.getRmqMessageConsumeTimeout(), clientConfig.getRmqMessageConsumeTimeout(), TimeUnit.MINUTES);
+    }
+
+    @Override
+    public void shutdown() {
+        ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Walks every process queue and, starting from the first entry of its message tree, sends
+     * back and acknowledges messages whose consumption has timed out. Scanning of a queue stops
+     * at the first message that is not expired.
+     */
+    private void cleanExpireMsg() {
+        // Computed in long so that a large timeout cannot overflow int arithmetic.
+        final long consumeTimeoutMillis = TimeUnit.MINUTES.toMillis(clientConfig.getRmqMessageConsumeTimeout());
+
+        for (final Map.Entry<MessageQueue, ProcessQueue> next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl()
+            .getRebalanceImpl().getProcessQueueTable().entrySet()) {
+            ProcessQueue pq = next.getValue();
+            MessageQueue mq = next.getKey();
+            ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
+            if (lockTreeMap == null) {
+                log.error("Gets tree map lock in process queue error, may be has compatibility issue");
+                return;
+            }
+
+            TreeMap<Long, MessageExt> msgTreeMap = pq.getMsgTreeMap();
+
+            int loop = msgTreeMap.size();
+            for (int i = 0; i < loop; i++) {
+                MessageExt msg;
+                try {
+                    lockTreeMap.readLock().lockInterruptibly();
+                    try {
+                        if (msgTreeMap.isEmpty()) {
+                            break;
+                        }
+                        msg = msgTreeMap.firstEntry().getValue();
+                        if (!isConsumeTimedOut(msg, consumeTimeoutMillis)) {
+                            break;
+                        }
+                        //Expired, ack and remove it.
+                    } finally {
+                        lockTreeMap.readLock().unlock();
+                    }
+                } catch (InterruptedException e) {
+                    log.error("Gets expired message exception", e);
+                    // No message was selected; stop instead of sending back a null message.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+
+                try {
+                    rocketmqPullConsumer.sendMessageBack(msg, 3);
+                    log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
+                        msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
+                    ack(mq, pq, msg);
+                } catch (Exception e) {
+                    log.error("Send back expired msg exception", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Tells whether consumption of {@code msg} started more than {@code consumeTimeoutMillis}
+     * ago. A message without a readable consume start timestamp (this class only sets it when
+     * the message is polled) is treated as not timed out.
+     */
+    private static boolean isConsumeTimedOut(final MessageExt msg, final long consumeTimeoutMillis) {
+        final String startTimestamp = MessageAccessor.getConsumeStartTimeStamp(msg);
+        if (startTimestamp == null) {
+            return false;
+        }
+        try {
+            return System.currentTimeMillis() - Long.parseLong(startTimestamp) > consumeTimeoutMillis;
+        } catch (NumberFormatException e) {
+            log.warn("Unreadable consume start timestamp, msgId={}, value={}", msg.getMsgId(), startTimestamp);
+            return false;
+        }
+    }
+
+    /**
+     * Reads the {@code lockTreeMap} field of the process queue reflectively.
+     *
+     * @return the lock, or {@code null} when the field cannot be accessed or does not exist
+     */
+    private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) {
+        try {
+            return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true);
+        } catch (IllegalAccessException | IllegalArgumentException e) {
+            // IllegalArgumentException is what FieldUtils raises when the field is not found.
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
new file mode 100644
index 0000000..8d396d4
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.utils.BeanUtils;
+import io.openmessaging.rocketmq.utils.OMSUtil;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullTaskCallback;
+import org.apache.rocketmq.client.consumer.PullTaskContext;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+/**
+ * {@link PullConsumer} built on RocketMQ's {@link MQPullConsumerScheduleService}.
+ *
+ * <p>A registered pull task fetches messages for the target queue and places them in a
+ * {@link LocalMessageCache}; {@code poll} and {@code ack} are served from that cache.
+ */
+public class PullConsumerImpl implements PullConsumer {
+    private final DefaultMQPullConsumer rocketmqPullConsumer;
+    private final KeyValue properties;
+    private boolean started = false;
+    private String targetQueueName;
+    private final MQPullConsumerScheduleService pullConsumerScheduleService;
+    private final LocalMessageCache localMessageCache;
+    private final ClientConfig clientConfig;
+
+    final static Logger log = ClientLogger.getLog();
+
+    /**
+     * Configures the underlying RocketMQ pull consumer from {@code properties}.
+     *
+     * @param queueName queue the pull task is registered for
+     * @param properties settings; the generated consumer id is written back into it
+     * @throws OMSRuntimeException if the consumer group or the access points are missing
+     */
+    public PullConsumerImpl(final String queueName, final KeyValue properties) {
+        this.properties = properties;
+        this.targetQueueName = queueName;
+        this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
+
+        final String group = clientConfig.getRmqConsumerGroup();
+        if (group == null || group.isEmpty()) {
+            throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
+        }
+        this.pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
+        this.rocketmqPullConsumer = this.pullConsumerScheduleService.getDefaultMQPullConsumer();
+
+        final String nameServers = clientConfig.getOmsAccessPoints();
+        if (nameServers == null || nameServers.isEmpty()) {
+            throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+        }
+        // Commas in the access point list are replaced by semicolons for the name server address.
+        this.rocketmqPullConsumer.setNamesrvAddr(nameServers.replace(',', ';'));
+        this.rocketmqPullConsumer.setConsumerGroup(group);
+        this.rocketmqPullConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes());
+
+        final String instanceName = OMSUtil.buildInstanceName();
+        this.rocketmqPullConsumer.setInstanceName(instanceName);
+        properties.put(PropertyKeys.CONSUMER_ID, instanceName);
+
+        this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
+    }
+
+    @Override
+    public KeyValue properties() {
+        return properties;
+    }
+
+    /** Returns the next cached message converted to an OMS message, or {@code null} if none. */
+    @Override
+    public Message poll() {
+        final MessageExt pulled = localMessageCache.poll();
+        if (pulled == null) {
+            return null;
+        }
+        return OMSUtil.msgConvert(pulled);
+    }
+
+    /** Same as {@link #poll()}, with the properties passed on to the local cache. */
+    @Override
+    public Message poll(final KeyValue properties) {
+        final MessageExt pulled = localMessageCache.poll(properties);
+        if (pulled == null) {
+            return null;
+        }
+        return OMSUtil.msgConvert(pulled);
+    }
+
+    @Override
+    public void ack(final String messageId) {
+        localMessageCache.ack(messageId);
+    }
+
+    /** The extra properties are not used; this behaves like {@link #ack(String)}. */
+    @Override
+    public void ack(final String messageId, final KeyValue properties) {
+        localMessageCache.ack(messageId);
+    }
+
+    /** Registers the pull task and starts the schedule service and the cache; no-op if started. */
+    @Override
+    public synchronized void startup() {
+        if (this.started) {
+            return;
+        }
+        try {
+            registerPullTaskCallback();
+            this.pullConsumerScheduleService.start();
+            this.localMessageCache.startup();
+        } catch (MQClientException e) {
+            throw new OMSRuntimeException("-1", e);
+        }
+        this.started = true;
+    }
+
+    /** Registers the callback that performs one pull round for the target queue. */
+    private void registerPullTaskCallback() {
+        final PullTaskCallback pullTask = new PullTaskCallback() {
+            @Override
+            public void doPullTask(final MessageQueue mq, final PullTaskContext context) {
+                pullAndCache(mq, context.getPullConsumer());
+            }
+        };
+        this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, pullTask);
+    }
+
+    /**
+     * Performs one pull round: pulls from the next offset, caches every found message when a
+     * process queue exists for {@code mq}, then records the next begin offset. Any failure is
+     * logged and ends the round.
+     */
+    private void pullAndCache(final MessageQueue mq, final MQPullConsumer puller) {
+        try {
+            final long beginOffset = localMessageCache.nextPullOffset(mq);
+            final int batchSize = localMessageCache.nextPullBatchNums();
+            final PullResult result = puller.pull(mq, "*", beginOffset, batchSize);
+
+            final ProcessQueue processQueue = rocketmqPullConsumer.getDefaultMQPullConsumerImpl()
+                .getRebalanceImpl().getProcessQueueTable().get(mq);
+
+            switch (result.getPullStatus()) {
+                case FOUND:
+                    if (processQueue != null) {
+                        processQueue.putMessage(result.getMsgFoundList());
+                        for (final MessageExt found : result.getMsgFoundList()) {
+                            localMessageCache.submitConsumeRequest(new ConsumeRequest(found, mq, processQueue));
+                        }
+                    }
+                    break;
+                default:
+                    break;
+            }
+
+            localMessageCache.updatePullOffset(mq, result.getNextBeginOffset());
+        } catch (Exception e) {
+            log.error("A error occurred in pull message process.", e);
+        }
+    }
+
+    /** Stops the cache, the schedule service and the RocketMQ consumer; no-op if not started. */
+    @Override
+    public synchronized void shutdown() {
+        if (!this.started) {
+            return;
+        }
+        this.localMessageCache.shutdown();
+        this.pullConsumerScheduleService.shutdown();
+        this.rocketmqPullConsumer.shutdown();
+        this.started = false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
new file mode 100644
index 0000000..f9b8058
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessageListener;
+import io.openmessaging.OMS;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.utils.BeanUtils;
+import io.openmessaging.rocketmq.utils.OMSUtil;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+/**
+ * {@link PushConsumer} backed by a RocketMQ {@link DefaultMQPushConsumer}.
+ * <p>
+ * Listeners are registered per queue name through {@link #attachQueue(String, MessageListener)}. Each delivered
+ * RocketMQ message is converted to a {@link BytesMessage} and dispatched to the listener attached for the
+ * message's topic.
+ */
+public class PushConsumerImpl implements PushConsumer {
+    private final DefaultMQPushConsumer rocketmqPushConsumer;
+    private final KeyValue properties;
+    private boolean started = false;
+    private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>();
+    private final ClientConfig clientConfig;
+
+    /**
+     * Configures the underlying RocketMQ push consumer from the supplied OMS properties.
+     *
+     * @param properties consumer configuration; the generated consumer id is written back into it under
+     * {@link PropertyKeys#CONSUMER_ID}
+     * @throws OMSRuntimeException if the access points or the consumer group are null or empty
+     */
+    public PushConsumerImpl(final KeyValue properties) {
+        this.rocketmqPushConsumer = new DefaultMQPushConsumer();
+        this.properties = properties;
+        this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
+
+        String accessPoints = clientConfig.getOmsAccessPoints();
+        if (accessPoints == null || accessPoints.isEmpty()) {
+            throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+        }
+        // OMS access points are comma separated; the name server address is handed over ';' separated.
+        this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
+
+        String consumerGroup = clientConfig.getRmqConsumerGroup();
+        if (null == consumerGroup || consumerGroup.isEmpty()) {
+            throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
+        }
+        this.rocketmqPushConsumer.setConsumerGroup(consumerGroup);
+        this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes());
+        this.rocketmqPushConsumer.setConsumeTimeout(clientConfig.getRmqMessageConsumeTimeout());
+        this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());
+        this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());
+
+        String consumerId = OMSUtil.buildInstanceName();
+        this.rocketmqPushConsumer.setInstanceName(consumerId);
+        properties.put(PropertyKeys.CONSUMER_ID, consumerId);
+
+        this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
+    }
+
+    /**
+     * @return the properties object passed to the constructor
+     */
+    @Override
+    public KeyValue properties() {
+        return properties;
+    }
+
+    /**
+     * Resumes the underlying RocketMQ push consumer.
+     */
+    @Override
+    public void resume() {
+        this.rocketmqPushConsumer.resume();
+    }
+
+    /**
+     * Suspends the underlying RocketMQ push consumer.
+     */
+    @Override
+    public void suspend() {
+        this.rocketmqPushConsumer.suspend();
+    }
+
+    /**
+     * @return the pause flag reported by the underlying RocketMQ push consumer implementation
+     */
+    @Override
+    public boolean isSuspended() {
+        return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
+    }
+
+    /**
+     * Registers {@code listener} for {@code queueName} and subscribes the RocketMQ consumer to that name with
+     * the {@code "*"} sub-expression.
+     *
+     * @param queueName queue (RocketMQ topic) to consume from
+     * @param listener listener that receives the messages of that queue
+     * @return this consumer, for call chaining
+     * @throws OMSRuntimeException if the RocketMQ subscription fails; the original exception is kept as cause
+     */
+    @Override
+    public PushConsumer attachQueue(final String queueName, final MessageListener listener) {
+        this.subscribeTable.put(queueName, listener);
+        try {
+            this.rocketmqPushConsumer.subscribe(queueName, "*");
+        } catch (MQClientException e) {
+            // Keep the cause so the real reason for the failed subscription is not lost.
+            throw new OMSRuntimeException("-1",
+                String.format("RocketMQ push consumer can't attach to %s.", queueName), e);
+        }
+        return this;
+    }
+
+    /**
+     * Starts the underlying RocketMQ consumer unless this consumer is already marked as started.
+     *
+     * @throws OMSRuntimeException if the RocketMQ consumer fails to start
+     */
+    @Override
+    public synchronized void startup() {
+        if (!started) {
+            try {
+                this.rocketmqPushConsumer.start();
+            } catch (MQClientException e) {
+                throw new OMSRuntimeException("-1", e);
+            }
+        }
+        this.started = true;
+    }
+
+    /**
+     * Shuts down the underlying RocketMQ consumer if this consumer was started, then clears the started flag.
+     */
+    @Override
+    public synchronized void shutdown() {
+        if (this.started) {
+            this.rocketmqPushConsumer.shutdown();
+        }
+        this.started = false;
+    }
+
+    /**
+     * Bridges RocketMQ's concurrent listener callback to the OMS {@link MessageListener} attached for the
+     * message's topic.
+     */
+    class MessageListenerImpl implements MessageListenerConcurrently {
+
+        /**
+         * Dispatches the first message of {@code rmqMsgList} to the attached OMS listener and waits for it to be
+         * acknowledged through the {@link ReceivedMessageContext}.
+         * <p>
+         * The status starts as {@code RECONSUME_LATER}; it is what gets returned when no acknowledgement arrives
+         * before the configured consume timeout elapses or the waiting thread is interrupted.
+         *
+         * @param rmqMsgList messages delivered by RocketMQ; only the first element is processed
+         * @param contextRMQ RocketMQ consume context (unused)
+         * @return the consume status recorded in the context properties
+         * @throws OMSRuntimeException if no listener is attached for the message's topic
+         */
+        @Override
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList,
+            ConsumeConcurrentlyContext contextRMQ) {
+            MessageExt rmqMsg = rmqMsgList.get(0);
+            BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg);
+
+            MessageListener listener = PushConsumerImpl.this.subscribeTable.get(rmqMsg.getTopic());
+
+            if (listener == null) {
+                throw new OMSRuntimeException("-1",
+                    String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic()));
+            }
+
+            final KeyValue contextProperties = OMS.newKeyValue();
+            final CountDownLatch sync = new CountDownLatch(1);
+
+            contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+
+            ReceivedMessageContext context = new ReceivedMessageContext() {
+                @Override
+                public KeyValue properties() {
+                    return contextProperties;
+                }
+
+                @Override
+                public void ack() {
+                    // Record the status BEFORE releasing the latch: the thread blocked in await() reads the
+                    // status as soon as it wakes up, so counting down first could make it see RECONSUME_LATER.
+                    contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                        ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+                    sync.countDown();
+                }
+
+                @Override
+                public void ack(final KeyValue properties) {
+                    try {
+                        // Same ordering as ack(): publish the status first, then wake the waiter.
+                        contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+                            properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+                    } finally {
+                        // Always release the waiter, even if reading/writing the status failed.
+                        sync.countDown();
+                    }
+                }
+            };
+            long begin = System.currentTimeMillis();
+            listener.onMessage(omsMsg, context);
+            long costs = System.currentTimeMillis() - begin;
+            // The configured value is multiplied up to milliseconds; use long arithmetic to avoid int overflow.
+            long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60L * 1000L;
+            try {
+                sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // Stop waiting but keep the interrupt visible to the owner of this thread.
+                Thread.currentThread().interrupt();
+            }
+
+            return ConsumeConcurrentlyStatus.valueOf(contextProperties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
new file mode 100644
index 0000000..43f80ae
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.OMS;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+/**
+ * Plain in-memory {@link BytesMessage}: a byte array body plus two {@link KeyValue} containers, one for
+ * headers and one for properties, both created through {@link OMS#newKeyValue()}.
+ */
+public class BytesMessageImpl implements BytesMessage {
+    private final KeyValue headers;
+    private final KeyValue properties;
+    private byte[] body;
+
+    /**
+     * Creates a message with empty headers and properties and no body.
+     */
+    public BytesMessageImpl() {
+        headers = OMS.newKeyValue();
+        properties = OMS.newKeyValue();
+    }
+
+    /**
+     * @return the body array as last set, or {@code null} if none was set
+     */
+    @Override
+    public byte[] getBody() {
+        return this.body;
+    }
+
+    /**
+     * Stores the given array as the body (the reference is kept, no copy is made).
+     *
+     * @param body new message body
+     * @return this message
+     */
+    @Override
+    public BytesMessage setBody(final byte[] body) {
+        this.body = body;
+        return this;
+    }
+
+    /**
+     * @return the header container of this message
+     */
+    @Override
+    public KeyValue headers() {
+        return this.headers;
+    }
+
+    /**
+     * @return the property container of this message
+     */
+    @Override
+    public KeyValue properties() {
+        return this.properties;
+    }
+
+    /** Puts an {@code int} header and returns this message. */
+    @Override
+    public Message putHeaders(final String key, final int value) {
+        this.headers.put(key, value);
+        return this;
+    }
+
+    /** Puts a {@code long} header and returns this message. */
+    @Override
+    public Message putHeaders(final String key, final long value) {
+        this.headers.put(key, value);
+        return this;
+    }
+
+    /** Puts a {@code double} header and returns this message. */
+    @Override
+    public Message putHeaders(final String key, final double value) {
+        this.headers.put(key, value);
+        return this;
+    }
+
+    /** Puts a {@code String} header and returns this message. */
+    @Override
+    public Message putHeaders(final String key, final String value) {
+        this.headers.put(key, value);
+        return this;
+    }
+
+    /** Puts an {@code int} property and returns this message. */
+    @Override
+    public Message putProperties(final String key, final int value) {
+        this.properties.put(key, value);
+        return this;
+    }
+
+    /** Puts a {@code long} property and returns this message. */
+    @Override
+    public Message putProperties(final String key, final long value) {
+        this.properties.put(key, value);
+        return this;
+    }
+
+    /** Puts a {@code double} property and returns this message. */
+    @Override
+    public Message putProperties(final String key, final double value) {
+        this.properties.put(key, value);
+        return this;
+    }
+
+    /** Puts a {@code String} property and returns this message. */
+    @Override
+    public Message putProperties(final String key, final String value) {
+        this.properties.put(key, value);
+        return this;
+    }
+
+    /**
+     * @return a reflection-based dump of this object's fields
+     */
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java
new file mode 100644
index 0000000..7ce4a9b
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * Holder for one RocketMQ message together with the {@link MessageQueue} and {@link ProcessQueue} it was
+ * created with, plus a mutable start-of-consumption timestamp.
+ */
+public class ConsumeRequest {
+    private final MessageExt messageExt;
+    private final MessageQueue messageQueue;
+    private final ProcessQueue processQueue;
+    /** Stays {@code 0} until {@link #setStartConsumeTimeMillis(long)} is called. */
+    private long startConsumeTimeMillis;
+
+    /**
+     * @param messageExt the message to consume
+     * @param messageQueue the message queue associated with the message
+     * @param processQueue the process queue associated with the message
+     */
+    public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue,
+        final ProcessQueue processQueue) {
+        this.processQueue = processQueue;
+        this.messageQueue = messageQueue;
+        this.messageExt = messageExt;
+    }
+
+    /**
+     * @return the message passed to the constructor
+     */
+    public MessageExt getMessageExt() {
+        return this.messageExt;
+    }
+
+    /**
+     * @return the message queue passed to the constructor
+     */
+    public MessageQueue getMessageQueue() {
+        return this.messageQueue;
+    }
+
+    /**
+     * @return the process queue passed to the constructor
+     */
+    public ProcessQueue getProcessQueue() {
+        return this.processQueue;
+    }
+
+    /**
+     * @return the timestamp last stored via {@link #setStartConsumeTimeMillis(long)}, {@code 0} if never set
+     */
+    public long getStartConsumeTimeMillis() {
+        return this.startConsumeTimeMillis;
+    }
+
+    /**
+     * @param startConsumeTimeMillis timestamp to record, in milliseconds
+     */
+    public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
+        this.startConsumeTimeMillis = startConsumeTimeMillis;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
new file mode 100644
index 0000000..3639a3f
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+public interface NonStandardKeys { // Holder for the "rmq."-prefixed property key strings used by this binding.
+    String CONSUMER_GROUP = "rmq.consumer.group"; // presumably the RocketMQ consumer group - confirm against ClientConfig
+    String PRODUCER_GROUP = "rmq.producer.group"; // presumably the RocketMQ producer group - confirm against ClientConfig
+    String MAX_REDELIVERY_TIMES = "rmq.max.redelivery.times";
+    String MESSAGE_CONSUME_TIMEOUT = "rmq.message.consume.timeout"; // NOTE(review): unit is not visible here - confirm
+    String MAX_CONSUME_THREAD_NUMS = "rmq.max.consume.thread.nums";
+    String MIN_CONSUME_THREAD_NUMS = "rmq.min.consume.thread.nums";
+    String MESSAGE_CONSUME_STATUS = "rmq.message.consume.status"; // PushConsumerImpl stores a ConsumeConcurrentlyStatus name under this key
+    String MESSAGE_DESTINATION = "rmq.message.destination";
+    String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
+    String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
new file mode 100644
index 0000000..228a9f0
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.SendResult;
+
+/**
+ * Immutable {@link SendResult} carrying a message id and a {@link KeyValue} of result properties.
+ */
+public class SendResultImpl implements SendResult {
+    private final String messageId;
+    private final KeyValue properties;
+
+    /**
+     * @param messageId id to report from {@link #messageId()}
+     * @param properties properties to report from {@link #properties()}
+     */
+    public SendResultImpl(final String messageId, final KeyValue properties) {
+        this.properties = properties;
+        this.messageId = messageId;
+    }
+
+    /**
+     * @return the message id given at construction
+     */
+    @Override
+    public String messageId() {
+        return this.messageId;
+    }
+
+    /**
+     * @return the properties object given at construction
+     */
+    @Override
+    public KeyValue properties() {
+        return this.properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
new file mode 100644
index 0000000..8246bcd
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageFactory;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.ServiceLifecycle;
+import io.openmessaging.exception.OMSMessageFormatException;
+import io.openmessaging.exception.OMSNotSupportedException;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.exception.OMSTimeOutException;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.utils.BeanUtils;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.slf4j.Logger;
+
+import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
+
+abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
+    final static Logger log = ClientLogger.getLog();
+    final KeyValue properties;
+    final DefaultMQProducer rocketmqProducer;
+    private boolean started = false;
+    final ClientConfig clientConfig;
+
+    AbstractOMSProducer(final KeyValue properties) {
+        this.properties = properties;
+        this.rocketmqProducer = new DefaultMQProducer();
+        this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
+
+        String accessPoints = clientConfig.getOmsAccessPoints();
+        if (accessPoints == null || accessPoints.isEmpty()) {
+            throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+        }
+        this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
+        this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup());
+
+        String producerId = buildInstanceName();
+        this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout());
+        this.rocketmqProducer.setInstanceName(producerId);
+        this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
+        properties.put(PropertyKeys.PRODUCER_ID, producerId);
+    }
+
+    @Override
+    public synchronized void startup() {
+        if (!started) {
+            try {
+                this.rocketmqProducer.start();
+            } catch (MQClientException e) {
+                throw new OMSRuntimeException("-1", e);
+            }
+        }
+        this.started = true;
+    }
+
+    @Override
+    public synchronized void shutdown() {
+        if (this.started) {
+            this.rocketmqProducer.shutdown();
+        }
+        this.started = false;
+    }
+
+    OMSRuntimeException checkProducerException(String topic, String msgId, Throwable e) {
+        if (e instanceof MQClientException) {
+            if (e.getCause() != null) {
+                if (e.getCause() instanceof RemotingTimeoutException) {
+                    return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
+                        this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e);
+                } else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) {
+                    MQBrokerException brokerException = (MQBrokerException) e.getCause();
+                    return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
+                        topic, msgId, brokerException.getErrorMessage()), e);
+                }
+            }
+            // Exception thrown by local.
+            else {
+                MQClientException clientException = (MQClientException) e;
+                if (-1 == clientException.getResponseCode()) {
+                    return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s",
+                        topic, msgId), e);
+                } else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) {
+                    return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
+                        topic, msgId), e);
+                }
+            }
+        }
+        return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e);
+    }
+
+    protected void checkMessageType(Message message) {
+        if (!(message instanceof BytesMessage)) {
+            throw new OMSNotSupportedException("-1", "Only BytesMessage is supported.");
+        }
+    }
+
+    @Override
+    public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) {
+        BytesMessage bytesMessage = new BytesMessageImpl();
+        bytesMessage.setBody(body);
+        bytesMessage.headers().put(MessageHeader.TOPIC, topic);
+        return bytesMessage;
+    }
+
+    @Override
+    public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) {
+        BytesMessage bytesMessage = new BytesMessageImpl();
+        bytesMessage.setBody(body);
+        bytesMessage.headers().put(MessageHeader.QUEUE, queue);
+        return bytesMessage;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
new file mode 100644
index 0000000..2c00c60
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.Producer;
+import io.openmessaging.Promise;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.SendResult;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.promise.DefaultPromise;
+import io.openmessaging.rocketmq.utils.OMSUtil;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendStatus;
+
+import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert;
+
+/**
+ * {@link Producer} implementation that converts OMS {@link BytesMessage}s to RocketMQ messages and sends them
+ * through the RocketMQ producer owned by {@link AbstractOMSProducer}.
+ */
+public class ProducerImpl extends AbstractOMSProducer implements Producer {
+
+    /**
+     * @param properties producer configuration, passed on to {@link AbstractOMSProducer}
+     */
+    public ProducerImpl(final KeyValue properties) {
+        super(properties);
+    }
+
+    /**
+     * @return the properties object passed to the constructor
+     */
+    @Override
+    public KeyValue properties() {
+        return properties;
+    }
+
+    /**
+     * Sends the message synchronously using the producer's configured send timeout.
+     *
+     * @param message message to send; must be a {@link BytesMessage}
+     * @return the converted send result
+     * @throws OMSRuntimeException if the send fails or the broker does not report {@code SEND_OK}
+     */
+    @Override
+    public SendResult send(final Message message) {
+        return send(message, this.rocketmqProducer.getSendMsgTimeout());
+    }
+
+    /**
+     * Sends the message synchronously; {@link PropertyKeys#OPERATION_TIMEOUT} in {@code properties}, when
+     * present, overrides the producer's configured send timeout.
+     *
+     * @param message message to send; must be a {@link BytesMessage}
+     * @param properties per-call options
+     * @return the converted send result
+     * @throws OMSRuntimeException if the send fails or the broker does not report {@code SEND_OK}
+     */
+    @Override
+    public SendResult send(final Message message, final KeyValue properties) {
+        return send(message, resolveTimeout(properties));
+    }
+
+    /**
+     * Picks the timeout for one call: the {@link PropertyKeys#OPERATION_TIMEOUT} entry when present,
+     * otherwise the producer's configured send timeout.
+     */
+    private long resolveTimeout(final KeyValue properties) {
+        return properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
+            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
+    }
+
+    /**
+     * Converts and sends the message, writing the RocketMQ message id back into the message headers on
+     * success.
+     */
+    private SendResult send(final Message message, long timeout) {
+        checkMessageType(message);
+        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+        try {
+            org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
+            if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
+                log.error(String.format("Send message to RocketMQ failed, %s", message));
+                throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
+            }
+            message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
+            return OMSUtil.sendResultConvert(rmqResult);
+        } catch (OMSRuntimeException e) {
+            // Already an OMS exception (the non-SEND_OK case above, which has been logged): rethrow as is
+            // instead of logging it a second time and wrapping it in another OMSRuntimeException.
+            throw e;
+        } catch (Exception e) {
+            log.error(String.format("Send message to RocketMQ failed, %s", message), e);
+            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
+        }
+    }
+
+    /**
+     * Sends the message asynchronously using the producer's configured send timeout.
+     *
+     * @param message message to send; must be a {@link BytesMessage}
+     * @return promise completed with the send result, or failed with the error
+     */
+    @Override
+    public Promise<SendResult> sendAsync(final Message message) {
+        return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
+    }
+
+    /**
+     * Sends the message asynchronously; {@link PropertyKeys#OPERATION_TIMEOUT} in {@code properties}, when
+     * present, overrides the producer's configured send timeout.
+     *
+     * @param message message to send; must be a {@link BytesMessage}
+     * @param properties per-call options
+     * @return promise completed with the send result, or failed with the error
+     */
+    @Override
+    public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
+        return sendAsync(message, resolveTimeout(properties));
+    }
+
+    /**
+     * Converts the message and hands it to the RocketMQ producer with a callback that completes the returned
+     * promise. Failures raised while submitting the send also fail the promise instead of being thrown.
+     */
+    private Promise<SendResult> sendAsync(final Message message, long timeout) {
+        checkMessageType(message);
+        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+        final Promise<SendResult> promise = new DefaultPromise<>();
+        try {
+            this.rocketmqProducer.send(rmqMessage, new SendCallback() {
+                @Override
+                public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
+                    message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
+                    promise.set(OMSUtil.sendResultConvert(rmqResult));
+                }
+
+                @Override
+                public void onException(final Throwable e) {
+                    promise.setFailure(e);
+                }
+            }, timeout);
+        } catch (Exception e) {
+            promise.setFailure(e);
+        }
+        return promise;
+    }
+
+    /**
+     * Sends the message without waiting for any result; failures are deliberately ignored.
+     *
+     * @param message message to send; must be a {@link BytesMessage}
+     */
+    @Override
+    public void sendOneway(final Message message) {
+        checkMessageType(message);
+        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+        try {
+            this.rocketmqProducer.sendOneway(rmqMessage);
+        } catch (Exception ignore) { //Ignore the oneway exception.
+        }
+    }
+
+    /**
+     * Same as {@link #sendOneway(Message)}; {@code properties} is not used.
+     *
+     * @param message message to send; must be a {@link BytesMessage}
+     * @param properties ignored
+     */
+    @Override
+    public void sendOneway(final Message message, final KeyValue properties) {
+        sendOneway(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
new file mode 100644
index 0000000..05225cc
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.SequenceProducer;
+import io.openmessaging.rocketmq.utils.OMSUtil;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.SendResult;
+
+/**
+ * {@link SequenceProducer} that buffers messages in a local queue and hands the whole
+ * buffer to the RocketMQ producer in a single send when {@link #commit()} is called.
+ *
+ * <p>{@link #send(Message)} only validates and enqueues; nothing is sent until
+ * {@link #commit()}. {@link #rollback()} discards whatever is buffered.
+ */
+public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {
+
+    /** Messages accepted by {@code send} that have not been committed or rolled back yet. */
+    private final BlockingQueue<Message> msgCacheQueue;
+
+    /**
+     * @param properties configuration passed through to the superclass constructor
+     */
+    public SequenceProducerImpl(final KeyValue properties) {
+        super(properties);
+        this.msgCacheQueue = new LinkedBlockingQueue<>();
+    }
+
+    /**
+     * @return the {@code properties} field inherited from the superclass
+     */
+    @Override
+    public KeyValue properties() {
+        return properties;
+    }
+
+    /**
+     * Validates the message and appends it to the local buffer without sending it.
+     *
+     * @param message the message to buffer; it is cast to {@code BytesMessage} after the type check
+     * @throws RuntimeException whatever {@code checkProducerException} produces when
+     *                          {@code Validators.checkMessage} rejects the converted message
+     */
+    @Override
+    public void send(final Message message) {
+        checkMessageType(message);
+        org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);
+        try {
+            Validators.checkMessage(rmqMessage, this.rocketmqProducer);
+        } catch (MQClientException e) {
+            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
+        }
+        msgCacheQueue.add(message);
+    }
+
+    /**
+     * Same as {@link #send(Message)}; the {@code properties} argument is not consulted.
+     *
+     * @param message    the message to buffer
+     * @param properties unused by this implementation
+     */
+    @Override
+    public void send(final Message message, final KeyValue properties) {
+        send(message);
+    }
+
+    /**
+     * Drains the buffer and sends every drained message in one producer call, then copies
+     * the returned message ids back into the headers of the original messages.
+     *
+     * <p>The send result's message id is treated as a comma-separated list, one id per
+     * message in send order. Ids are assigned only for positions present in both lists, so
+     * a short or missing id list no longer turns an already-successful send into a
+     * reported failure.
+     *
+     * <p>The buffer is drained before sending; messages are not re-queued when the send fails.
+     *
+     * @throws RuntimeException whatever {@code checkProducerException} produces when the
+     *                          send (or the id write-back) throws
+     */
+    @Override
+    public synchronized void commit() {
+        List<Message> messages = new ArrayList<>();
+        msgCacheQueue.drainTo(messages);
+
+        // Nothing buffered: skip the conversion work and the send entirely.
+        if (messages.isEmpty()) {
+            return;
+        }
+
+        List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>(messages.size());
+        for (Message message : messages) {
+            rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
+        }
+
+        try {
+            SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
+            String joinedMsgIds = sendResult.getMsgId();
+            String[] msgIdArray = joinedMsgIds == null ? new String[0] : joinedMsgIds.split(",");
+
+            // Guard against an id list shorter than the batch instead of indexing past its end.
+            int assignable = Math.min(messages.size(), msgIdArray.length);
+            for (int i = 0; i < assignable; i++) {
+                messages.get(i).headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
+            }
+        } catch (Exception e) {
+            throw checkProducerException("", "", e);
+        }
+    }
+
+    /**
+     * Discards every buffered message without sending it.
+     */
+    @Override
+    public synchronized void rollback() {
+        msgCacheQueue.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1d966b50/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
----------------------------------------------------------------------
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
new file mode 100644
index 0000000..c863ccf
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.promise;
+
+import io.openmessaging.Promise;
+import io.openmessaging.PromiseListener;
+import io.openmessaging.exception.OMSRuntimeException;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Promise} implementation guarded by an intrinsic lock.
+ *
+ * <p>A promise starts in {@link FutureState#DOING} and makes exactly one transition: to
+ * {@link FutureState#DONE} through {@link #set(Object)} or {@link #setFailure(Throwable)},
+ * or to {@link FutureState#CANCELLED} when a {@link #get(long)} call times out or is
+ * interrupted. The value or failure is recorded under the lock together with the state
+ * change, so a rejected late call to {@code set}/{@code setFailure} cannot alter the
+ * outcome of an already completed promise.
+ *
+ * <p>Listeners registered while the promise is still running are notified once, by the
+ * thread that performs the state transition. Listeners registered afterwards are notified
+ * immediately by the registering thread.
+ *
+ * @param <V> type of the value the promise is completed with
+ */
+public class DefaultPromise<V> implements Promise<V> {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
+    /** Monitor guarding state transitions and used by waiters in {@link #get(long)}. */
+    private final Object lock = new Object();
+    /** Construction time in milliseconds; the deadline of {@link #get(long)} is measured from here. */
+    private final long createTime;
+    /** Listeners added while the promise was still running. */
+    private final List<PromiseListener<V>> promiseListenerList;
+    private volatile FutureState state = FutureState.DOING;
+    // result and exception are read without the lock by get() and getThrowable().
+    private volatile V result = null;
+    private volatile Throwable exception = null;
+
+    public DefaultPromise() {
+        createTime = System.currentTimeMillis();
+        promiseListenerList = new ArrayList<>();
+    }
+
+    /**
+     * Cancellation on request is not implemented.
+     *
+     * @param mayInterruptIfRunning ignored
+     * @return always {@code false}
+     */
+    @Override
+    public boolean cancel(final boolean mayInterruptIfRunning) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return state.isCancelledState();
+    }
+
+    @Override
+    public boolean isDone() {
+        return state.isDoneState();
+    }
+
+    /**
+     * Returns the current value without waiting.
+     *
+     * @return the value passed to {@link #set(Object)}, or {@code null} if the promise has
+     *         not been completed with a value
+     */
+    @Override
+    public V get() {
+        return result;
+    }
+
+    /**
+     * Waits for the promise to leave the running state and returns its value.
+     *
+     * <p>With {@code timeout <= 0} the call waits without a deadline; an interruption
+     * cancels the promise with the {@link InterruptedException} as its failure. With a
+     * positive {@code timeout} the deadline is {@code timeout} milliseconds after this
+     * promise was constructed (not after this call); interruptions are logged and the wait
+     * continues, and if the promise is still running at the deadline it is cancelled. In
+     * both cases the caller's interrupt status is restored before returning.
+     *
+     * @param timeout maximum age of the promise in milliseconds, or {@code <= 0} to wait indefinitely
+     * @return the value the promise was completed with
+     * @throws OMSRuntimeException if the promise holds a failure, including the failure
+     *                             recorded by a timeout or interruption of this call
+     */
+    @Override
+    public V get(final long timeout) {
+        synchronized (lock) {
+            if (!isDoing()) {
+                return getValueOrThrowable();
+            }
+
+            if (timeout <= 0) {
+                try {
+                    // Loop so that a spurious wakeup cannot return before completion.
+                    while (isDoing()) {
+                        lock.wait();
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    cancel(e);
+                }
+                return getValueOrThrowable();
+            }
+
+            boolean interrupted = false;
+            try {
+                long waitTime = timeout - (System.currentTimeMillis() - createTime);
+                while (waitTime > 0 && isDoing()) {
+                    try {
+                        lock.wait(waitTime);
+                    } catch (InterruptedException e) {
+                        // Keep waiting until the deadline; the interrupt is re-asserted below.
+                        interrupted = true;
+                        LOG.error("promise get value interrupted,excepiton:{}", e.getMessage());
+                    }
+                    waitTime = timeout - (System.currentTimeMillis() - createTime);
+                }
+
+                if (isDoing()) {
+                    timeoutSoCancel();
+                }
+                return getValueOrThrowable();
+            } finally {
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
+    /**
+     * Completes the promise with a value.
+     *
+     * @param value the value; {@code null} is rejected
+     * @return {@code true} if this call completed the promise, {@code false} if the value
+     *         was {@code null} or the promise was no longer running
+     */
+    @Override
+    public boolean set(final V value) {
+        if (value == null) {
+            return false;
+        }
+        return complete(value, null);
+    }
+
+    /**
+     * Completes the promise with a failure.
+     *
+     * @param cause the failure; {@code null} is rejected
+     * @return {@code true} if this call completed the promise, {@code false} if the cause
+     *         was {@code null} or the promise was no longer running
+     */
+    @Override
+    public boolean setFailure(final Throwable cause) {
+        if (cause == null) {
+            return false;
+        }
+        return complete(null, cause);
+    }
+
+    /**
+     * Registers a listener. If the promise has already left the running state the listener
+     * is notified immediately on the calling thread.
+     *
+     * @param listener the listener to register
+     * @throws NullPointerException if {@code listener} is {@code null}
+     */
+    @Override
+    public void addListener(final PromiseListener<V> listener) {
+        if (listener == null) {
+            throw new NullPointerException("FutureListener is null");
+        }
+
+        boolean notifyNow = false;
+        synchronized (lock) {
+            if (!isDoing()) {
+                notifyNow = true;
+            } else {
+                promiseListenerList.add(listener);
+            }
+        }
+
+        if (notifyNow) {
+            notifyListener(listener);
+        }
+    }
+
+    /**
+     * @return the recorded failure, or {@code null} if there is none
+     */
+    @Override
+    public Throwable getThrowable() {
+        return exception;
+    }
+
+    /** Notifies every listener that was registered while the promise was running. */
+    private void notifyListeners() {
+        for (PromiseListener<V> listener : promiseListenerList) {
+            notifyListener(listener);
+        }
+    }
+
+    /** Cancels a still-running promise because a timed {@link #get(long)} reached its deadline. */
+    private void timeoutSoCancel() {
+        synchronized (lock) {
+            if (!isDoing()) {
+                return;
+            }
+            state = FutureState.CANCELLED;
+            exception = new RuntimeException("Get request result is timeout or interrupted");
+            lock.notifyAll();
+        }
+        notifyListeners();
+    }
+
+    /**
+     * Returns the value, or throws if a failure is recorded. The thrown exception wraps the
+     * failure's cause when it has one, otherwise the failure itself.
+     */
+    private V getValueOrThrowable() {
+        final Throwable failure = exception;
+        if (failure != null) {
+            Throwable e = failure.getCause() != null ? failure.getCause() : failure;
+            throw new OMSRuntimeException("-1", e);
+        }
+        // Listeners were already notified by the state transition; do not notify again here.
+        return result;
+    }
+
+    private boolean isDoing() {
+        return state.isDoingState();
+    }
+
+    /**
+     * Records the outcome and moves the promise to DONE, if it is still running.
+     * The outcome is written under the lock so a losing call leaves the promise untouched.
+     */
+    private boolean complete(final V value, final Throwable cause) {
+        synchronized (lock) {
+            if (!isDoing()) {
+                return false;
+            }
+
+            result = value;
+            exception = cause;
+            state = FutureState.DONE;
+            lock.notifyAll();
+        }
+
+        notifyListeners();
+        return true;
+    }
+
+    /** Invokes the failure or completion callback; anything the listener throws is logged, not propagated. */
+    private void notifyListener(final PromiseListener<V> listener) {
+        try {
+            if (exception != null) {
+                listener.operationFailed(this);
+            } else {
+                listener.operationCompleted(this);
+            }
+        } catch (Throwable t) {
+            LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
+        }
+    }
+
+    /** Cancels a still-running promise, recording {@code e} as its failure. */
+    private boolean cancel(Exception e) {
+        synchronized (lock) {
+            if (!isDoing()) {
+                return false;
+            }
+
+            state = FutureState.CANCELLED;
+            exception = e;
+            lock.notifyAll();
+        }
+
+        notifyListeners();
+        return true;
+    }
+}
+